summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/pipeline/dependencies.cpp50
-rw-r--r--src/mongo/db/pipeline/dependencies.h28
-rw-r--r--src/mongo/db/pipeline/dependencies_test.cpp5
-rw-r--r--src/mongo/db/pipeline/document.cpp21
-rw-r--r--src/mongo/db/pipeline/document.h25
-rw-r--r--src/mongo/db/pipeline/document_internal.h16
-rw-r--r--src/mongo/db/pipeline/document_source.h2
-rw-r--r--src/mongo/db/pipeline/document_source_limit.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_limit.h43
-rw-r--r--src/mongo/db/pipeline/document_source_match.cpp13
-rw-r--r--src/mongo/db/pipeline/document_source_match.h6
-rw-r--r--src/mongo/db/pipeline/document_source_match_test.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_merge_cursors.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_merge_cursors.h11
-rw-r--r--src/mongo/db/pipeline/document_source_sample.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_sample.h6
-rw-r--r--src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_skip.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_skip.h36
-rw-r--r--src/mongo/db/pipeline/document_source_sort.cpp177
-rw-r--r--src/mongo/db/pipeline/document_source_sort.h39
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp22
-rw-r--r--src/mongo/db/pipeline/pipeline.h6
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp65
-rw-r--r--src/mongo/db/query/query_planner.cpp20
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp13
-rw-r--r--src/mongo/s/query/SConscript6
-rw-r--r--src/mongo/s/query/async_results_merger.cpp11
-rw-r--r--src/mongo/s/query/async_results_merger.h8
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp99
-rw-r--r--src/mongo/s/query/router_exec_stage.h27
-rw-r--r--src/mongo/s/query/router_stage_aggregation_merge.cpp79
-rw-r--r--src/mongo/s/query/router_stage_limit.cpp12
-rw-r--r--src/mongo/s/query/router_stage_limit.h6
-rw-r--r--src/mongo/s/query/router_stage_merge.cpp2
-rw-r--r--src/mongo/s/query/router_stage_merge.h3
-rw-r--r--src/mongo/s/query/router_stage_mock.cpp2
-rw-r--r--src/mongo/s/query/router_stage_mock.h5
-rw-r--r--src/mongo/s/query/router_stage_pipeline.cpp135
-rw-r--r--src/mongo/s/query/router_stage_pipeline.h (renamed from src/mongo/s/query/router_stage_aggregation_merge.h)13
-rw-r--r--src/mongo/s/query/router_stage_remove_metadata_fields.cpp89
-rw-r--r--src/mongo/s/query/router_stage_remove_metadata_fields.h54
-rw-r--r--src/mongo/s/query/router_stage_remove_metadata_fields_test.cpp (renamed from src/mongo/s/query/router_stage_remove_sortkey_test.cpp)93
-rw-r--r--src/mongo/s/query/router_stage_remove_sortkey.cpp75
-rw-r--r--src/mongo/s/query/router_stage_remove_sortkey.h53
-rw-r--r--src/mongo/s/query/router_stage_skip.cpp12
-rw-r--r--src/mongo/s/query/router_stage_skip.h6
-rw-r--r--src/mongo/s/query/store_possible_cursor.cpp9
48 files changed, 932 insertions, 506 deletions
diff --git a/src/mongo/db/pipeline/dependencies.cpp b/src/mongo/db/pipeline/dependencies.cpp
index bb6b7175ae5..a1fc3b88a19 100644
--- a/src/mongo/db/pipeline/dependencies.cpp
+++ b/src/mongo/db/pipeline/dependencies.cpp
@@ -41,34 +41,40 @@ using std::vector;
namespace str = mongoutils::str;
-BSONObj DepsTracker::toProjection() const {
- if (fields.empty() && !needWholeDocument) {
- if (_needTextScore) {
- // We only need the text score, but there is no easy way to express this in the query
- // projection language. We use $noFieldsNeeded with a textScore meta-projection since
- // this is an inclusion projection which will exclude all existing fields but add the
- // textScore metadata.
- return BSON("_id" << 0 << "$noFieldsNeeded" << 1 << Document::metaFieldTextScore
- << BSON("$meta"
- << "textScore"));
- } else {
- // We truly need no information (we are doing a count or something similar). In this
- // case, the DocumentSourceCursor will know there aren't any dependencies, and we can
- // ignore the documents returned from the query system. We pass an empty object as the
- // projection so that we have a chance of using the COUNT_SCAN optimization.
- return BSONObj();
- }
+bool DepsTracker::_appendMetaProjections(BSONObjBuilder* projectionBuilder) const {
+ if (_needTextScore) {
+ projectionBuilder->append(Document::metaFieldTextScore,
+ BSON("$meta"
+ << "textScore"));
}
+ if (_needSortKey) {
+ projectionBuilder->append(Document::metaFieldSortKey,
+ BSON("$meta"
+ << "sortKey"));
+ }
+ return (_needTextScore || _needSortKey);
+}
+BSONObj DepsTracker::toProjection() const {
BSONObjBuilder bb;
- if (_needTextScore)
- bb.append(Document::metaFieldTextScore,
- BSON("$meta"
- << "textScore"));
+ const bool needsMetadata = _appendMetaProjections(&bb);
- if (needWholeDocument)
+ if (needWholeDocument) {
return bb.obj();
+ }
+
+ if (fields.empty()) {
+ if (needsMetadata) {
+ // We only need metadata, but there is no easy way to express this in the query
+ // projection language. We use $noFieldsNeeded with a meta-projection since this is an
+ // inclusion projection which will exclude all existing fields but add the metadata.
+ bb.append("_id", 0);
+ bb.append("$noFieldsNeeded", 1);
+ }
+ // We either need nothing (as we would if this was logically a count), or only the metadata.
+ return bb.obj();
+ }
bool needId = false;
string last;
diff --git a/src/mongo/db/pipeline/dependencies.h b/src/mongo/db/pipeline/dependencies.h
index 9afd9adbaec..19cf3ee33f8 100644
--- a/src/mongo/db/pipeline/dependencies.h
+++ b/src/mongo/db/pipeline/dependencies.h
@@ -47,7 +47,7 @@ struct DepsTracker {
enum MetadataAvailable { kNoMetadata = 0, kTextScore = 1 };
DepsTracker(MetadataAvailable metadataAvailable = kNoMetadata)
- : needWholeDocument(false), _metadataAvailable(metadataAvailable), _needTextScore(false) {}
+ : _metadataAvailable(metadataAvailable) {}
/**
* Returns a projection object covering the dependencies tracked by this class.
@@ -56,10 +56,6 @@ struct DepsTracker {
boost::optional<ParsedDeps> toParsedDeps() const;
- std::set<std::string> fields; // names of needed fields in dotted notation
- bool needWholeDocument; // if true, ignore fields and assume the whole document is needed
-
-
bool hasNoRequirements() const {
return fields.empty() && !needWholeDocument && !_needTextScore;
}
@@ -85,9 +81,29 @@ struct DepsTracker {
_needTextScore = needTextScore;
}
+ bool getNeedSortKey() const {
+ return _needSortKey;
+ }
+
+ void setNeedSortKey(bool needSortKey) {
+ // We don't expect to ever unset '_needSortKey'.
+ invariant(!_needSortKey || needSortKey);
+ _needSortKey = needSortKey;
+ }
+
+ std::set<std::string> fields; // The names of needed fields in dotted notation.
+ bool needWholeDocument = false; // If true, ignore 'fields' and assume the whole document is
+ // needed.
private:
+ /**
+ * Appends the meta projections for the sort key and/or text score to 'bb' if necessary. Returns
+ * true if either type of metadata was needed, and false otherwise.
+ */
+ bool _appendMetaProjections(BSONObjBuilder* bb) const;
+
MetadataAvailable _metadataAvailable;
- bool _needTextScore;
+ bool _needTextScore = false; // if true, add a {$meta: "textScore"} to the projection.
+ bool _needSortKey = false; // if true, add a {$meta: "sortKey"} to the projection.
};
/**
diff --git a/src/mongo/db/pipeline/dependencies_test.cpp b/src/mongo/db/pipeline/dependencies_test.cpp
index 5bfd008bbfe..cb33e1fcc9f 100644
--- a/src/mongo/db/pipeline/dependencies_test.cpp
+++ b/src/mongo/db/pipeline/dependencies_test.cpp
@@ -145,8 +145,9 @@ TEST(DependenciesToProjectionTest, ShouldAttemptToExcludeOtherFieldsIfOnlyTextSc
deps.needWholeDocument = false;
deps.setNeedTextScore(true);
ASSERT_BSONOBJ_EQ(deps.toProjection(),
- BSON("_id" << 0 << "$noFieldsNeeded" << 1 << Document::metaFieldTextScore
- << metaTextScore));
+ BSON(Document::metaFieldTextScore << metaTextScore << "_id" << 0
+ << "$noFieldsNeeded"
+ << 1));
}
TEST(DependenciesToProjectionTest,
diff --git a/src/mongo/db/pipeline/document.cpp b/src/mongo/db/pipeline/document.cpp
index 835ae1bc716..4f531fe72f3 100644
--- a/src/mongo/db/pipeline/document.cpp
+++ b/src/mongo/db/pipeline/document.cpp
@@ -45,6 +45,9 @@ using std::vector;
const DocumentStorage DocumentStorage::kEmptyDoc;
+const std::vector<StringData> Document::allMetadataFieldNames = {
+ Document::metaFieldTextScore, Document::metaFieldRandVal, Document::metaFieldSortKey};
+
Position DocumentStorage::findField(StringData requested) const {
int reqSize = requested.size(); // get size calculation out of the way if needed
@@ -201,6 +204,7 @@ intrusive_ptr<DocumentStorage> DocumentStorage::clone() const {
out->_metaFields = _metaFields;
out->_textScore = _textScore;
out->_randVal = _randVal;
+ out->_sortKey = _sortKey.getOwned();
// Tell values that they have been memcpyed (updates ref counts)
for (DocumentStorageIterator it = out->iteratorAll(); !it.atEnd(); it.advance()) {
@@ -265,8 +269,9 @@ BSONObj Document::toBson() const {
return bb.obj();
}
-const StringData Document::metaFieldTextScore("$textScore"_sd);
-const StringData Document::metaFieldRandVal("$randVal"_sd);
+constexpr StringData Document::metaFieldTextScore;
+constexpr StringData Document::metaFieldRandVal;
+constexpr StringData Document::metaFieldSortKey;
BSONObj Document::toBsonWithMetaData() const {
BSONObjBuilder bb;
@@ -275,6 +280,8 @@ BSONObj Document::toBsonWithMetaData() const {
bb.append(metaFieldTextScore, getTextScore());
if (hasRandMetaField())
bb.append(metaFieldRandVal, getRandMetaField());
+ if (hasSortKeyMetaField())
+ bb.append(metaFieldSortKey, getSortKeyMetaField());
return bb.obj();
}
@@ -292,6 +299,9 @@ Document Document::fromBsonWithMetaData(const BSONObj& bson) {
} else if (fieldName == metaFieldRandVal) {
md.setRandMetaField(elem.Double());
continue;
+ } else if (fieldName == metaFieldSortKey) {
+ md.setSortKeyMetaField(elem.Obj());
+ continue;
}
}
@@ -465,6 +475,10 @@ void Document::serializeForSorter(BufBuilder& buf) const {
buf.appendNum(char(DocumentStorage::MetaType::RAND_VAL + 1));
buf.appendNum(getRandMetaField());
}
+ if (hasSortKeyMetaField()) {
+ buf.appendNum(char(DocumentStorage::MetaType::SORT_KEY + 1));
+ getSortKeyMetaField().appendSelfToBufBuilder(buf);
+ }
buf.appendNum(char(0));
}
@@ -481,6 +495,9 @@ Document Document::deserializeForSorter(BufReader& buf, const SorterDeserializeS
doc.setTextScore(buf.read<LittleEndian<double>>());
} else if (marker == char(DocumentStorage::MetaType::RAND_VAL) + 1) {
doc.setRandMetaField(buf.read<LittleEndian<double>>());
+ } else if (marker == char(DocumentStorage::MetaType::SORT_KEY) + 1) {
+ doc.setSortKeyMetaField(
+ BSONObj::deserializeForSorter(buf, BSONObj::SorterDeserializeSettings()));
} else {
uasserted(28744, "Unrecognized marker, unable to deserialize buffer");
}
diff --git a/src/mongo/db/pipeline/document.h b/src/mongo/db/pipeline/document.h
index 5e5980f5a51..a8ebfd1c4d2 100644
--- a/src/mongo/db/pipeline/document.h
+++ b/src/mongo/db/pipeline/document.h
@@ -90,6 +90,12 @@ public:
const Document& rhs;
};
+ static constexpr StringData metaFieldTextScore = "$textScore"_sd;
+ static constexpr StringData metaFieldRandVal = "$randVal"_sd;
+ static constexpr StringData metaFieldSortKey = "$sortKey"_sd;
+
+ static const std::vector<StringData> allMetadataFieldNames;
+
/// Empty Document (does no allocation)
Document() {}
@@ -211,6 +217,12 @@ public:
*/
static Document fromBsonWithMetaData(const BSONObj& bson);
+ /**
+ * Given a BSON object that may have metadata fields added as part of toBsonWithMetadata(),
+ * returns the same object without any of the metadata fields.
+ */
+ static BSONObj stripMetadataFields(const BSONObj& bsonWithMetadata);
+
// Support BSONObjBuilder and BSONArrayBuilder "stream" API
friend BSONObjBuilder& operator<<(BSONObjBuilderValueStream& builder, const Document& d);
@@ -233,7 +245,6 @@ public:
return Document(storage().clone().get());
}
- static const StringData metaFieldTextScore; // "$textScore"
bool hasTextScore() const {
return storage().hasTextScore();
}
@@ -241,7 +252,6 @@ public:
return storage().getTextScore();
}
- static const StringData metaFieldRandVal; // "$randVal"
bool hasRandMetaField() const {
return storage().hasRandMetaField();
}
@@ -249,6 +259,13 @@ public:
return storage().getRandMetaField();
}
+ bool hasSortKeyMetaField() const {
+ return storage().hasSortKeyMetaField();
+ }
+ BSONObj getSortKeyMetaField() const {
+ return storage().getSortKeyMetaField();
+ }
+
/// members for Sorter
struct SorterDeserializeSettings {}; // unused
void serializeForSorter(BufBuilder& buf) const;
@@ -493,6 +510,10 @@ public:
storage().setRandMetaField(val);
}
+ void setSortKeyMetaField(BSONObj sortKey) {
+ storage().setSortKeyMetaField(sortKey);
+ }
+
/** Convert to a read-only document and release reference.
*
* Call this to indicate that you are done with this Document and will
diff --git a/src/mongo/db/pipeline/document_internal.h b/src/mongo/db/pipeline/document_internal.h
index 2fd3c78cf34..baa68e60658 100644
--- a/src/mongo/db/pipeline/document_internal.h
+++ b/src/mongo/db/pipeline/document_internal.h
@@ -196,6 +196,7 @@ public:
enum MetaType : char {
TEXT_SCORE,
RAND_VAL,
+ SORT_KEY,
NUM_FIELDS
};
@@ -280,6 +281,9 @@ public:
if (source.hasRandMetaField()) {
setRandMetaField(source.getRandMetaField());
}
+ if (source.hasSortKeyMetaField()) {
+ setSortKeyMetaField(source.getSortKeyMetaField());
+ }
}
bool hasTextScore() const {
@@ -304,6 +308,17 @@ public:
_randVal = val;
}
+ bool hasSortKeyMetaField() const {
+ return _metaFields.test(MetaType::SORT_KEY);
+ }
+ BSONObj getSortKeyMetaField() const {
+ return _sortKey;
+ }
+ void setSortKeyMetaField(BSONObj sortKey) {
+ _metaFields.set(MetaType::SORT_KEY);
+ _sortKey = sortKey.getOwned();
+ }
+
private:
/// Same as lastElement->next() or firstElement() if empty.
const ValueElement* end() const {
@@ -385,6 +400,7 @@ private:
std::bitset<MetaType::NUM_FIELDS> _metaFields;
double _textScore;
double _randVal;
+ BSONObj _sortKey;
// When adding a field, make sure to update clone() method
// Defined in document.cpp
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 134a8a86e3d..b0793113863 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -449,7 +449,7 @@ public:
EXHAUSTIVE_FIELDS = 0x2,
// Later stages won't need more metadata from input. For example, a $group stage will group
- // documents together, discarding their text score.
+ // documents together, discarding their text score and sort keys.
EXHAUSTIVE_META = 0x4,
// Later stages won't need either fields or metadata.
diff --git a/src/mongo/db/pipeline/document_source_limit.cpp b/src/mongo/db/pipeline/document_source_limit.cpp
index 704ba907165..8256d055805 100644
--- a/src/mongo/db/pipeline/document_source_limit.cpp
+++ b/src/mongo/db/pipeline/document_source_limit.cpp
@@ -49,9 +49,7 @@ REGISTER_DOCUMENT_SOURCE(limit,
LiteParsedDocumentSourceDefault::parse,
DocumentSourceLimit::createFromBson);
-const char* DocumentSourceLimit::getSourceName() const {
- return "$limit";
-}
+constexpr StringData DocumentSourceLimit::kStageName;
Pipeline::SourceContainer::iterator DocumentSourceLimit::doOptimizeAt(
Pipeline::SourceContainer::iterator itr, Pipeline::SourceContainer* container) {
diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h
index f5f93fff1a4..7bf47638b21 100644
--- a/src/mongo/db/pipeline/document_source_limit.h
+++ b/src/mongo/db/pipeline/document_source_limit.h
@@ -34,9 +34,25 @@ namespace mongo {
class DocumentSourceLimit final : public DocumentSource, public SplittableDocumentSource {
public:
- // virtuals from DocumentSource
+ static constexpr StringData kStageName = "$limit"_sd;
+
+ /**
+ * Create a new $limit stage.
+ */
+ static boost::intrusive_ptr<DocumentSourceLimit> create(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit);
+
+ /**
+ * Parse a $limit stage from a BSON stage specification. 'elem's field name must be "$limit".
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
GetNextResult getNext() final;
- const char* getSourceName() const final;
+ const char* getSourceName() const final {
+ return kStageName.rawData();
+ }
+
BSONObjSet getOutputSorts() final {
return pSource ? pSource->getOutputSorts()
: SimpleBSONObjComparator::kInstance.makeBSONObjSet();
@@ -60,15 +76,6 @@ public:
}
/**
- Create a new limiting DocumentSource.
-
- @param pExpCtx the expression context for the pipeline
- @returns the DocumentSource
- */
- static boost::intrusive_ptr<DocumentSourceLimit> create(
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit);
-
- /**
* Returns the current DocumentSourceLimit for use in the shards pipeline. Running this stage on
* the shards is an optimization, but is not strictly necessary in order to produce correct
* pipeline output.
@@ -93,20 +100,6 @@ public:
_limit = newLimit;
}
- /**
- Create a limiting DocumentSource from BSON.
-
- This is a convenience method that uses the above, and operates on
- a BSONElement that has been deteremined to be an Object with an
- element named $limit.
-
- @param pBsonElement the BSONELement that defines the limit
- @param pExpCtx the expression context
- @returns the grouping DocumentSource
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
private:
DocumentSourceLimit(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long limit);
diff --git a/src/mongo/db/pipeline/document_source_match.cpp b/src/mongo/db/pipeline/document_source_match.cpp
index 5493d21ecd1..17f8cc34e83 100644
--- a/src/mongo/db/pipeline/document_source_match.cpp
+++ b/src/mongo/db/pipeline/document_source_match.cpp
@@ -471,10 +471,11 @@ BSONObj DocumentSourceMatch::getQuery() const {
DocumentSource::GetDepsReturn DocumentSourceMatch::getDependencies(DepsTracker* deps) const {
if (isTextQuery()) {
- // A $text aggregation field should return EXHAUSTIVE_ALL, since we don't necessarily know
- // what field it will be searching without examining indices.
+ // A $text aggregation field should return EXHAUSTIVE_FIELDS, since we don't necessarily
+ // know what field it will be searching without examining indices.
deps->needWholeDocument = true;
- return EXHAUSTIVE_ALL;
+ deps->setNeedTextScore(true);
+ return EXHAUSTIVE_FIELDS;
}
addDependencies(deps);
@@ -493,7 +494,11 @@ void DocumentSourceMatch::addDependencies(DepsTracker* deps) const {
DocumentSourceMatch::DocumentSourceMatch(const BSONObj& query,
const intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSource(pExpCtx), _predicate(query.getOwned()), _isTextQuery(isTextQuery(query)) {
+ : DocumentSource(pExpCtx),
+ _predicate(query.getOwned()),
+ _isTextQuery(isTextQuery(query)),
+ _dependencies(_isTextQuery ? DepsTracker::MetadataAvailable::kTextScore
+ : DepsTracker::MetadataAvailable::kNoMetadata) {
StatusWithMatchExpression status = uassertStatusOK(
MatchExpressionParser::parse(_predicate,
pExpCtx->getCollator(),
diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h
index 135dbc39e17..06219b22596 100644
--- a/src/mongo/db/pipeline/document_source_match.h
+++ b/src/mongo/db/pipeline/document_source_match.h
@@ -160,11 +160,11 @@ private:
std::unique_ptr<MatchExpression> _expression;
- // Cache the dependencies so that we know what fields we need to serialize to BSON for matching.
- DepsTracker _dependencies;
-
BSONObj _predicate;
const bool _isTextQuery;
+
+ // Cache the dependencies so that we know what fields we need to serialize to BSON for matching.
+ DepsTracker _dependencies;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_match_test.cpp b/src/mongo/db/pipeline/document_source_match_test.cpp
index c01ae31db9a..cbe9c522aeb 100644
--- a/src/mongo/db/pipeline/document_source_match_test.cpp
+++ b/src/mongo/db/pipeline/document_source_match_test.cpp
@@ -218,10 +218,10 @@ TEST_F(DocumentSourceMatchTest, ShouldAddDependenciesOfAllBranchesOfOrClause) {
TEST_F(DocumentSourceMatchTest, TextSearchShouldRequireWholeDocumentAndTextScore) {
auto match = DocumentSourceMatch::create(fromjson("{$text: {$search: 'hello'} }"), getExpCtx());
- DepsTracker dependencies;
- ASSERT_EQUALS(DocumentSource::EXHAUSTIVE_ALL, match->getDependencies(&dependencies));
+ DepsTracker dependencies(DepsTracker::MetadataAvailable::kTextScore);
+ ASSERT_EQUALS(DocumentSource::EXHAUSTIVE_FIELDS, match->getDependencies(&dependencies));
ASSERT_EQUALS(true, dependencies.needWholeDocument);
- ASSERT_EQUALS(false, dependencies.getNeedTextScore());
+ ASSERT_EQUALS(true, dependencies.getNeedTextScore());
}
TEST_F(DocumentSourceMatchTest, ShouldOnlyAddOuterFieldAsDependencyOfImplicitEqualityPredicate) {
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
index d53c2183443..23d413d6b60 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
@@ -39,6 +39,8 @@ using std::make_pair;
using std::string;
using std::vector;
+constexpr StringData DocumentSourceMergeCursors::kStageName;
+
DocumentSourceMergeCursors::DocumentSourceMergeCursors(
std::vector<CursorDescriptor> cursorDescriptors,
const intrusive_ptr<ExpressionContext>& pExpCtx)
@@ -48,10 +50,6 @@ REGISTER_DOCUMENT_SOURCE(mergeCursors,
LiteParsedDocumentSourceDefault::parse,
DocumentSourceMergeCursors::createFromBson);
-const char* DocumentSourceMergeCursors::getSourceName() const {
- return "$mergeCursors";
-}
-
intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::create(
std::vector<CursorDescriptor> cursorDescriptors,
const intrusive_ptr<ExpressionContext>& pExpCtx) {
@@ -96,7 +94,7 @@ Value DocumentSourceMergeCursors::serialize(
<< "id"
<< _cursorDescriptors[i].cursorId)));
}
- return Value(DOC(getSourceName() << Value(cursors)));
+ return Value(DOC(kStageName << Value(cursors)));
}
DocumentSourceMergeCursors::CursorAndConnection::CursorAndConnection(
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h
index e83316fa6ea..63521e2c742 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.h
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.h
@@ -37,6 +37,8 @@ namespace mongo {
class DocumentSourceMergeCursors : public DocumentSource {
public:
+ static constexpr StringData kStageName = "$mergeCursors"_sd;
+
struct CursorDescriptor {
CursorDescriptor(ConnectionString connectionString, std::string ns, CursorId cursorId)
: connectionString(std::move(connectionString)),
@@ -48,14 +50,17 @@ public:
CursorId cursorId;
};
- // virtuals from DocumentSource
GetNextResult getNext() final;
- const char* getSourceName() const final;
+
+ const char* getSourceName() const final {
+ return kStageName.rawData();
+ }
+
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
StageConstraints constraints() const final {
StageConstraints constraints;
- constraints.hostRequirement = HostTypeRequirement::kAnyShardOrMongoS;
+ constraints.hostRequirement = HostTypeRequirement::kAnyShard;
constraints.requiredPosition = PositionRequirement::kFirst;
constraints.requiresInputDocSource = false;
constraints.isAllowedInsideFacetStage = false;
diff --git a/src/mongo/db/pipeline/document_source_sample.cpp b/src/mongo/db/pipeline/document_source_sample.cpp
index c174d95d935..09e3d431493 100644
--- a/src/mongo/db/pipeline/document_source_sample.cpp
+++ b/src/mongo/db/pipeline/document_source_sample.cpp
@@ -40,6 +40,8 @@
namespace mongo {
using boost::intrusive_ptr;
+constexpr StringData DocumentSourceSample::kStageName;
+
DocumentSourceSample::DocumentSourceSample(const intrusive_ptr<ExpressionContext>& pExpCtx)
: DocumentSource(pExpCtx), _size(0) {}
@@ -47,10 +49,6 @@ REGISTER_DOCUMENT_SOURCE(sample,
LiteParsedDocumentSourceDefault::parse,
DocumentSourceSample::createFromBson);
-const char* DocumentSourceSample::getSourceName() const {
- return "$sample";
-}
-
DocumentSource::GetNextResult DocumentSourceSample::getNext() {
if (_size == 0)
return GetNextResult::makeEOF();
@@ -84,7 +82,7 @@ DocumentSource::GetNextResult DocumentSourceSample::getNext() {
}
Value DocumentSourceSample::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
- return Value(DOC(getSourceName() << DOC("size" << _size)));
+ return Value(DOC(kStageName << DOC("size" << _size)));
}
namespace {
diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h
index 662c6a9a49d..07a85c77e33 100644
--- a/src/mongo/db/pipeline/document_source_sample.h
+++ b/src/mongo/db/pipeline/document_source_sample.h
@@ -35,8 +35,12 @@ namespace mongo {
class DocumentSourceSample final : public DocumentSource, public SplittableDocumentSource {
public:
+ static constexpr StringData kStageName = "$sample"_sd;
+
GetNextResult getNext() final;
- const char* getSourceName() const final;
+ const char* getSourceName() const final {
+ return kStageName.rawData();
+ }
Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
StageConstraints constraints() const final {
diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp
index ff6c4e6ec16..b429dbba6bf 100644
--- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.cpp
@@ -92,6 +92,11 @@ DocumentSource::GetNextResult DocumentSourceSampleFromRandomCursor::getNext() {
MutableDocument md(nextResult.releaseDocument());
md.setRandMetaField(_randMetaFieldVal);
+ if (pExpCtx->needsMerge) {
+ // This stage will be merged by sorting results according to this random metadata field, but
+ // the merging logic expects to sort by the sort key metadata.
+ md.setSortKeyMetaField(BSON("" << _randMetaFieldVal));
+ }
return md.freeze();
}
diff --git a/src/mongo/db/pipeline/document_source_skip.cpp b/src/mongo/db/pipeline/document_source_skip.cpp
index 61125a879f4..cf1b99e6e93 100644
--- a/src/mongo/db/pipeline/document_source_skip.cpp
+++ b/src/mongo/db/pipeline/document_source_skip.cpp
@@ -50,9 +50,7 @@ REGISTER_DOCUMENT_SOURCE(skip,
LiteParsedDocumentSourceDefault::parse,
DocumentSourceSkip::createFromBson);
-const char* DocumentSourceSkip::getSourceName() const {
- return "$skip";
-}
+constexpr StringData DocumentSourceSkip::kStageName;
DocumentSource::GetNextResult DocumentSourceSkip::getNext() {
pExpCtx->checkForInterrupt();
diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h
index 8fc27a59f87..fc87d7e1eaa 100644
--- a/src/mongo/db/pipeline/document_source_skip.h
+++ b/src/mongo/db/pipeline/document_source_skip.h
@@ -34,9 +34,27 @@ namespace mongo {
class DocumentSourceSkip final : public DocumentSource, public SplittableDocumentSource {
public:
- // virtuals from DocumentSource
+ static constexpr StringData kStageName = "$skip"_sd;
+
+ /**
+ * Convenience method for creating a $skip stage.
+ */
+ static boost::intrusive_ptr<DocumentSourceSkip> create(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long nToSkip);
+
+ /**
+ * Parses the user-supplied BSON into a $skip stage.
+ *
+ * Throws a AssertionException if 'elem' is an invalid $skip specification.
+ */
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
GetNextResult getNext() final;
- const char* getSourceName() const final;
+
+ const char* getSourceName() const final {
+ return kStageName.rawData();
+ }
StageConstraints constraints() const final {
StageConstraints constraints;
@@ -77,20 +95,6 @@ public:
_nToSkip = newSkip;
}
- /**
- * Convenience method for creating a $skip stage.
- */
- static boost::intrusive_ptr<DocumentSourceSkip> create(
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx, long long nToSkip);
-
- /**
- * Parses the user-supplied BSON into a $skip stage.
- *
- * Throws a AssertionException if 'elem' is an invalid $skip specification.
- */
- static boost::intrusive_ptr<DocumentSource> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
-
private:
explicit DocumentSourceSkip(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
long long nToSkip);
diff --git a/src/mongo/db/pipeline/document_source_sort.cpp b/src/mongo/db/pipeline/document_source_sort.cpp
index 4d78d08ccaa..fd113a9c386 100644
--- a/src/mongo/db/pipeline/document_source_sort.cpp
+++ b/src/mongo/db/pipeline/document_source_sort.cpp
@@ -47,6 +47,54 @@ using std::make_pair;
using std::string;
using std::vector;
+namespace {
+Value missingToNull(Value maybeMissing) {
+ return maybeMissing.missing() ? Value(BSONNULL) : maybeMissing;
+}
+
+/**
+ * Converts a Value representing an in-memory sort key to a BSONObj representing a serialized sort
+ * key. If 'sortPatternSize' is 1, returns a BSON object with 'value' as it's only value - and an
+ * empty field name. Otherwise asserts that 'value' is an array of length 'sortPatternSize', and
+ * returns a BSONObj with one field for each value in the array, each field using the empty field
+ * name.
+ */
+BSONObj serializeSortKey(size_t sortPatternSize, Value value) {
+ // Missing values don't serialize correctly in this format, so use nulls instead, since they are
+ // considered equivalent with woCompare().
+ if (sortPatternSize == 1) {
+ return BSON("" << missingToNull(value));
+ }
+ invariant(value.isArray());
+ invariant(value.getArrayLength() == sortPatternSize);
+ BSONObjBuilder bb;
+ for (auto&& val : value.getArray()) {
+ bb << "" << missingToNull(val);
+ }
+ return bb.obj();
+}
+
+/**
+ * Converts a BSONObj representing a serialized sort key into a Value, which we use for in-memory
+ * comparisons. BSONObj {'': 1, '': [2, 3]} becomes Value [1, [2, 3]].
+ */
+Value deserializeSortKey(size_t sortPatternSize, BSONObj bsonSortKey) {
+ vector<Value> keys;
+ keys.reserve(sortPatternSize);
+ for (auto&& elt : bsonSortKey) {
+ keys.push_back(Value{elt});
+ }
+ invariant(keys.size() == sortPatternSize);
+ if (sortPatternSize == 1) {
+ // As a special case for a sort on a single field, we do not put the keys into an array.
+ return keys[0];
+ }
+ return Value{std::move(keys)};
+}
+
+} // namespace
+constexpr StringData DocumentSourceSort::kStageName;
+
DocumentSourceSort::DocumentSourceSort(const intrusive_ptr<ExpressionContext>& pExpCtx)
: DocumentSource(pExpCtx), _mergingPresorted(false) {}
@@ -54,10 +102,6 @@ REGISTER_DOCUMENT_SOURCE(sort,
LiteParsedDocumentSourceDefault::parse,
DocumentSourceSort::createFromBson);
-const char* DocumentSourceSort::getSourceName() const {
- return "$sort";
-}
-
DocumentSource::GetNextResult DocumentSourceSort::getNext() {
pExpCtx->checkForInterrupt();
@@ -83,17 +127,18 @@ DocumentSource::GetNextResult DocumentSourceSort::getNext() {
void DocumentSourceSort::serializeToArray(
std::vector<Value>& array, boost::optional<ExplainOptions::Verbosity> explain) const {
if (explain) { // always one Value for combined $sort + $limit
- array.push_back(Value(
- DOC(getSourceName() << DOC(
- "sortKey" << serializeSortKey(static_cast<bool>(explain)) << "mergePresorted"
- << (_mergingPresorted ? Value(true) : Value())
- << "limit"
- << (limitSrc ? Value(limitSrc->getLimit()) : Value())))));
+ array.push_back(Value(DOC(
+ kStageName << DOC("sortKey" << sortKeyPattern(SortKeySerialization::kForExplain)
+ << "mergePresorted"
+ << (_mergingPresorted ? Value(true) : Value())
+ << "limit"
+ << (limitSrc ? Value(limitSrc->getLimit()) : Value())))));
} else { // one Value for $sort and maybe a Value for $limit
- MutableDocument inner(serializeSortKey(static_cast<bool>(explain)));
- if (_mergingPresorted)
+ MutableDocument inner(sortKeyPattern(SortKeySerialization::kForPipelineSerialization));
+ if (_mergingPresorted) {
inner["$mergePresorted"] = Value(true);
- array.push_back(Value(DOC(getSourceName() << inner.freeze())));
+ }
+ array.push_back(Value(DOC(kStageName << inner.freeze())));
if (limitSrc) {
limitSrc->serializeToArray(array);
@@ -109,7 +154,7 @@ long long DocumentSourceSort::getLimit() const {
return limitSrc ? limitSrc->getLimit() : -1;
}
-Document DocumentSourceSort::serializeSortKey(bool explain) const {
+Document DocumentSourceSort::sortKeyPattern(SortKeySerialization serializationMode) const {
MutableDocument keyObj;
const size_t n = _sortPattern.size();
for (size_t i = 0; i < n; ++i) {
@@ -118,9 +163,22 @@ Document DocumentSourceSort::serializeSortKey(bool explain) const {
keyObj.setField(_sortPattern[i].fieldPath->fullPath(),
Value(_sortPattern[i].isAscending ? 1 : -1));
} else {
- // For sorts that are not simply on a field path, use a made-up field name.
- keyObj[string(str::stream() << "$computed" << i)] =
- _sortPattern[i].expression->serialize(explain);
+ // Sorting by an expression, use a made up field name.
+ auto computedFieldName = string(str::stream() << "$computed" << i);
+ switch (serializationMode) {
+ case SortKeySerialization::kForExplain:
+ case SortKeySerialization::kForPipelineSerialization: {
+ const bool isExplain = (serializationMode == SortKeySerialization::kForExplain);
+ keyObj[computedFieldName] = _sortPattern[i].expression->serialize(isExplain);
+ break;
+ }
+ case SortKeySerialization::kForSortKeyMerging: {
+ // We need to be able to tell which direction the sort is. Expression sorts are
+ // always descending.
+ keyObj[computedFieldName] = Value(-1);
+ break;
+ }
+ }
}
}
return keyObj.freeze();
@@ -149,6 +207,10 @@ DocumentSource::GetDepsReturn DocumentSourceSort::getDependencies(DepsTracker* d
deps->fields.insert(keyPart.fieldPath->fullPath());
}
}
+ if (pExpCtx->needsMerge) {
+ // Include the sort key if we will merge several sorted streams later.
+ deps->setNeedSortKey(true);
+ }
return SEE_NEXT;
}
@@ -220,9 +282,11 @@ intrusive_ptr<DocumentSourceSort> DocumentSourceSort::create(
uassert(15976, "$sort stage must have at least one sort key", !pSort->_sortPattern.empty());
- const bool isExplain = false;
- pSort->_sortKeyGen =
- SortKeyGenerator{pSort->serializeSortKey(isExplain).toBson(), pExpCtx->getCollator()};
+ pSort->_sortKeyGen = SortKeyGenerator{
+ // The SortKeyGenerator expects the expressions to be serialized in order to detect a sort
+ // by a metadata field.
+ pSort->sortKeyPattern(SortKeySerialization::kForPipelineSerialization).toBson(),
+ pExpCtx->getCollator()};
if (limit > 0) {
pSort->setLimitSrc(DocumentSourceLimit::create(pExpCtx, limit));
@@ -269,12 +333,19 @@ DocumentSource::GetNextResult DocumentSourceSort::populate() {
}
}
-void DocumentSourceSort::loadDocument(const Document& doc) {
+void DocumentSourceSort::loadDocument(Document&& doc) {
invariant(!_populated);
if (!_sorter) {
_sorter.reset(MySorter::make(makeSortOptions(), Comparator(*this)));
}
- _sorter->add(extractKey(doc), doc);
+
+ Value sortKey;
+ Document docForSorter;
+ // We always need to extract the sort key if we've reached this point. If the query system had
+ // already computed the sort key we'd have split the pipeline there, would be merging presorted
+ // documents, and wouldn't use this method.
+ std::tie(sortKey, docForSorter) = extractSortKey(std::move(doc));
+ _sorter->add(sortKey, docForSorter);
}
void DocumentSourceSort::loadingDone() {
@@ -295,8 +366,18 @@ public:
return _cursor->more();
}
Data next() {
- const Document doc = DocumentSourceMergeCursors::nextSafeFrom(_cursor);
- return make_pair(_sorter->extractKey(doc), doc);
+ auto doc = DocumentSourceMergeCursors::nextSafeFrom(_cursor);
+ if (doc.hasSortKeyMetaField()) {
+ // We set the sort key metadata field during the first half of the sort, so just use
+ // that as the sort key here.
+ return make_pair(
+ deserializeSortKey(_sorter->_sortPattern.size(), doc.getSortKeyMetaField()), doc);
+ } else {
+ // It's possible this result is coming from a shard that is still on an old version. If
+ // that's the case, it won't tell us it's sort key - we'll have to re-compute it
+ // ourselves.
+ return _sorter->extractSortKey(std::move(doc));
+ }
}
private:
@@ -344,7 +425,7 @@ StatusWith<Value> DocumentSourceSort::extractKeyFast(const Document& doc) const
return Value{std::move(keys)};
}
-Value DocumentSourceSort::extractKeyWithArray(const Document& doc) const {
+BSONObj DocumentSourceSort::extractKeyWithArray(const Document& doc) const {
SortKeyGenerator::Metadata metadata;
if (doc.hasTextScore()) {
metadata.textScore = doc.getTextScore();
@@ -356,26 +437,40 @@ Value DocumentSourceSort::extractKeyWithArray(const Document& doc) const {
// Convert the Document to a BSONObj, but only do the conversion for the paths we actually need.
// Then run the result through the SortKeyGenerator to obtain the final sort key.
auto bsonDoc = document_path_support::documentToBsonWithPaths(doc, _paths);
- auto bsonKey = uassertStatusOK(_sortKeyGen->getSortKey(std::move(bsonDoc), &metadata));
-
- // Convert BSON sort key, which is an object with empty field names, into the corresponding
- // array of keys representation as a Value. BSONObj {'': 1, '': [2, 3]} becomes Value [1, [2,
- // 3]].
- vector<Value> keys;
- keys.reserve(_sortPattern.size());
- for (auto&& elt : bsonKey) {
- keys.push_back(Value{elt});
- }
- return Value{std::move(keys)};
+ return uassertStatusOK(_sortKeyGen->getSortKey(std::move(bsonDoc), &metadata));
}
-Value DocumentSourceSort::extractKey(const Document& doc) const {
- auto key = extractKeyFast(doc);
- if (key.isOK()) {
- return key.getValue();
+std::pair<Value, Document> DocumentSourceSort::extractSortKey(Document&& doc) const {
+ boost::optional<BSONObj> serializedSortKey; // Only populated if we need to merge with other
+ // sorted results later. Serialized in the standard
+ // BSON sort key format with empty field names,
+ // e.g. {'': 1, '': [2, 3]}.
+
+ Value inMemorySortKey; // The Value we will use for comparisons within the sorter.
+
+ auto fastKey = extractKeyFast(doc);
+ if (fastKey.isOK()) {
+ inMemorySortKey = std::move(fastKey.getValue());
+ if (pExpCtx->needsMerge) {
+ serializedSortKey = serializeSortKey(_sortPattern.size(), inMemorySortKey);
+ }
+ } else {
+ // We have to do it the slow way - through the sort key generator. This will generate a BSON
+ // sort key, which is an object with empty field names. We then need to convert this BSON
+ // representation into the corresponding array of keys as a Value. BSONObj {'': 1, '': [2,
+ // 3]} becomes Value [1, [2, 3]].
+ serializedSortKey = extractKeyWithArray(doc);
+ inMemorySortKey = deserializeSortKey(_sortPattern.size(), *serializedSortKey);
}
- return extractKeyWithArray(doc);
+ MutableDocument toBeSorted(std::move(doc));
+ if (pExpCtx->needsMerge) {
+ // We need to be merged, so will have to be serialized. Save the sort key here to avoid
+ // re-computing it during the merge.
+ invariant(serializedSortKey);
+ toBeSorted.setSortKeyMetaField(*serializedSortKey);
+ }
+ return {inMemorySortKey, toBeSorted.freeze()};
}
int DocumentSourceSort::compare(const Value& lhs, const Value& rhs) const {
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index 5731051cbd3..2494400eaae 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -39,10 +39,20 @@ namespace mongo {
class DocumentSourceSort final : public DocumentSource, public SplittableDocumentSource {
public:
static const uint64_t kMaxMemoryUsageBytes = 100 * 1024 * 1024;
+ static constexpr StringData kStageName = "$sort"_sd;
+
+ enum class SortKeySerialization {
+ kForExplain,
+ kForPipelineSerialization,
+ kForSortKeyMerging,
+ };
- // virtuals from DocumentSource
GetNextResult getNext() final;
- const char* getSourceName() const final;
+
+ const char* getSourceName() const final {
+ return kStageName.rawData();
+ }
+
void serializeToArray(
std::vector<Value>& array,
boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
@@ -73,8 +83,10 @@ public:
boost::intrusive_ptr<DocumentSource> getShardSource() final;
boost::intrusive_ptr<DocumentSource> getMergeSource() final;
- /// Write out a Document whose contents are the sort key.
- Document serializeSortKey(bool explain) const;
+ /**
+ * Write out a Document whose contents are the sort key pattern.
+ */
+ Document sortKeyPattern(SortKeySerialization) const;
/**
* Parses a $sort stage from the user-supplied BSON.
@@ -101,7 +113,7 @@ public:
* coming from another DocumentSource. Once all documents have been added, the caller must call
* loadingDone() before using getNext() to receive the documents in sorted order.
*/
- void loadDocument(const Document& doc);
+ void loadDocument(Document&& doc);
/**
* Signals to the sort stage that there will be no more input documents. It is an error to call
@@ -179,11 +191,15 @@ private:
SortOptions makeSortOptions() const;
/**
- * Returns the sort key for 'doc' based on the SortPattern. Attempts to generate the key using a
- * fast path that does not handle arrays. If an array is encountered, falls back on
- * extractKeyWithArray().
+ * Returns the sort key for 'doc', as well as the document that should be entered into the
+ * sorter to eventually be returned. If we will need to later merge the sorted results with
+ * other results, this method adds the sort key as metadata onto 'doc' to speed up the merge
+ * later.
+ *
+ * Attempts to generate the key using a fast path that does not handle arrays. If an array is
+ * encountered, falls back on extractKeyWithArray().
*/
- Value extractKey(const Document& doc) const;
+ std::pair<Value, Document> extractSortKey(Document&& doc) const;
/**
* Returns the sort key for 'doc' based on the SortPattern, or ErrorCodes::InternalError if an
@@ -198,9 +214,10 @@ private:
StatusWith<Value> extractKeyPart(const Document& doc, const SortPatternPart& keyPart) const;
/**
- * Returns the sort key for 'doc' based on the SortPattern.
+ * Returns the sort key for 'doc' based on the SortPattern. Note this is in the BSONObj format -
+ * with empty field names.
*/
- Value extractKeyWithArray(const Document& doc) const;
+ BSONObj extractKeyWithArray(const Document& doc) const;
int compare(const Value& lhs, const Value& rhs) const;
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 89975fc4cf2..c5edf02f50d 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -374,10 +374,10 @@ void Pipeline::Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(Pipelin
void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipeline* shardPipe,
Pipeline* mergePipe) {
- DepsTracker mergeDeps(
- mergePipe->getDependencies(DocumentSourceMatch::isTextQuery(shardPipe->getInitialQuery())
- ? DepsTracker::MetadataAvailable::kTextScore
- : DepsTracker::MetadataAvailable::kNoMetadata));
+ auto depsMetadata = DocumentSourceMatch::isTextQuery(shardPipe->getInitialQuery())
+ ? DepsTracker::MetadataAvailable::kTextScore
+ : DepsTracker::MetadataAvailable::kNoMetadata;
+ DepsTracker mergeDeps(mergePipe->getDependencies(depsMetadata));
if (mergeDeps.needWholeDocument)
return; // the merge needs all fields, so nothing we can do.
@@ -399,7 +399,7 @@ void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipelin
// 2) Optimization IS NOT applied immediately following a $project or $group since it would
// add an unnecessary project (and therefore a deep-copy).
for (auto&& source : shardPipe->_sources) {
- DepsTracker dt;
+ DepsTracker dt(depsMetadata);
if (source->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS)
return;
}
@@ -528,6 +528,9 @@ DepsTracker Pipeline::getDependencies(DepsTracker::MetadataAvailable metadataAva
if (localDeps.getNeedTextScore())
deps.setNeedTextScore(true);
+ if (localDeps.getNeedSortKey())
+ deps.setNeedSortKey(true);
+
knowAllMeta = status & DocumentSource::EXHAUSTIVE_META;
}
@@ -552,4 +555,13 @@ DepsTracker Pipeline::getDependencies(DepsTracker::MetadataAvailable metadataAva
return deps;
}
+boost::intrusive_ptr<DocumentSource> Pipeline::popFrontStageWithName(StringData targetStageName) {
+ if (_sources.empty() || _sources.front()->getSourceName() != targetStageName) {
+ return nullptr;
+ }
+ auto targetStage = _sources.front();
+ _sources.pop_front();
+ stitch();
+ return targetStage;
+}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 35b18954fdd..7300b8e542e 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -264,6 +264,12 @@ public:
}
/**
+ * Removes and returns the first stage of the pipeline if its name is 'targetStageName'. Returns
+ * nullptr if there is no first stage, or if the stage's name is not 'targetStageName'.
+ */
+ boost::intrusive_ptr<DocumentSource> popFrontStageWithName(StringData targetStageName);
+
+ /**
* PipelineD is a "sister" class that has additional functionality for the Pipeline. It exists
* because of linkage requirements. Pipeline needs to function in mongod and mongos. PipelineD
* contains extra functionality required in mongod, and which can't appear in mongos because the
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 42f9ea6615c..7f6a7c5fc49 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -448,6 +448,13 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
return getExecutorFind(
opCtx, collection, nss, std::move(cq.getValue()), PlanExecutor::YIELD_AUTO, plannerOpts);
}
+
+BSONObj removeSortKeyMetaProjection(BSONObj projectionObj) {
+ if (!projectionObj[Document::metaFieldSortKey]) {
+ return projectionObj;
+ }
+ return projectionObj.removeField(Document::metaFieldSortKey);
+}
} // namespace
void PipelineD::prepareCursorSource(Collection* collection,
@@ -526,20 +533,18 @@ void PipelineD::prepareCursorSource(Collection* collection,
BSONObj projForQuery = deps.toProjection();
- /*
- Look for an initial sort; we'll try to add this to the
- Cursor we create. If we're successful in doing that (further down),
- we'll remove the $sort from the pipeline, because the documents
- will already come sorted in the specified order as a result of the
- index scan.
- */
+ // Look for an initial sort; we'll try to add this to the Cursor we create. If we're successful
+ // in doing that, we'll remove the $sort from the pipeline, because the documents will already
+ // come sorted in the specified order as a result of the index scan.
intrusive_ptr<DocumentSourceSort> sortStage;
BSONObj sortObj;
if (!sources.empty()) {
sortStage = dynamic_cast<DocumentSourceSort*>(sources.front().get());
if (sortStage) {
- // build the sort key
- sortObj = sortStage->serializeSortKey(/*explain*/ false).toBson();
+ sortObj = sortStage
+ ->sortKeyPattern(
+ DocumentSourceSort::SortKeySerialization::kForPipelineSerialization)
+ .toBson();
}
}
@@ -611,26 +616,30 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
plannerOpts |= QueryPlannerParams::IS_COUNT;
}
- // The only way to get a text score is to let the query system handle the projection. In all
- // other cases, unless the query system can do an index-covered projection and avoid going to
- // the raw record at all, it is faster to have ParsedDeps filter the fields we need.
- if (!deps.getNeedTextScore()) {
+ // The only way to get a text score or the sort key is to let the query system handle the
+ // projection. In all other cases, unless the query system can do an index-covered projection
+ // and avoid going to the raw record at all, it is faster to have ParsedDeps filter the fields
+ // we need.
+ if (!deps.getNeedTextScore() && !deps.getNeedSortKey()) {
plannerOpts |= QueryPlannerParams::NO_UNCOVERED_PROJECTIONS;
}
- BSONObj emptyProjection;
+ const BSONObj emptyProjection;
+ const BSONObj metaSortProjection = BSON("$meta"
+ << "sortKey");
if (sortStage) {
// See if the query system can provide a non-blocking sort.
- auto swExecutorSort = attemptToGetExecutor(opCtx,
- collection,
- nss,
- expCtx,
- oplogReplay,
- queryObj,
- emptyProjection,
- *sortObj,
- aggRequest,
- plannerOpts);
+ auto swExecutorSort =
+ attemptToGetExecutor(opCtx,
+ collection,
+ nss,
+ expCtx,
+ oplogReplay,
+ queryObj,
+ expCtx->needsMerge ? metaSortProjection : emptyProjection,
+ *sortObj,
+ aggRequest,
+ plannerOpts);
if (swExecutorSort.isOK()) {
// Success! Now see if the query system can also cover the projection.
@@ -682,6 +691,14 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
// Either there was no $sort stage, or the query system could not provide a non-blocking
// sort.
dassert(sortObj->isEmpty());
+ *projectionObj = removeSortKeyMetaProjection(*projectionObj);
+ if (deps.getNeedSortKey() && !deps.getNeedTextScore()) {
+ // A sort key requirement would have prevented us from being able to add this parameter
+ // before, but now we know the query system won't cover the sort, so we will be able to
+ // compute the sort key ourselves during the $sort stage, and thus don't need a query
+ // projection to do so.
+ plannerOpts |= QueryPlannerParams::NO_UNCOVERED_PROJECTIONS;
+ }
// See if the query system can cover the projection.
auto swExecutorProj = attemptToGetExecutor(opCtx,
diff --git a/src/mongo/db/query/query_planner.cpp b/src/mongo/db/query/query_planner.cpp
index 5d5909d4aa5..56e9041dd23 100644
--- a/src/mongo/db/query/query_planner.cpp
+++ b/src/mongo/db/query/query_planner.cpp
@@ -107,7 +107,25 @@ string optionString(size_t options) {
ss << "INDEX_INTERSECTION ";
}
if (options & QueryPlannerParams::KEEP_MUTATIONS) {
- ss << "KEEP_MUTATIONS";
+ ss << "KEEP_MUTATIONS ";
+ }
+ if (options & QueryPlannerParams::IS_COUNT) {
+ ss << "IS_COUNT ";
+ }
+ if (options & QueryPlannerParams::SPLIT_LIMITED_SORT) {
+ ss << "SPLIT_LIMITED_SORT ";
+ }
+ if (options & QueryPlannerParams::CANNOT_TRIM_IXISECT) {
+ ss << "CANNOT_TRIM_IXISECT ";
+ }
+ if (options & QueryPlannerParams::SNAPSHOT_USE_ID) {
+ ss << "SNAPSHOT_USE_ID ";
+ }
+ if (options & QueryPlannerParams::NO_UNCOVERED_PROJECTIONS) {
+ ss << "NO_UNCOVERED_PROJECTIONS ";
+ }
+ if (options & QueryPlannerParams::GENERATE_COVERED_IXSCANS) {
+ ss << "GENERATE_COVERED_IXSCANS ";
}
return ss;
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index f2ba6b2b839..1bdbd85e850 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -332,6 +332,13 @@ BSONObj establishMergingMongosCursor(
params.mergePipeline = std::move(pipelineForMerging);
params.remotes = std::move(cursors);
+ // A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch
+ // size we pass here is used for getMores, so do not specify a batch size if the initial request
+ // had a batch size of 0.
+ params.batchSize = request.getBatchSize() == 0
+ ? boost::none
+ : boost::optional<long long>(request.getBatchSize());
+
auto ccc = ClusterClientCursorImpl::make(
opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params));
@@ -582,10 +589,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// If we reach here, we have a merge pipeline to dispatch.
invariant(pipelineForMerging);
- // We need a DocumentSourceMergeCursors regardless of whether we merge on mongoS or on a shard.
- pipelineForMerging->addInitialSource(
- DocumentSourceMergeCursors::create(parseCursors(cursors), mergeCtx));
-
// First, check whether we can merge on the mongoS.
if (pipelineForMerging->canRunOnMongos() && !internalQueryProhibitMergingOnMongoS.load()) {
// Register the new mongoS cursor, and retrieve the initial batch of results.
@@ -602,6 +605,8 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
}
// If we cannot merge on mongoS, establish the merge cursor on a shard.
+ pipelineForMerging->addInitialSource(
+ DocumentSourceMergeCursors::create(parseCursors(cursors), mergeCtx));
auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, pipelineForMerging);
auto mergeResponse = uassertStatusOK(establishMergingShardCursor(
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index 460dcf7912e..2ae87217bc8 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -32,11 +32,11 @@ env.Library(
env.Library(
target="router_exec_stage",
source=[
- "router_stage_aggregation_merge.cpp",
"router_stage_limit.cpp",
"router_stage_merge.cpp",
"router_stage_mock.cpp",
- "router_stage_remove_sortkey.cpp",
+ "router_stage_pipeline.cpp",
+ "router_stage_remove_metadata_fields.cpp",
"router_stage_skip.cpp",
],
LIBDEPS=[
@@ -48,7 +48,7 @@ env.CppUnitTest(
target="router_exec_stage_test",
source=[
"router_stage_limit_test.cpp",
- "router_stage_remove_sortkey_test.cpp",
+ "router_stage_remove_metadata_fields_test.cpp",
"router_stage_skip_test.cpp",
],
LIBDEPS=[
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 19281785be0..50886944bb2 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -61,7 +61,9 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
_mergeQueue(MergingComparator(_remotes, _params->sort)) {
size_t remoteIndex = 0;
for (const auto& remote : _params->remotes) {
- _remotes.emplace_back(remote.hostAndPort, remote.cursorResponse.getCursorId());
+ _remotes.emplace_back(remote.hostAndPort,
+ remote.cursorResponse.getNSS(),
+ remote.cursorResponse.getCursorId());
// We don't check the return value of addBatchToBuffer here; if there was an error,
// it will be stored in the remote and the first call to ready() will return true.
@@ -269,7 +271,7 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
adjustedBatchSize = *_params->batchSize - remote.fetchedCount;
}
- BSONObj cmdObj = GetMoreRequest(_params->nsString,
+ BSONObj cmdObj = GetMoreRequest(remote.cursorNss,
remote.cursorId,
adjustedBatchSize,
_awaitDataTimeout,
@@ -582,8 +584,11 @@ executor::TaskExecutor::EventHandle AsyncResultsMerger::kill(OperationContext* o
//
AsyncResultsMerger::RemoteCursorData::RemoteCursorData(HostAndPort hostAndPort,
+ NamespaceString cursorNss,
CursorId establishedCursorId)
- : cursorId(establishedCursorId), shardHostAndPort(std::move(hostAndPort)) {}
+ : cursorId(establishedCursorId),
+ cursorNss(std::move(cursorNss)),
+ shardHostAndPort(std::move(hostAndPort)) {}
const HostAndPort& AsyncResultsMerger::RemoteCursorData::getTargetHost() const {
return shardHostAndPort;
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 04262309a99..c6ec2a26052 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -202,7 +202,9 @@ private:
* reported from the remote.
*/
struct RemoteCursorData {
- RemoteCursorData(HostAndPort hostAndPort, CursorId establishedCursorId);
+ RemoteCursorData(HostAndPort hostAndPort,
+ NamespaceString cursorNss,
+ CursorId establishedCursorId);
/**
* Returns the resolved host and port on which the remote cursor resides.
@@ -230,6 +232,10 @@ private:
// member will be set to zero.
CursorId cursorId;
+ // The namespace this cursor belongs to - note this may be different than the namespace of
+ // the operation if there is a view.
+ NamespaceString cursorNss;
+
// The exact host in the shard on which the cursor resides.
HostAndPort shardHostAndPort;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index e7716355c0f..f286cee408e 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -26,17 +26,18 @@
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
-
#include "mongo/platform/basic.h"
#include "mongo/s/query/cluster_client_cursor_impl.h"
-#include "mongo/s/query/router_stage_aggregation_merge.h"
+#include "mongo/db/pipeline/document_source_limit.h"
+#include "mongo/db/pipeline/document_source_skip.h"
+#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/s/query/router_stage_limit.h"
#include "mongo/s/query/router_stage_merge.h"
#include "mongo/s/query/router_stage_mock.h"
-#include "mongo/s/query/router_stage_remove_sortkey.h"
+#include "mongo/s/query/router_stage_pipeline.h"
+#include "mongo/s/query/router_stage_remove_metadata_fields.h"
#include "mongo/s/query/router_stage_skip.h"
#include "mongo/stdx/memory.h"
@@ -140,18 +141,87 @@ boost::optional<LogicalSessionId> ClusterClientCursorImpl::getLsid() const {
return _lsid;
}
+namespace {
+
+/**
+ * Rips off an initial $sort stage that will be handled by mongos execution machinery. Returns the
+ * sort key pattern of such a $sort stage if there was one, and boost::none otherwise.
+ */
+boost::optional<BSONObj> extractLeadingSort(Pipeline* mergePipeline) {
+ if (auto frontSort = mergePipeline->popFrontStageWithName(DocumentSourceSort::kStageName)) {
+ auto sortStage = static_cast<DocumentSourceSort*>(frontSort.get());
+ if (auto sortLimit = sortStage->getLimitSrc()) {
+ // There was a limit stage absorbed into the sort stage, so we need to preserve that.
+ mergePipeline->addInitialSource(sortLimit);
+ }
+ return sortStage
+ ->sortKeyPattern(DocumentSourceSort::SortKeySerialization::kForSortKeyMerging)
+ .toBson();
+ }
+ return boost::none;
+}
+
+bool isSkipOrLimit(const boost::intrusive_ptr<DocumentSource>& stage) {
+ return (dynamic_cast<DocumentSourceLimit*>(stage.get()) ||
+ dynamic_cast<DocumentSourceSkip*>(stage.get()));
+}
+
+bool isAllLimitsAndSkips(Pipeline* pipeline) {
+ const auto stages = pipeline->getSources();
+ return std::all_of(
+ stages.begin(), stages.end(), [&](const auto& stage) { return isSkipOrLimit(stage); });
+}
+
+std::unique_ptr<RouterExecStage> buildPipelinePlan(executor::TaskExecutor* executor,
+ ClusterClientCursorParams* params) {
+ invariant(params->mergePipeline);
+ invariant(!params->skip);
+ invariant(!params->limit);
+ auto* pipeline = params->mergePipeline.get();
+ auto* opCtx = pipeline->getContext()->opCtx;
+
+ std::unique_ptr<RouterExecStage> root =
+ stdx::make_unique<RouterStageMerge>(opCtx, executor, params);
+ if (!isAllLimitsAndSkips(pipeline)) {
+ return stdx::make_unique<RouterStagePipeline>(std::move(root),
+ std::move(params->mergePipeline));
+ }
+
+ // After extracting an optional leading $sort, the pipeline consisted entirely of $skip and
+ // $limit stages. Avoid creating a RouterStagePipeline (which will go through an expensive
+ // conversion from BSONObj -> Document for each result), and create a RouterExecStage tree
+ // instead.
+ while (!pipeline->getSources().empty()) {
+ invariant(isSkipOrLimit(pipeline->getSources().front()));
+ if (auto skip = pipeline->popFrontStageWithName(DocumentSourceSkip::kStageName)) {
+ root = stdx::make_unique<RouterStageSkip>(
+ opCtx, std::move(root), static_cast<DocumentSourceSkip*>(skip.get())->getSkip());
+ } else if (auto limit = pipeline->popFrontStageWithName(DocumentSourceLimit::kStageName)) {
+ root = stdx::make_unique<RouterStageLimit>(
+ opCtx, std::move(root), static_cast<DocumentSourceLimit*>(limit.get())->getLimit());
+ }
+ }
+ if (!params->sort.isEmpty()) {
+ // We are executing the pipeline without using a Pipeline, so we need to strip out any
+ // Document metadata ourselves. Note we only need this stage if there was a sort, since
+ // otherwise there would be no way for this half of the pipeline to require any metadata
+ // fields.
+ root = stdx::make_unique<RouterStageRemoveMetadataFields>(
+ opCtx, std::move(root), Document::allMetadataFieldNames);
+ }
+ return root;
+}
+} // namespace
+
std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
OperationContext* opCtx, executor::TaskExecutor* executor, ClusterClientCursorParams* params) {
const auto skip = params->skip;
const auto limit = params->limit;
- const bool hasSort = !params->sort.isEmpty();
-
- // The first stage always merges from the remotes. If 'mergePipeline' has been specified in
- // ClusterClientCursorParams, then RouterStageAggregationMerge should be the root and only node.
- // Otherwise, construct a RouterStage pipeline from the remotes, skip, limit, and sort fields in
- // 'params'.
if (params->mergePipeline) {
- return stdx::make_unique<RouterStageAggregationMerge>(std::move(params->mergePipeline));
+ if (auto sort = extractLeadingSort(params->mergePipeline.get())) {
+ params->sort = *sort;
+ }
+ return buildPipelinePlan(executor, params);
}
std::unique_ptr<RouterExecStage> root =
@@ -165,8 +235,13 @@ std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
root = stdx::make_unique<RouterStageLimit>(opCtx, std::move(root), *limit);
}
+ const bool hasSort = !params->sort.isEmpty();
if (hasSort) {
- root = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(root));
+ // Strip out the sort key after sorting.
+ root = stdx::make_unique<RouterStageRemoveMetadataFields>(
+ opCtx,
+ std::move(root),
+ std::vector<StringData>{ClusterClientCursorParams::kSortKeyField});
}
return root;
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index ac074d92b62..e7d8ebb65b9 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -77,12 +77,18 @@ public:
* currently attached. This is so that a killing thread may call this method with its own
* operation context.
*/
- virtual void kill(OperationContext* opCtx) = 0;
+ virtual void kill(OperationContext* opCtx) {
+ invariant(_child); // The default implementation forwards to the child stage.
+ _child->kill(opCtx);
+ }
/**
* Returns whether or not all the remote cursors are exhausted.
*/
- virtual bool remotesExhausted() = 0;
+ virtual bool remotesExhausted() {
+ invariant(_child); // The default implementation forwards to the child stage.
+ return _child->remotesExhausted();
+ }
/**
* Sets the maxTimeMS value that the cursor should forward with any internally issued getMore
@@ -91,7 +97,15 @@ public:
* Returns a non-OK status if this cursor type does not support maxTimeMS on getMore (i.e. if
* the cursor is not tailable + awaitData).
*/
- virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0;
+ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ if (_child) {
+ auto childStatus = _child->setAwaitDataTimeout(awaitDataTimeout);
+ if (!childStatus.isOK()) {
+ return childStatus;
+ }
+ }
+ return doSetAwaitDataTimeout(awaitDataTimeout);
+ }
/**
* Sets the current operation context to be used by the router stage.
@@ -135,6 +149,13 @@ protected:
virtual void doDetachFromOperationContext() {}
/**
+ * Performs any stage-specific await data timeout actions.
+ */
+ virtual Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ return Status::OK();
+ }
+
+ /**
* Returns an unowned pointer to the child stage, or nullptr if there is no child.
*/
RouterExecStage* getChildStage() {
diff --git a/src/mongo/s/query/router_stage_aggregation_merge.cpp b/src/mongo/s/query/router_stage_aggregation_merge.cpp
deleted file mode 100644
index a4273dbd4a7..00000000000
--- a/src/mongo/s/query/router_stage_aggregation_merge.cpp
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Copyright (C) 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/query/router_stage_aggregation_merge.h"
-
-#include "mongo/db/pipeline/document_source_merge_cursors.h"
-#include "mongo/db/pipeline/expression_context.h"
-
-namespace mongo {
-
-RouterStageAggregationMerge::RouterStageAggregationMerge(
- std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline)
- : RouterExecStage(mergePipeline->getContext()->opCtx),
- _mergePipeline(std::move(mergePipeline)) {}
-
-StatusWith<ClusterQueryResult> RouterStageAggregationMerge::next() {
- // Pipeline::getNext will return a boost::optional<Document> or boost::none if EOF.
- if (auto result = _mergePipeline->getNext()) {
- return {result->toBson()};
- }
-
- // If we reach this point, we have hit EOF.
- _mergePipeline.get_deleter().dismissDisposal();
- _mergePipeline->dispose(getOpCtx());
-
- return {ClusterQueryResult()};
-}
-
-void RouterStageAggregationMerge::doReattachToOperationContext() {
- _mergePipeline->reattachToOperationContext(getOpCtx());
-}
-
-void RouterStageAggregationMerge::doDetachFromOperationContext() {
- _mergePipeline->detachFromOperationContext();
-}
-
-void RouterStageAggregationMerge::kill(OperationContext* opCtx) {
- _mergePipeline.get_deleter().dismissDisposal();
- _mergePipeline->dispose(opCtx);
-}
-
-bool RouterStageAggregationMerge::remotesExhausted() {
- const auto mergeSource =
- static_cast<DocumentSourceMergeCursors*>(_mergePipeline->getSources().front().get());
- return mergeSource->remotesExhausted();
-}
-
-Status RouterStageAggregationMerge::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
- return {ErrorCodes::InvalidOptions, "maxTimeMS is not valid for aggregation getMore"};
-}
-
-} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp
index feb8f11626f..b3fd2b14651 100644
--- a/src/mongo/s/query/router_stage_limit.cpp
+++ b/src/mongo/s/query/router_stage_limit.cpp
@@ -57,16 +57,4 @@ StatusWith<ClusterQueryResult> RouterStageLimit::next() {
return childResult;
}
-void RouterStageLimit::kill(OperationContext* opCtx) {
- getChildStage()->kill(opCtx);
-}
-
-bool RouterStageLimit::remotesExhausted() {
- return getChildStage()->remotesExhausted();
-}
-
-Status RouterStageLimit::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
- return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
-}
-
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h
index 42ef46c21ab..1a158e2c3a7 100644
--- a/src/mongo/s/query/router_stage_limit.h
+++ b/src/mongo/s/query/router_stage_limit.h
@@ -43,12 +43,6 @@ public:
StatusWith<ClusterQueryResult> next() final;
- void kill(OperationContext* opCtx) final;
-
- bool remotesExhausted() final;
-
- Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
-
private:
long long _limit;
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index 78ee1a3475a..f2a159003c2 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -69,7 +69,7 @@ bool RouterStageMerge::remotesExhausted() {
return _arm.remotesExhausted();
}
-Status RouterStageMerge::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+Status RouterStageMerge::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return _arm.setAwaitDataTimeout(awaitDataTimeout);
}
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index 23503c664f6..78c5383e0ee 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -53,7 +53,8 @@ public:
bool remotesExhausted() final;
- Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+protected:
+ Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
protected:
void doReattachToOperationContext() override {
diff --git a/src/mongo/s/query/router_stage_mock.cpp b/src/mongo/s/query/router_stage_mock.cpp
index edeb1f9945c..7ebc3a6a554 100644
--- a/src/mongo/s/query/router_stage_mock.cpp
+++ b/src/mongo/s/query/router_stage_mock.cpp
@@ -68,7 +68,7 @@ bool RouterStageMock::remotesExhausted() {
return _remotesExhausted;
}
-Status RouterStageMock::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+Status RouterStageMock::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) {
_awaitDataTimeout = awaitDataTimeout;
return Status::OK();
}
diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h
index e2f8e7adab5..8e3075103d5 100644
--- a/src/mongo/s/query/router_stage_mock.h
+++ b/src/mongo/s/query/router_stage_mock.h
@@ -51,8 +51,6 @@ public:
bool remotesExhausted() final;
- Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
-
/**
* Queues a BSONObj to be returned.
*/
@@ -79,6 +77,9 @@ public:
*/
StatusWith<Milliseconds> getAwaitDataTimeout();
+protected:
+ Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+
private:
std::queue<StatusWith<ClusterQueryResult>> _resultsQueue;
bool _remotesExhausted = false;
diff --git a/src/mongo/s/query/router_stage_pipeline.cpp b/src/mongo/s/query/router_stage_pipeline.cpp
new file mode 100644
index 00000000000..d9cf02f85c3
--- /dev/null
+++ b/src/mongo/s/query/router_stage_pipeline.cpp
@@ -0,0 +1,135 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/query/router_stage_pipeline.h"
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_merge_cursors.h"
+#include "mongo/db/pipeline/expression_context.h"
+
+namespace mongo {
+
+namespace {
+
+/**
+ * A class that acts as an adapter between the RouterExecStage and DocumentSource interfaces,
+ * translating results from an input RouterExecStage into DocumentSource::GetNextResults.
+ */
+class DocumentSourceRouterAdapter : public DocumentSource {
+public:
+ static boost::intrusive_ptr<DocumentSourceRouterAdapter> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ std::unique_ptr<RouterExecStage> childStage) {
+ return new DocumentSourceRouterAdapter(expCtx, std::move(childStage));
+ }
+
+ GetNextResult getNext() final {
+ auto next = uassertStatusOK(_child->next());
+ if (auto nextObj = next.getResult()) {
+ return Document::fromBsonWithMetaData(*nextObj);
+ }
+ return GetNextResult::makeEOF();
+ }
+
+ void doDispose() final {
+ _child->kill(pExpCtx->opCtx);
+ }
+
+ void reattachToOperationContext(OperationContext* opCtx) final {
+ _child->reattachToOperationContext(opCtx);
+ }
+
+ void detachFromOperationContext() final {
+ _child->detachFromOperationContext();
+ }
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {
+ invariant(explain); // We shouldn't need to serialize this stage to send it anywhere.
+ return Value(); // Return the empty value to hide this stage from explain output.
+ }
+
+ bool remotesExhausted() {
+ return _child->remotesExhausted();
+ }
+
+private:
+ DocumentSourceRouterAdapter(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ std::unique_ptr<RouterExecStage> childStage)
+ : DocumentSource(expCtx), _child(std::move(childStage)) {}
+
+ std::unique_ptr<RouterExecStage> _child;
+};
+} // namespace
+
+RouterStagePipeline::RouterStagePipeline(std::unique_ptr<RouterExecStage> child,
+ std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline)
+ : RouterExecStage(mergePipeline->getContext()->opCtx),
+ _mergePipeline(std::move(mergePipeline)) {
+ // Add an adapter to the front of the pipeline to draw results from 'child'.
+ _mergePipeline->addInitialSource(
+ DocumentSourceRouterAdapter::create(_mergePipeline->getContext(), std::move(child)));
+}
+
+StatusWith<ClusterQueryResult> RouterStagePipeline::next() {
+ // Pipeline::getNext will return a boost::optional<Document> or boost::none if EOF.
+ if (auto result = _mergePipeline->getNext()) {
+ return {result->toBson()};
+ }
+
+ // If we reach this point, we have hit EOF.
+ _mergePipeline.get_deleter().dismissDisposal();
+ _mergePipeline->dispose(getOpCtx());
+
+ return {ClusterQueryResult()};
+}
+
+void RouterStagePipeline::doReattachToOperationContext() {
+ _mergePipeline->reattachToOperationContext(getOpCtx());
+}
+
+void RouterStagePipeline::doDetachFromOperationContext() {
+ _mergePipeline->detachFromOperationContext();
+}
+
+void RouterStagePipeline::kill(OperationContext* opCtx) {
+ _mergePipeline.get_deleter().dismissDisposal();
+ _mergePipeline->dispose(opCtx);
+}
+
+bool RouterStagePipeline::remotesExhausted() {
+ return static_cast<DocumentSourceRouterAdapter*>(_mergePipeline->getSources().front().get())
+ ->remotesExhausted();
+}
+
+Status RouterStagePipeline::doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) {
+ return {ErrorCodes::InvalidOptions, "maxTimeMS is not valid for aggregation getMore"};
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_aggregation_merge.h b/src/mongo/s/query/router_stage_pipeline.h
index 363b46e73d9..780f1fe0e47 100644
--- a/src/mongo/s/query/router_stage_aggregation_merge.h
+++ b/src/mongo/s/query/router_stage_pipeline.h
@@ -35,12 +35,13 @@
namespace mongo {
/**
- * Draws results from a Pipeline with a DocumentSourceMergeCursors at its head, which is the
- * underlying source of the stream of merged documents manipulated by the RouterStage pipeline.
+ * Inserts a pipeline into the router execution tree, drawing results from the input stage, feeding
+ * them through the pipeline, and outputting the results of the pipeline.
*/
-class RouterStageAggregationMerge final : public RouterExecStage {
+class RouterStagePipeline final : public RouterExecStage {
public:
- RouterStageAggregationMerge(std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline);
+ RouterStagePipeline(std::unique_ptr<RouterExecStage> child,
+ std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline);
StatusWith<ClusterQueryResult> next() final;
@@ -48,9 +49,9 @@ public:
bool remotesExhausted() final;
- Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
-
protected:
+ Status doSetAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+
void doReattachToOperationContext() final;
void doDetachFromOperationContext() final;
diff --git a/src/mongo/s/query/router_stage_remove_metadata_fields.cpp b/src/mongo/s/query/router_stage_remove_metadata_fields.cpp
new file mode 100644
index 00000000000..3be98380e4e
--- /dev/null
+++ b/src/mongo/s/query/router_stage_remove_metadata_fields.cpp
@@ -0,0 +1,89 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <algorithm>
+
+#include "mongo/s/query/router_stage_remove_metadata_fields.h"
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/pipeline/document.h"
+
+namespace mongo {
+
+RouterStageRemoveMetadataFields::RouterStageRemoveMetadataFields(
+ OperationContext* opCtx,
+ std::unique_ptr<RouterExecStage> child,
+ std::vector<StringData> metadataFields)
+ : RouterExecStage(opCtx, std::move(child)), _metaFields(std::move(metadataFields)) {
+ for (auto&& fieldName : _metaFields) {
+ invariant(fieldName[0] == '$'); // We use this information to optimize next().
+ }
+}
+
+StatusWith<ClusterQueryResult> RouterStageRemoveMetadataFields::next() {
+ auto childResult = getChildStage()->next();
+ if (!childResult.isOK() || !childResult.getValue().getResult()) {
+ return childResult;
+ }
+
+ BSONObjIterator iterator(*childResult.getValue().getResult());
+ // Find the first field that we need to remove.
+ while (iterator.more() && (*iterator).fieldName()[0] != '$' &&
+ std::find(_metaFields.begin(), _metaFields.end(), (*iterator).fieldNameStringData()) ==
+ _metaFields.end()) {
+ ++iterator;
+ }
+
+ if (!iterator.more()) {
+ // We got all the way to the end without finding any fields to remove, just return the whole
+ // document.
+ return childResult;
+ }
+
+ // Copy everything up to the first metadata field.
+ const auto firstElementBufferStart =
+ childResult.getValue().getResult()->firstElement().rawdata();
+ auto endOfNonMetaFieldBuffer = (*iterator).rawdata();
+ BSONObjBuilder builder;
+ builder.bb().appendBuf(firstElementBufferStart,
+ endOfNonMetaFieldBuffer - firstElementBufferStart);
+
+ // Copy any remaining fields that are not metadata. We expect metadata fields are likely to be
+ // at the end of the document, so there is likely nothing else to copy.
+ while ((++iterator).more()) {
+ if (std::find(_metaFields.begin(), _metaFields.end(), (*iterator).fieldNameStringData()) ==
+ _metaFields.end()) {
+ builder.append(*iterator);
+ }
+ }
+ return {builder.obj()};
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_metadata_fields.h b/src/mongo/s/query/router_stage_remove_metadata_fields.h
new file mode 100644
index 00000000000..07c9c7c36bb
--- /dev/null
+++ b/src/mongo/s/query/router_stage_remove_metadata_fields.h
@@ -0,0 +1,54 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/s/query/router_exec_stage.h"
+
+namespace mongo {
+
+/**
+ * Removes metadata fields from a BSON object.
+ */
+class RouterStageRemoveMetadataFields final : public RouterExecStage {
+public:
+ RouterStageRemoveMetadataFields(OperationContext* opCtx,
+ std::unique_ptr<RouterExecStage> child,
+ std::vector<StringData> fieldsToRemove);
+
+ StatusWith<ClusterQueryResult> next() final;
+
+private:
+ // Use a StringMap so we can look up by StringData - avoiding a string allocation on each field
+ // in each object. The value here is meaningless.
+ std::vector<StringData> _metaFields;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp b/src/mongo/s/query/router_stage_remove_metadata_fields_test.cpp
index 5767549ad64..bb8ea4613b8 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey_test.cpp
+++ b/src/mongo/s/query/router_stage_remove_metadata_fields_test.cpp
@@ -1,37 +1,38 @@
/**
- * Copyright 2015 MongoDB Inc.
+ * Copyright (C) 2017 MongoDB Inc.
*
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
*
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
*
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
*/
#include "mongo/platform/basic.h"
-#include "mongo/s/query/router_stage_remove_sortkey.h"
+#include "mongo/s/query/router_stage_remove_metadata_fields.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/pipeline/document.h"
#include "mongo/s/query/router_stage_mock.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/unittest.h"
@@ -44,15 +45,20 @@ namespace {
// going through the trouble of making one, we'll just use nullptr throughout.
OperationContext* opCtx = nullptr;
-TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) {
+TEST(RouterStageRemoveMetadataFieldsTest, RemovesMetaDataFields) {
auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 4 << "$sortKey" << 1 << "b" << 3));
mockStage->queueResult(BSON("$sortKey" << BSON("" << 3) << "c" << BSON("d"
<< "foo")));
mockStage->queueResult(BSON("a" << 3));
+ mockStage->queueResult(BSON("a" << 3 << "$randVal" << 4 << "$sortKey" << 2));
+ mockStage->queueResult(
+ BSON("$textScore" << 2 << "a" << 3 << "$randVal" << 4 << "$sortKey" << 2));
+ mockStage->queueResult(BSON("$textScore" << 2));
mockStage->queueResult(BSONObj());
- auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage));
+ auto sortKeyStage = stdx::make_unique<RouterStageRemoveMetadataFields>(
+ opCtx, std::move(mockStage), Document::allMetadataFieldNames);
auto firstResult = sortKeyStage->next();
ASSERT_OK(firstResult.getStatus());
@@ -74,19 +80,35 @@ TEST(RouterStageRemoveSortKeyTest, RemovesSortKey) {
auto fourthResult = sortKeyStage->next();
ASSERT_OK(fourthResult.getStatus());
ASSERT(fourthResult.getValue().getResult());
- ASSERT_BSONOBJ_EQ(*fourthResult.getValue().getResult(), BSONObj());
+ ASSERT_BSONOBJ_EQ(*fourthResult.getValue().getResult(), BSON("a" << 3));
auto fifthResult = sortKeyStage->next();
ASSERT_OK(fifthResult.getStatus());
- ASSERT(fifthResult.getValue().isEOF());
+ ASSERT(fifthResult.getValue().getResult());
+ ASSERT_BSONOBJ_EQ(*fifthResult.getValue().getResult(), BSON("a" << 3));
+
+ auto sixthResult = sortKeyStage->next();
+ ASSERT_OK(sixthResult.getStatus());
+ ASSERT(sixthResult.getValue().getResult());
+ ASSERT_BSONOBJ_EQ(*sixthResult.getValue().getResult(), BSONObj());
+
+ auto seventhResult = sortKeyStage->next();
+ ASSERT_OK(seventhResult.getStatus());
+ ASSERT(seventhResult.getValue().getResult());
+ ASSERT_BSONOBJ_EQ(*seventhResult.getValue().getResult(), BSONObj());
+
+ auto eighthResult = sortKeyStage->next();
+ ASSERT_OK(eighthResult.getStatus());
+ ASSERT(eighthResult.getValue().isEOF());
}
-TEST(RouterStageRemoveSortKeyTest, PropagatesError) {
+TEST(RouterStageRemoveMetadataFieldsTest, PropagatesError) {
auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("$sortKey" << 1));
mockStage->queueError(Status(ErrorCodes::BadValue, "bad thing happened"));
- auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage));
+ auto sortKeyStage = stdx::make_unique<RouterStageRemoveMetadataFields>(
+ opCtx, std::move(mockStage), std::vector<StringData>{"$sortKey"_sd});
auto firstResult = sortKeyStage->next();
ASSERT_OK(firstResult.getStatus());
@@ -99,13 +121,14 @@ TEST(RouterStageRemoveSortKeyTest, PropagatesError) {
ASSERT_EQ(secondResult.getStatus().reason(), "bad thing happened");
}
-TEST(RouterStageRemoveSortKeyTest, ToleratesMidStreamEOF) {
+TEST(RouterStageRemoveMetadataFieldsTest, ToleratesMidStreamEOF) {
auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 1 << "$sortKey" << 1 << "b" << 1));
mockStage->queueEOF();
mockStage->queueResult(BSON("a" << 2 << "$sortKey" << 1 << "b" << 2));
- auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage));
+ auto sortKeyStage = stdx::make_unique<RouterStageRemoveMetadataFields>(
+ opCtx, std::move(mockStage), std::vector<StringData>{"$sortKey"_sd});
auto firstResult = sortKeyStage->next();
ASSERT_OK(firstResult.getStatus());
@@ -126,13 +149,14 @@ TEST(RouterStageRemoveSortKeyTest, ToleratesMidStreamEOF) {
ASSERT(fourthResult.getValue().isEOF());
}
-TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) {
+TEST(RouterStageRemoveMetadataFieldsTest, RemotesExhausted) {
auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
mockStage->queueResult(BSON("a" << 1 << "$sortKey" << 1 << "b" << 1));
mockStage->queueResult(BSON("a" << 2 << "$sortKey" << 1 << "b" << 2));
mockStage->markRemotesExhausted();
- auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage));
+ auto sortKeyStage = stdx::make_unique<RouterStageRemoveMetadataFields>(
+ opCtx, std::move(mockStage), std::vector<StringData>{"$sortKey"_sd});
ASSERT_TRUE(sortKeyStage->remotesExhausted());
auto firstResult = sortKeyStage->next();
@@ -153,12 +177,13 @@ TEST(RouterStageRemoveSortKeyTest, RemotesExhausted) {
ASSERT_TRUE(sortKeyStage->remotesExhausted());
}
-TEST(RouterStageRemoveSortKeyTest, ForwardsAwaitDataTimeout) {
+TEST(RouterStageRemoveMetadataFieldsTest, ForwardsAwaitDataTimeout) {
auto mockStage = stdx::make_unique<RouterStageMock>(opCtx);
auto mockStagePtr = mockStage.get();
ASSERT_NOT_OK(mockStage->getAwaitDataTimeout().getStatus());
- auto sortKeyStage = stdx::make_unique<RouterStageRemoveSortKey>(opCtx, std::move(mockStage));
+ auto sortKeyStage = stdx::make_unique<RouterStageRemoveMetadataFields>(
+ opCtx, std::move(mockStage), std::vector<StringData>{"$sortKey"_sd});
ASSERT_OK(sortKeyStage->setAwaitDataTimeout(Milliseconds(789)));
auto awaitDataTimeout = mockStagePtr->getAwaitDataTimeout();
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp
deleted file mode 100644
index fe7a8cf0f7d..00000000000
--- a/src/mongo/s/query/router_stage_remove_sortkey.cpp
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/query/router_stage_remove_sortkey.h"
-
-#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/s/query/cluster_client_cursor_params.h"
-#include "mongo/util/mongoutils/str.h"
-
-namespace mongo {
-
-RouterStageRemoveSortKey::RouterStageRemoveSortKey(OperationContext* opCtx,
- std::unique_ptr<RouterExecStage> child)
- : RouterExecStage(opCtx, std::move(child)) {}
-
-StatusWith<ClusterQueryResult> RouterStageRemoveSortKey::next() {
- auto childResult = getChildStage()->next();
- if (!childResult.isOK() || !childResult.getValue().getResult()) {
- return childResult;
- }
-
- const auto& childObj = childResult.getValue().getResult();
-
- BSONObjBuilder builder;
- for (BSONElement elt : *childObj) {
- if (!str::equals(elt.fieldName(), ClusterClientCursorParams::kSortKeyField)) {
- builder.append(elt);
- }
- }
-
- return {builder.obj()};
-}
-
-void RouterStageRemoveSortKey::kill(OperationContext* opCtx) {
- getChildStage()->kill(opCtx);
-}
-
-bool RouterStageRemoveSortKey::remotesExhausted() {
- return getChildStage()->remotesExhausted();
-}
-
-Status RouterStageRemoveSortKey::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
- return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
-}
-
-} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h
deleted file mode 100644
index ba71364dfa9..00000000000
--- a/src/mongo/s/query/router_stage_remove_sortkey.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/s/query/router_exec_stage.h"
-
-namespace mongo {
-
-/**
- * Removes the sort key added to each document by mongod's sortKey meta-projection.
- *
- * Only needed if the query specifies a sort.
- */
-class RouterStageRemoveSortKey final : public RouterExecStage {
-public:
- RouterStageRemoveSortKey(OperationContext* opCtx, std::unique_ptr<RouterExecStage> child);
-
- StatusWith<ClusterQueryResult> next() final;
-
- void kill(OperationContext* opCtx) final;
-
- bool remotesExhausted() final;
-
- Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
-};
-
-} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp
index 50d2107b14c..b514731c9cd 100644
--- a/src/mongo/s/query/router_stage_skip.cpp
+++ b/src/mongo/s/query/router_stage_skip.cpp
@@ -58,16 +58,4 @@ StatusWith<ClusterQueryResult> RouterStageSkip::next() {
return getChildStage()->next();
}
-void RouterStageSkip::kill(OperationContext* opCtx) {
- getChildStage()->kill(opCtx);
-}
-
-bool RouterStageSkip::remotesExhausted() {
- return getChildStage()->remotesExhausted();
-}
-
-Status RouterStageSkip::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
- return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
-}
-
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h
index 49051128577..9e67d25b74d 100644
--- a/src/mongo/s/query/router_stage_skip.h
+++ b/src/mongo/s/query/router_stage_skip.h
@@ -43,12 +43,6 @@ public:
StatusWith<ClusterQueryResult> next() final;
- void kill(OperationContext* opCtx) final;
-
- bool remotesExhausted() final;
-
- Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
-
private:
long long _skip;
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index dce282c5892..506ac226636 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -63,10 +63,11 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
ClusterClientCursorParams params(
incomingCursorResponse.getValue().getNSS(),
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames());
- params.remotes.emplace_back(
- shardId,
- server,
- CursorResponse(requestedNss, incomingCursorResponse.getValue().getCursorId(), {}));
+ params.remotes.emplace_back(shardId,
+ server,
+ CursorResponse(incomingCursorResponse.getValue().getNSS(),
+ incomingCursorResponse.getValue().getCursorId(),
+ {}));
auto ccc = ClusterClientCursorImpl::make(opCtx, executor, std::move(params));