summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2021-10-04 14:27:12 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-10-04 15:04:08 +0000
commite7563044ca02b1d889a57ed0cb35bfe6082210bc (patch)
treebc740561955ce2783be5f50f8dcea6265127e1f4
parent392f32b9a911858a8b50bab0ec4be3ae9126d680 (diff)
downloadmongo-e7563044ca02b1d889a57ed0cb35bfe6082210bc.tar.gz
SERVER-59097 Expose $documents as an externally visible stage
-rw-r--r--jstests/aggregation/sources/documents.js (renamed from jstests/aggregation/documents.js)129
-rw-r--r--jstests/aggregation/sources/unionWith/unionWith_allows_stages.js1
-rw-r--r--src/mongo/db/pipeline/document_source.h2
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.h6
-rw-r--r--src/mongo/db/pipeline/document_source_documents.cpp17
-rw-r--r--src/mongo/db/pipeline/document_source_queue.cpp17
-rw-r--r--src/mongo/db/pipeline/document_source_queue.h14
-rw-r--r--src/mongo/db/pipeline/document_source_replace_root.cpp40
-rw-r--r--src/mongo/db/pipeline/document_source_replace_root.h20
-rw-r--r--src/mongo/db/pipeline/document_source_union_with.cpp24
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp3
-rw-r--r--src/mongo/db/pipeline/pipeline.h5
-rw-r--r--src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp10
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp21
14 files changed, 184 insertions, 125 deletions
diff --git a/jstests/aggregation/documents.js b/jstests/aggregation/sources/documents.js
index 302b850ec02..ed90e9ac7aa 100644
--- a/jstests/aggregation/documents.js
+++ b/jstests/aggregation/sources/documents.js
@@ -2,51 +2,28 @@
// The $documents follows these rules:
// * $documents must be in the beginning of the pipeline,
// * $documents content must evaluate into an array of objects.
-// $documents is not meant to be used in sharded env yet. It is going to return
-// the same result set for each shard which is counter intuitive. The test is disabled
-// for mongos
// @tags: [
-// do_not_wrap_aggregations_in_facets,
-// assumes_unsharded_collection,
-// assumes_read_preference_unchanged,
-// assumes_read_concern_unchanged,
-// assumes_against_mongod_not_mongos
+// do_not_wrap_aggregations_in_facets
// ]
-
(function() {
"use strict";
load("jstests/aggregation/extras/utils.js"); // For resultsEq.
const dbName = jsTestName();
-// TODO SERVER-59097 - expose $documents and get rid of internal
-// client here
-const writeConcernOptions = {
- writeConcern: {w: "majority"}
-};
-
-const testInternalClient = (function createInternalClient() {
- const connInternal = new Mongo(db.getMongo().host);
- const curDB = connInternal.getDB(dbName);
- assert.commandWorked(curDB.runCommand({
- "hello": 1,
- internalClient: {minWireVersion: NumberInt(0), maxWireVersion: NumberInt(7)}
- }));
- return connInternal;
-})();
-const currDB = testInternalClient.getDB(dbName);
+const currDB = db.getSiblingDB(dbName);
const coll = currDB.documents;
-coll.drop(writeConcernOptions);
-coll.insert({a: 1}, writeConcernOptions);
+coll.drop();
+assert.commandWorked(coll.insert({a: 1}));
const lookup_coll = currDB.lookup_coll;
-lookup_coll.drop(writeConcernOptions);
+lookup_coll.drop();
for (let i = 0; i < 10; i++) {
- lookup_coll.insert({id_name: i, name: "name_" + i}, writeConcernOptions);
+ assert.commandWorked(lookup_coll.insert({id_name: i, name: "name_" + i}));
}
// $documents given an array of objects.
-const docs = coll.aggregate([{$documents: [{a1: 1}, {a1: 2}]}], writeConcernOptions).toArray();
+const docs = currDB.aggregate([{$documents: [{a1: 1}, {a1: 2}]}]).toArray();
assert.eq(2, docs.length);
assert.eq(docs[0], {a1: 1});
@@ -54,8 +31,7 @@ assert.eq(docs[1], {a1: 2});
// $documents evaluates to an array of objects.
const docs1 =
- coll.aggregate([{$documents: {$map: {input: {$range: [0, 100]}, in : {x: "$$this"}}}}],
- writeConcernOptions)
+ currDB.aggregate([{$documents: {$map: {input: {$range: [0, 100]}, in : {x: "$$this"}}}}])
.toArray();
assert.eq(100, docs1.length);
@@ -65,30 +41,27 @@ for (let i = 0; i < 100; i++) {
// $documents evaluates to an array of objects.
const docsUnionWith =
- coll.aggregate(
- [
- {$documents: [{a: 13}]},
- {
- $unionWith: {
- pipeline:
- [{$documents: {$map: {input: {$range: [0, 5]}, in : {x: "$$this"}}}}]
- }
+ coll.aggregate([
+ {
+ $unionWith: {
+ pipeline: [{$documents: {$map: {input: {$range: [0, 5]}, in : {x: "$$this"}}}}]
}
- ],
- writeConcernOptions)
+ },
+ {$group: {_id: "$x", x: {$first: "$x"}}},
+ {$project: {_id: 0}},
+ ])
.toArray();
-assert(resultsEq([{a: 13}, {x: 0}, {x: 1}, {x: 2}, {x: 3}, {x: 4}], docsUnionWith));
+assert(resultsEq([{x: null}, {x: 0}, {x: 1}, {x: 2}, {x: 3}, {x: 4}], docsUnionWith));
{ // $documents with const objects inside $unionWith (no "coll").
- const res = coll.aggregate(
- [
- {$unionWith: {pipeline: [{$documents: [{xx: 1}, {xx: 2}]}]}},
- {$project: {_id: 0}}
- ],
- writeConcernOptions)
+ const res = coll.aggregate([
+ {$unionWith: {pipeline: [{$documents: [{xx: 1}, {xx: 2}]}]}},
+ {$group: {_id: "$xx", xx: {$first: "$xx"}}},
+ {$project: {_id: 0}}
+ ])
.toArray();
- assert(resultsEq([{a: 1}, {xx: 1}, {xx: 2}], res));
+ assert(resultsEq([{xx: null}, {xx: 1}, {xx: 2}], res));
}
{ // $documents with const objects inside $lookup (no "coll", explicit $match).
@@ -114,8 +87,8 @@ assert(resultsEq([{a: 13}, {x: 0}, {x: 1}, {x: 2}, {x: 3}, {x: 4}], docsUnionWit
},
{$match: {"names": {"$ne": []}}},
{$project: {_id: 0}}
- ],
- writeConcernOptions)
+ ]
+ )
.toArray();
assert(resultsEq(
[
@@ -139,8 +112,7 @@ assert(resultsEq([{a: 13}, {x: 0}, {x: 1}, {x: 2}, {x: 3}, {x: 4}], docsUnionWit
},
{$match: {"names": {"$ne": []}}},
{$project: {_id: 0}}
- ],
- writeConcernOptions)
+ ])
.toArray();
assert(resultsEq(
[
@@ -150,39 +122,52 @@ assert(resultsEq([{a: 13}, {x: 0}, {x: 1}, {x: 2}, {x: 3}, {x: 4}], docsUnionWit
res));
}
+// Must fail when $document appears in the top level collection pipeline.
+assert.throwsWithCode(() => {
+ coll.aggregate([{$documents: {$map: {input: {$range: [0, 100]}, in : {x: "$$this"}}}}]);
+}, ErrorCodes.InvalidNamespace);
+
// Must fail due to misplaced $document.
assert.throwsWithCode(() => {
- coll.aggregate([{$project: {a: [{xx: 1}, {xx: 2}]}}, {$documents: [{a: 1}]}],
- writeConcernOptions);
+ coll.aggregate([{$project: {a: [{xx: 1}, {xx: 2}]}}, {$documents: [{a: 1}]}]);
}, 40602);
// $unionWith must fail due to no $document
assert.throwsWithCode(() => {
- coll.aggregate([{$unionWith: {pipeline: [{$project: {a: [{xx: 1}, {xx: 2}]}}]}}],
- writeConcernOptions);
-}, 9);
+ coll.aggregate([{$unionWith: {pipeline: [{$project: {a: [{xx: 1}, {xx: 2}]}}]}}]);
+}, ErrorCodes.FailedToParse);
// Test that $lookup fails due to no 'from' argument and no $documents stage.
assert.throwsWithCode(() => {
- coll.aggregate([{$lookup: {let: {"id_lookup": "$id_name"}, as: "aa", pipeline: [{$project: {a: [{xx: 1}, {xx: 2}]}}]}}],
- writeConcernOptions);
-}, 9);
+ coll.aggregate([
+ {
+ $lookup: {
+ let: {"id_lookup": "$id_name"},
+ as: "aa",
+ pipeline: [{$project: {a: [{xx: 1}, {xx: 2}]}}]
+ }
+ }
+ ]);
+}, ErrorCodes.FailedToParse);
// Test that $lookup fails due to no 'from' argument and no pipeline field.
assert.throwsWithCode(() => {
- coll.aggregate([{$lookup: {let : {"id_lookup": "$id_name"}, as: "aa"}}], writeConcernOptions);
-}, 9);
-// Must fail due to $documents producing array of non-objects.
+ coll.aggregate([{$lookup: {let : {"id_lookup": "$id_name"}, as: "aa"}}]);
+}, ErrorCodes.FailedToParse);
+
+// Test that $documents fails due to producing array of non-objects.
assert.throwsWithCode(() => {
- coll.aggregate([{$documents: [1, 2, 3]}], writeConcernOptions);
+ currDB.aggregate([{$documents: [1, 2, 3]}]);
}, 40228);
-
-// Must fail due $documents producing non-array.
+// Now with one object and one scalar.
assert.throwsWithCode(() => {
- coll.aggregate([{$documents: {a: 1}}], writeConcernOptions);
-}, 5858203);
+ currDB.aggregate([{$documents: [{a: 1}, 2]}]);
+}, 40228);
-// Must fail due $documents producing array of non-objects.
+// Test that $documents fails when provided a non-array.
assert.throwsWithCode(() => {
- coll.aggregate([{$documents: {a: [1, 2, 3]}}], writeConcernOptions);
+ currDB.aggregate([{$documents: "string"}]);
}, 5858203);
+
+// Test that $documents succeeds when given a singleton object.
+assert.eq(currDB.aggregate([{$documents: [{a: [1, 2, 3]}]}]).toArray(), [{a: [1, 2, 3]}]);
})();
diff --git a/jstests/aggregation/sources/unionWith/unionWith_allows_stages.js b/jstests/aggregation/sources/unionWith/unionWith_allows_stages.js
index 7a5e88eb82f..a7c8b6b372b 100644
--- a/jstests/aggregation/sources/unionWith/unionWith_allows_stages.js
+++ b/jstests/aggregation/sources/unionWith/unionWith_allows_stages.js
@@ -24,6 +24,7 @@ function buildErrorString(expected, found) {
return "Expected:\n" + tojson(expected) + "\nGot:\n" + tojson(found);
}
function checkResults(resObj, expectedResult) {
+ assert.commandWorked(resObj);
assert(arrayEq(resObj.cursor.firstBatch, expectedResult),
buildErrorString(expectedResult, resObj.cursor.firstBatch));
}
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index cf28715eec8..15ace1abe31 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -400,7 +400,7 @@ public:
* Shortcut method to get a BSONObj for debugging. Often useful in log messages, but is not
* cheap so avoid doing so on a hot path at a low verbosity.
*/
- BSONObj serializeToBSONForDebug() const;
+ virtual BSONObj serializeToBSONForDebug() const;
/**
* If this stage uses additional namespaces, adds them to 'collectionNames'. These namespaces
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index 27889466373..248bed940c1 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -125,6 +125,12 @@ public:
return _exec->getPlanExplainer().getVersion();
}
+ BSONObj serializeToBSONForDebug() const final {
+ // Feel free to add any useful information here. For now this has not been useful for
+ // debugging so is left empty.
+ return BSON(kStageName << "{}");
+ }
+
protected:
DocumentSourceCursor(const CollectionPtr& collection,
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec,
diff --git a/src/mongo/db/pipeline/document_source_documents.cpp b/src/mongo/db/pipeline/document_source_documents.cpp
index ed0f7a2fe9f..5d4e02c71ea 100644
--- a/src/mongo/db/pipeline/document_source_documents.cpp
+++ b/src/mongo/db/pipeline/document_source_documents.cpp
@@ -43,10 +43,10 @@ namespace mongo {
using boost::intrusive_ptr;
-REGISTER_INTERNAL_DOCUMENT_SOURCE(documents,
- LiteParsedDocumentSourceDefault::parse,
- DocumentSourceDocuments::createFromBson,
- true);
+REGISTER_DOCUMENT_SOURCE(documents,
+ LiteParsedDocumentSourceDefault::parse,
+ DocumentSourceDocuments::createFromBson,
+ AllowedWithApiStrict::kAlways);
std::list<intrusive_ptr<DocumentSource>> DocumentSourceDocuments::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
@@ -54,7 +54,7 @@ std::list<intrusive_ptr<DocumentSource>> DocumentSourceDocuments::createFromBson
// $unwind, and $replaceRoot together.
auto genField = UUID::gen().toString();
auto projectContent = BSON(genField << elem);
- auto queue = DocumentSourceQueue::create(expCtx);
+ auto queue = DocumentSourceQueue::create(expCtx, DocumentSourceDocuments::kStageName);
queue->emplace_back(Document{});
/* Create the following pipeline from $documents: [...]
* => [ queue([{}]),
@@ -66,9 +66,10 @@ std::list<intrusive_ptr<DocumentSource>> DocumentSourceDocuments::createFromBson
queue,
DocumentSourceProject::create(projectContent, expCtx, elem.fieldNameStringData()),
DocumentSourceUnwind::create(expCtx, genField, false, {}, true),
- DocumentSourceReplaceRoot::createFromBson(
- BSON("$replaceRoot" << BSON("newRoot" << std::string("$") + genField)).firstElement(),
- expCtx)};
+ DocumentSourceReplaceRoot::create(expCtx,
+ ExpressionFieldPath::createPathFromString(
+ expCtx.get(), genField, expCtx->variablesParseState),
+ "elements within the array passed to $documents")};
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_queue.cpp b/src/mongo/db/pipeline/document_source_queue.cpp
index eb921a5faf4..c724db5ae10 100644
--- a/src/mongo/db/pipeline/document_source_queue.cpp
+++ b/src/mongo/db/pipeline/document_source_queue.cpp
@@ -30,7 +30,6 @@
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/document_source_queue.h"
-
namespace mongo {
REGISTER_INTERNAL_DOCUMENT_SOURCE(queue,
@@ -55,16 +54,20 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceQueue::createFromBson(
}
boost::intrusive_ptr<DocumentSourceQueue> DocumentSourceQueue::create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- return new DocumentSourceQueue({}, expCtx);
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<StringData> aliasStageName) {
+ return new DocumentSourceQueue({}, expCtx, aliasStageName);
}
DocumentSourceQueue::DocumentSourceQueue(std::deque<GetNextResult> results,
- const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSource(kStageName, expCtx), _queue(std::move(results)) {}
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<StringData> aliasStageName)
+ : DocumentSource(kStageName /* pass the real stage name here for execution stats */, expCtx),
+ _queue(std::move(results)),
+ _aliasStageName(std::move(aliasStageName)) {}
const char* DocumentSourceQueue::getSourceName() const {
- return kStageName.rawData();
+ return _aliasStageName.value_or(kStageName).rawData();
}
DocumentSource::GetNextResult DocumentSourceQueue::doGetNext() {
@@ -80,7 +83,7 @@ DocumentSource::GetNextResult DocumentSourceQueue::doGetNext() {
Value DocumentSourceQueue::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
ValueArrayStream vals;
for (auto elem : _queue) {
- vals << elem.getDocument();
+ vals << elem.getDocument().getOwned();
}
return Value(DOC(kStageName << vals.done()));
}
diff --git a/src/mongo/db/pipeline/document_source_queue.h b/src/mongo/db/pipeline/document_source_queue.h
index f24b0c875e4..b0387475298 100644
--- a/src/mongo/db/pipeline/document_source_queue.h
+++ b/src/mongo/db/pipeline/document_source_queue.h
@@ -32,7 +32,6 @@
#include <deque>
#include "mongo/db/pipeline/document_source.h"
-
namespace mongo {
/**
@@ -46,10 +45,12 @@ public:
static constexpr StringData kStageName = "$queue"_sd;
static boost::intrusive_ptr<DocumentSourceQueue> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx);
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<StringData> aliasStageName = boost::none);
DocumentSourceQueue(std::deque<GetNextResult> results,
- const boost::intrusive_ptr<ExpressionContext>& expCtx);
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ boost::optional<StringData> aliasStageName = boost::none);
~DocumentSourceQueue() override = default;
const char* getSourceName() const override;
@@ -59,7 +60,7 @@ public:
StageConstraints constraints(Pipeline::SplitState pipeState) const override {
StageConstraints constraints(StreamType::kStreaming,
PositionRequirement::kFirst,
- HostTypeRequirement::kNone,
+ HostTypeRequirement::kLocalOnly,
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed,
TransactionRequirement::kAllowed,
@@ -67,6 +68,7 @@ public:
UnionRequirement::kAllowed);
constraints.requiresInputDocSource = false;
+ constraints.isIndependentOfAnyCollection = true;
return constraints;
}
@@ -101,6 +103,10 @@ protected:
GetNextResult doGetNext() override;
// Return documents from front of queue.
std::deque<GetNextResult> _queue;
+
+ // An optional alias name is provided for cases like $documents where we want an error message
+ // to indicate the name the user provided, not the internal $queue name.
+ boost::optional<StringData> _aliasStageName = boost::none;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_replace_root.cpp b/src/mongo/db/pipeline/document_source_replace_root.cpp
index 137791a4ca7..fb4bb2cb951 100644
--- a/src/mongo/db/pipeline/document_source_replace_root.cpp
+++ b/src/mongo/db/pipeline/document_source_replace_root.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/pipeline/document_source_replace_root.h"
#include <boost/smart_ptr/intrusive_ptr.hpp>
+#include <fmt/format.h>
#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/jsobj.h"
@@ -45,27 +46,13 @@ using boost::intrusive_ptr;
Document ReplaceRootTransformation::applyTransformation(const Document& input) {
// Extract subdocument in the form of a Value.
Value newRoot = _newRoot->evaluate(input, &_expCtx->variables);
-
- // To ensure an accurate user-facing message, any user-facing syntax that uses this stage
- // internally must provide an message opener that complies with its documentation.
- StringData msgOpener = [&]() {
- switch (_specifiedName) {
- case UserSpecifiedName::kReplaceRoot:
- return "'newRoot' expression "_sd;
- case UserSpecifiedName::kReplaceWith:
- return "'replacement document' "_sd;
- default:
- MONGO_UNREACHABLE;
- }
- }();
-
// The newRoot expression, if it exists, must evaluate to an object.
uassert(40228,
- str::stream() << msgOpener.toString()
- << "must evaluate to an object, but resulting value was: "
- << newRoot.toString() << ". Type of resulting value: '"
- << typeName(newRoot.getType())
- << "'. Input document: " << input.toString(),
+ fmt::format(kErrorTemplate.rawData(),
+ _errMsgContextForNonObject,
+ newRoot.toString(),
+ typeName(newRoot.getType()),
+ input.toString()),
newRoot.getType() == BSONType::Object);
// Turn the value into a document.
@@ -119,10 +106,21 @@ intrusive_ptr<DocumentSource> DocumentSourceReplaceRoot::createFromBson(
std::make_unique<ReplaceRootTransformation>(
expCtx,
newRootExpression,
- (stageName == kStageName) ? ReplaceRootTransformation::UserSpecifiedName::kReplaceRoot
- : ReplaceRootTransformation::UserSpecifiedName::kReplaceWith),
+ (stageName == kStageName) ? "'newRoot' expression " : "'replacement document' "),
kStageName.rawData(),
isIndependentOfAnyCollection);
}
+boost::intrusive_ptr<DocumentSource> DocumentSourceReplaceRoot::create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const boost::intrusive_ptr<Expression>& newRootExpression,
+ std::string errMsgContextForNonObjects) {
+ const bool isIndependentOfAnyCollection = false;
+ return new DocumentSourceSingleDocumentTransformation(
+ expCtx,
+ std::make_unique<ReplaceRootTransformation>(
+ expCtx, newRootExpression, std::move(errMsgContextForNonObjects)),
+ kStageName.rawData(),
+ isIndependentOfAnyCollection);
+}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_replace_root.h b/src/mongo/db/pipeline/document_source_replace_root.h
index e0d8482d554..e302ce5d88c 100644
--- a/src/mongo/db/pipeline/document_source_replace_root.h
+++ b/src/mongo/db/pipeline/document_source_replace_root.h
@@ -44,8 +44,10 @@ public:
ReplaceRootTransformation(const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::intrusive_ptr<Expression> newRootExpression,
- UserSpecifiedName specifiedName)
- : _expCtx(expCtx), _newRoot(std::move(newRootExpression)), _specifiedName(specifiedName) {}
+ std::string errMsgContextForNonObject)
+ : _expCtx(expCtx),
+ _newRoot(std::move(newRootExpression)),
+ _errMsgContextForNonObject(std::move(errMsgContextForNonObject)) {}
TransformerType getType() const final {
return TransformerType::kReplaceRoot;
@@ -82,7 +84,14 @@ public:
private:
const boost::intrusive_ptr<ExpressionContext> _expCtx;
boost::intrusive_ptr<Expression> _newRoot;
- UserSpecifiedName _specifiedName;
+
+ // A string for additional context for the user about where/why we were expecting an object.
+ // This can be helpful if you are using $replaceRoot as part of an alias expansion as we do in
+ // $documents for example. Goes first in the template error message below.
+ std::string _errMsgContextForNonObject;
+ static constexpr StringData kErrorTemplate =
+ "{} must evaluate to an object, but resulting value was: {}. Type of resulting value: "
+ "'{}'. Input document: {}"_sd;
};
/*
@@ -106,6 +115,11 @@ public:
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
+ static boost::intrusive_ptr<DocumentSource> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const boost::intrusive_ptr<Expression>& newRootExpression,
+ std::string errMsgContextForNonObjects);
+
private:
// It is illegal to construct a DocumentSourceReplaceRoot directly, use createFromBson()
// instead.
diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp
index c948c1f57e7..7d1e4ea5568 100644
--- a/src/mongo/db/pipeline/document_source_union_with.cpp
+++ b/src/mongo/db/pipeline/document_source_union_with.cpp
@@ -26,6 +26,7 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
+
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
#include "mongo/platform/basic.h"
@@ -35,6 +36,7 @@
#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/pipeline/document_source_documents.h"
#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_queue.h"
#include "mongo/db/pipeline/document_source_single_document_transformation.h"
#include "mongo/db/pipeline/document_source_union_with.h"
#include "mongo/db/pipeline/document_source_union_with_gen.h"
@@ -85,12 +87,24 @@ DocumentSourceUnionWith::~DocumentSourceUnionWith() {
void validateUnionWithCollectionlessPipeline(
const boost::optional<std::vector<mongo::BSONObj>>& pipeline) {
+ const auto errMsg =
+ "$unionWith stage without explicit collection must have a pipeline with $documents as "
+ "first stage";
+
+ uassert(ErrorCodes::FailedToParse, errMsg, pipeline && pipeline->size() > 0);
+ const auto firstStageBson = (*pipeline)[0];
+ LOGV2_DEBUG(5909700,
+ 4,
+ "$unionWith validating collectionless pipeline",
+ "pipeline"_attr = pipeline,
+ "first"_attr = firstStageBson);
uassert(ErrorCodes::FailedToParse,
- "$unionWith stage without explicit collection must have a pipeline with $documents as "
- "first stage",
- pipeline && pipeline->size() > 0 &&
- // TODO SERVER-59628 replace with constraints check
- !(*pipeline)[0].getField(DocumentSourceDocuments::kStageName).eoo());
+ errMsg,
+ // TODO SERVER-59628 replace with constraints check
+ (firstStageBson.hasField(DocumentSourceDocuments::kStageName) ||
+ firstStageBson.hasField(DocumentSourceQueue::kStageName))
+
+ );
}
boost::intrusive_ptr<DocumentSource> DocumentSourceUnionWith::clone() const {
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index f04c4c1bd23..486b2e3ac7d 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -27,7 +27,10 @@
* it in the license file.
*/
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
+
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/logv2/log.h"
#include <algorithm>
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 4936c95df07..dd52df4ab9e 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -272,6 +272,11 @@ public:
std::vector<Value> serialize() const;
std::vector<BSONObj> serializeToBson() const;
+ /**
+ * Serializes the pipeline into BSON for explain/debug logging purposes.
+ */
+ std::vector<BSONObj> serializeToBSONForDebug() const;
+
// The initial source is special since it varies between mongos and mongod.
void addInitialSource(boost::intrusive_ptr<DocumentSource> source);
diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
index e5e0131eda3..dd600dd1456 100644
--- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
@@ -298,8 +298,14 @@ CommonMongodProcessInterface::attachCursorSourceToPipelineForLocalRead(Pipeline*
std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline,
PipelineDeleter(expCtx->opCtx));
- invariant(pipeline->getSources().empty() ||
- !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get()));
+ boost::optional<DocumentSource*> firstStage = pipeline->getSources().empty()
+ ? boost::optional<DocumentSource*>{}
+ : pipeline->getSources().front().get();
+ invariant(!firstStage || !dynamic_cast<DocumentSourceCursor*>(*firstStage));
+ if (firstStage && !(*firstStage)->constraints().requiresInputDocSource) {
+ // There's no need to attach a cursor here.
+ return pipeline;
+ }
boost::optional<AutoGetCollectionForReadCommandMaybeLockFree> autoColl;
const NamespaceStringOrUUID nsOrUUID = expCtx->uuid
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 288c59814bf..3e268b59067 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -1273,8 +1273,25 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(
auto expCtx = ownedPipeline->getContext();
std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline,
PipelineDeleter(expCtx->opCtx));
- invariant(pipeline->getSources().empty() ||
- !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
+ boost::optional<DocumentSource*> hasFirstStage = pipeline->getSources().empty()
+ ? boost::optional<DocumentSource*>{}
+ : pipeline->getSources().front().get();
+
+ if (hasFirstStage) {
+ // Make sure the first stage isn't already a $mergeCursors, and also check if it is a stage
+ // which needs to actually get a cursor attached or not.
+ const auto* firstStage = *hasFirstStage;
+ invariant(!dynamic_cast<const DocumentSourceMergeCursors*>(firstStage));
+        // Here we check the hostRequirement because there is at least one stage ($indexStats) which
+ // does not require input data, but is still expected to fan out and contact remote shards
+ // nonetheless.
+ if (auto constraints = firstStage->constraints(); !constraints.requiresInputDocSource &&
+ (constraints.hostRequirement == StageConstraints::HostTypeRequirement::kLocalOnly)) {
+ // There's no need to attach a cursor here - the first stage provides its own data and
+ // is meant to be run locally (e.g. $documents).
+ return pipeline;
+ }
+ }
// Helper to decide whether we should ignore the given shardTargetingPolicy for this namespace.
// Certain namespaces are shard-local; that is, they exist independently on every shard. For