diff options
-rw-r--r-- | jstests/aggregation/sources/documents.js (renamed from jstests/aggregation/documents.js) | 129 | ||||
-rw-r--r-- | jstests/aggregation/sources/unionWith/unionWith_allows_stages.js | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source.h | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.h | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_documents.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_queue.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_queue.h | 14 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_replace_root.cpp | 40 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_replace_root.h | 20 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_union_with.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.h | 5 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/pipeline/sharded_agg_helpers.cpp | 21 |
14 files changed, 184 insertions, 125 deletions
diff --git a/jstests/aggregation/documents.js b/jstests/aggregation/sources/documents.js index 302b850ec02..ed90e9ac7aa 100644 --- a/jstests/aggregation/documents.js +++ b/jstests/aggregation/sources/documents.js @@ -2,51 +2,28 @@ // The $documents follows these rules: // * $documents must be in the beginning of the pipeline, // * $documents content must evaluate into an array of objects. -// $documents is not meant to be used in sharded env yet. It is going to return -// the same result set for each shard which is counter intuitive. The test is disabled -// for mongos // @tags: [ -// do_not_wrap_aggregations_in_facets, -// assumes_unsharded_collection, -// assumes_read_preference_unchanged, -// assumes_read_concern_unchanged, -// assumes_against_mongod_not_mongos +// do_not_wrap_aggregations_in_facets // ] - (function() { "use strict"; load("jstests/aggregation/extras/utils.js"); // For resultsEq. const dbName = jsTestName(); -// TODO SERVER-59097 - expose $documents and get rid of internal -// client here -const writeConcernOptions = { - writeConcern: {w: "majority"} -}; - -const testInternalClient = (function createInternalClient() { - const connInternal = new Mongo(db.getMongo().host); - const curDB = connInternal.getDB(dbName); - assert.commandWorked(curDB.runCommand({ - "hello": 1, - internalClient: {minWireVersion: NumberInt(0), maxWireVersion: NumberInt(7)} - })); - return connInternal; -})(); -const currDB = testInternalClient.getDB(dbName); +const currDB = db.getSiblingDB(dbName); const coll = currDB.documents; -coll.drop(writeConcernOptions); -coll.insert({a: 1}, writeConcernOptions); +coll.drop(); +assert.commandWorked(coll.insert({a: 1})); const lookup_coll = currDB.lookup_coll; -lookup_coll.drop(writeConcernOptions); +lookup_coll.drop(); for (let i = 0; i < 10; i++) { - lookup_coll.insert({id_name: i, name: "name_" + i}, writeConcernOptions); + assert.commandWorked(lookup_coll.insert({id_name: i, name: "name_" + i})); } // $documents given an array of objects. -const docs = coll.aggregate([{$documents: [{a1: 1}, {a1: 2}]}], writeConcernOptions).toArray(); +const docs = currDB.aggregate([{$documents: [{a1: 1}, {a1: 2}]}]).toArray(); assert.eq(2, docs.length); assert.eq(docs[0], {a1: 1}); @@ -54,8 +31,7 @@ assert.eq(docs[1], {a1: 2}); // $documents evaluates to an array of objects. const docs1 = - coll.aggregate([{$documents: {$map: {input: {$range: [0, 100]}, in : {x: "$$this"}}}}], - writeConcernOptions) + currDB.aggregate([{$documents: {$map: {input: {$range: [0, 100]}, in : {x: "$$this"}}}}]) .toArray(); assert.eq(100, docs1.length); @@ -65,30 +41,27 @@ for (let i = 0; i < 100; i++) { // $documents evaluates to an array of objects. const docsUnionWith = - coll.aggregate( - [ - {$documents: [{a: 13}]}, - { - $unionWith: { - pipeline: - [{$documents: {$map: {input: {$range: [0, 5]}, in : {x: "$$this"}}}}] - } + coll.aggregate([ + { + $unionWith: { + pipeline: [{$documents: {$map: {input: {$range: [0, 5]}, in : {x: "$$this"}}}}] } - ], - writeConcernOptions) + }, + {$group: {_id: "$x", x: {$first: "$x"}}}, + {$project: {_id: 0}}, + ]) .toArray(); -assert(resultsEq([{a: 13}, {x: 0}, {x: 1}, {x: 2}, {x: 3}, {x: 4}], docsUnionWith)); +assert(resultsEq([{x: null}, {x: 0}, {x: 1}, {x: 2}, {x: 3}, {x: 4}], docsUnionWith)); { // $documents with const objects inside $unionWith (no "coll"). - const res = coll.aggregate( - [ - {$unionWith: {pipeline: [{$documents: [{xx: 1}, {xx: 2}]}]}}, - {$project: {_id: 0}} - ], - writeConcernOptions) + const res = coll.aggregate([ + {$unionWith: {pipeline: [{$documents: [{xx: 1}, {xx: 2}]}]}}, + {$group: {_id: "$xx", xx: {$first: "$xx"}}}, + {$project: {_id: 0}} + ]) .toArray(); - assert(resultsEq([{a: 1}, {xx: 1}, {xx: 2}], res)); + assert(resultsEq([{xx: null}, {xx: 1}, {xx: 2}], res)); } { // $documents with const objects inside $lookup (no "coll", explicit $match). @@ -114,8 +87,8 @@ assert(resultsEq([{a: 13}, {x: 0}, {x: 1}, {x: 2}, {x: 3}, {x: 4}], docsUnionWit }, {$match: {"names": {"$ne": []}}}, {$project: {_id: 0}} - ], - writeConcernOptions) + ] + ) .toArray(); assert(resultsEq( [ @@ -139,8 +112,7 @@ assert(resultsEq([{a: 13}, {x: 0}, {x: 1}, {x: 2}, {x: 3}, {x: 4}], docsUnionWit }, {$match: {"names": {"$ne": []}}}, {$project: {_id: 0}} - ], - writeConcernOptions) + ]) .toArray(); assert(resultsEq( [ @@ -150,39 +122,52 @@ assert(resultsEq([{a: 13}, {x: 0}, {x: 1}, {x: 2}, {x: 3}, {x: 4}], docsUnionWit res)); } +// Must fail when $document appears in the top level collection pipeline. +assert.throwsWithCode(() => { + coll.aggregate([{$documents: {$map: {input: {$range: [0, 100]}, in : {x: "$$this"}}}}]); +}, ErrorCodes.InvalidNamespace); + // Must fail due to misplaced $document. assert.throwsWithCode(() => { - coll.aggregate([{$project: {a: [{xx: 1}, {xx: 2}]}}, {$documents: [{a: 1}]}], - writeConcernOptions); + coll.aggregate([{$project: {a: [{xx: 1}, {xx: 2}]}}, {$documents: [{a: 1}]}]); }, 40602); // $unionWith must fail due to no $document assert.throwsWithCode(() => { - coll.aggregate([{$unionWith: {pipeline: [{$project: {a: [{xx: 1}, {xx: 2}]}}]}}], - writeConcernOptions); -}, 9); + coll.aggregate([{$unionWith: {pipeline: [{$project: {a: [{xx: 1}, {xx: 2}]}}]}}]); +}, ErrorCodes.FailedToParse); // Test that $lookup fails due to no 'from' argument and no $documents stage. assert.throwsWithCode(() => { - coll.aggregate([{$lookup: {let: {"id_lookup": "$id_name"}, as: "aa", pipeline: [{$project: {a: [{xx: 1}, {xx: 2}]}}]}}], - writeConcernOptions); -}, 9); + coll.aggregate([ + { + $lookup: { + let: {"id_lookup": "$id_name"}, + as: "aa", + pipeline: [{$project: {a: [{xx: 1}, {xx: 2}]}}] + } + } + ]); +}, ErrorCodes.FailedToParse); // Test that $lookup fails due to no 'from' argument and no pipeline field. assert.throwsWithCode(() => { - coll.aggregate([{$lookup: {let : {"id_lookup": "$id_name"}, as: "aa"}}], writeConcernOptions); -}, 9); -// Must fail due to $documents producing array of non-objects. + coll.aggregate([{$lookup: {let : {"id_lookup": "$id_name"}, as: "aa"}}]); +}, ErrorCodes.FailedToParse); + +// Test that $documents fails due to producing array of non-objects. assert.throwsWithCode(() => { - coll.aggregate([{$documents: [1, 2, 3]}], writeConcernOptions); + currDB.aggregate([{$documents: [1, 2, 3]}]); }, 40228); - -// Must fail due $documents producing non-array. +// Now with one object and one scalar. assert.throwsWithCode(() => { - coll.aggregate([{$documents: {a: 1}}], writeConcernOptions); -}, 5858203); + currDB.aggregate([{$documents: [{a: 1}, 2]}]); +}, 40228); -// Must fail due $documents producing array of non-objects. +// Test that $documents fails due when provided a non-array. assert.throwsWithCode(() => { - coll.aggregate([{$documents: {a: [1, 2, 3]}}], writeConcernOptions); + currDB.aggregate([{$documents: "string"}]); }, 5858203); + +// Test that $documents succeeds when given a singleton object. +assert.eq(currDB.aggregate([{$documents: [{a: [1, 2, 3]}]}]).toArray(), [{a: [1, 2, 3]}]); })(); diff --git a/jstests/aggregation/sources/unionWith/unionWith_allows_stages.js b/jstests/aggregation/sources/unionWith/unionWith_allows_stages.js index 7a5e88eb82f..a7c8b6b372b 100644 --- a/jstests/aggregation/sources/unionWith/unionWith_allows_stages.js +++ b/jstests/aggregation/sources/unionWith/unionWith_allows_stages.js @@ -24,6 +24,7 @@ function buildErrorString(expected, found) { return "Expected:\n" + tojson(expected) + "\nGot:\n" + tojson(found); } function checkResults(resObj, expectedResult) { + assert.commandWorked(resObj); assert(arrayEq(resObj.cursor.firstBatch, expectedResult), buildErrorString(expectedResult, resObj.cursor.firstBatch)); } diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index cf28715eec8..15ace1abe31 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -400,7 +400,7 @@ public: * Shortcut method to get a BSONObj for debugging. Often useful in log messages, but is not * cheap so avoid doing so on a hot path at a low verbosity. */ - BSONObj serializeToBSONForDebug() const; + virtual BSONObj serializeToBSONForDebug() const; /** * If this stage uses additional namespaces, adds them to 'collectionNames'. These namespaces diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h index 27889466373..248bed940c1 100644 --- a/src/mongo/db/pipeline/document_source_cursor.h +++ b/src/mongo/db/pipeline/document_source_cursor.h @@ -125,6 +125,12 @@ public: return _exec->getPlanExplainer().getVersion(); } + BSONObj serializeToBSONForDebug() const final { + // Feel free to add any useful information here. For now this has not been useful for + // debugging so is left empty. + return BSON(kStageName << "{}"); + } + protected: DocumentSourceCursor(const CollectionPtr& collection, std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec, diff --git a/src/mongo/db/pipeline/document_source_documents.cpp b/src/mongo/db/pipeline/document_source_documents.cpp index ed0f7a2fe9f..5d4e02c71ea 100644 --- a/src/mongo/db/pipeline/document_source_documents.cpp +++ b/src/mongo/db/pipeline/document_source_documents.cpp @@ -43,10 +43,10 @@ namespace mongo { using boost::intrusive_ptr; -REGISTER_INTERNAL_DOCUMENT_SOURCE(documents, - LiteParsedDocumentSourceDefault::parse, - DocumentSourceDocuments::createFromBson, - true); +REGISTER_DOCUMENT_SOURCE(documents, + LiteParsedDocumentSourceDefault::parse, + DocumentSourceDocuments::createFromBson, + AllowedWithApiStrict::kAlways); std::list<intrusive_ptr<DocumentSource>> DocumentSourceDocuments::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { @@ -54,7 +54,7 @@ std::list<intrusive_ptr<DocumentSource>> DocumentSourceDocuments::createFromBson // $unwind, and $replaceRoot together. auto genField = UUID::gen().toString(); auto projectContent = BSON(genField << elem); - auto queue = DocumentSourceQueue::create(expCtx); + auto queue = DocumentSourceQueue::create(expCtx, DocumentSourceDocuments::kStageName); queue->emplace_back(Document{}); /* Create the following pipeline from $documents: [...] * => [ queue([{}]), @@ -66,9 +66,10 @@ std::list<intrusive_ptr<DocumentSource>> DocumentSourceDocuments::createFromBson queue, DocumentSourceProject::create(projectContent, expCtx, elem.fieldNameStringData()), DocumentSourceUnwind::create(expCtx, genField, false, {}, true), - DocumentSourceReplaceRoot::createFromBson( - BSON("$replaceRoot" << BSON("newRoot" << std::string("$") + genField)).firstElement(), - expCtx)}; + DocumentSourceReplaceRoot::create(expCtx, + ExpressionFieldPath::createPathFromString( + expCtx.get(), genField, expCtx->variablesParseState), + "elements within the array passed to $documents")}; } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_queue.cpp b/src/mongo/db/pipeline/document_source_queue.cpp index eb921a5faf4..c724db5ae10 100644 --- a/src/mongo/db/pipeline/document_source_queue.cpp +++ b/src/mongo/db/pipeline/document_source_queue.cpp @@ -30,7 +30,6 @@ #include "mongo/platform/basic.h" #include "mongo/db/pipeline/document_source_queue.h" - namespace mongo { REGISTER_INTERNAL_DOCUMENT_SOURCE(queue, @@ -55,16 +54,20 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceQueue::createFromBson( } boost::intrusive_ptr<DocumentSourceQueue> DocumentSourceQueue::create( - const boost::intrusive_ptr<ExpressionContext>& expCtx) { - return new DocumentSourceQueue({}, expCtx); + const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<StringData> aliasStageName) { + return new DocumentSourceQueue({}, expCtx, aliasStageName); } DocumentSourceQueue::DocumentSourceQueue(std::deque<GetNextResult> results, - const boost::intrusive_ptr<ExpressionContext>& expCtx) - : DocumentSource(kStageName, expCtx), _queue(std::move(results)) {} + const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<StringData> aliasStageName) + : DocumentSource(kStageName /* pass the real stage name here for execution stats */, expCtx), + _queue(std::move(results)), + _aliasStageName(std::move(aliasStageName)) {} const char* DocumentSourceQueue::getSourceName() const { - return kStageName.rawData(); + return _aliasStageName.value_or(kStageName).rawData(); } DocumentSource::GetNextResult DocumentSourceQueue::doGetNext() { @@ -80,7 +83,7 @@ DocumentSource::GetNextResult DocumentSourceQueue::doGetNext() { Value DocumentSourceQueue::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { ValueArrayStream vals; for (auto elem : _queue) { - vals << elem.getDocument(); + vals << elem.getDocument().getOwned(); } return Value(DOC(kStageName << vals.done())); } diff --git a/src/mongo/db/pipeline/document_source_queue.h b/src/mongo/db/pipeline/document_source_queue.h index f24b0c875e4..b0387475298 100644 --- a/src/mongo/db/pipeline/document_source_queue.h +++ b/src/mongo/db/pipeline/document_source_queue.h @@ -32,7 +32,6 @@ #include <deque> #include "mongo/db/pipeline/document_source.h" - namespace mongo { /** @@ -46,10 +45,12 @@ public: static constexpr StringData kStageName = "$queue"_sd; static boost::intrusive_ptr<DocumentSourceQueue> create( - const boost::intrusive_ptr<ExpressionContext>& expCtx); + const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<StringData> aliasStageName = boost::none); DocumentSourceQueue(std::deque<GetNextResult> results, - const boost::intrusive_ptr<ExpressionContext>& expCtx); + const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<StringData> aliasStageName = boost::none); ~DocumentSourceQueue() override = default; const char* getSourceName() const override; @@ -59,7 +60,7 @@ public: StageConstraints constraints(Pipeline::SplitState pipeState) const override { StageConstraints constraints(StreamType::kStreaming, PositionRequirement::kFirst, - HostTypeRequirement::kNone, + HostTypeRequirement::kLocalOnly, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kAllowed, @@ -67,6 +68,7 @@ public: UnionRequirement::kAllowed); constraints.requiresInputDocSource = false; + constraints.isIndependentOfAnyCollection = true; return constraints; } @@ -101,6 +103,10 @@ protected: GetNextResult doGetNext() override; // Return documents from front of queue. std::deque<GetNextResult> _queue; + + // An optional alias name is provided for cases like $documents where we want an error message + // to indicate the name the user provided, not the internal $queue name. + boost::optional<StringData> _aliasStageName = boost::none; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_replace_root.cpp b/src/mongo/db/pipeline/document_source_replace_root.cpp index 137791a4ca7..fb4bb2cb951 100644 --- a/src/mongo/db/pipeline/document_source_replace_root.cpp +++ b/src/mongo/db/pipeline/document_source_replace_root.cpp @@ -32,6 +32,7 @@ #include "mongo/db/pipeline/document_source_replace_root.h" #include <boost/smart_ptr/intrusive_ptr.hpp> +#include <fmt/format.h> #include "mongo/db/exec/document_value/value.h" #include "mongo/db/jsobj.h" @@ -45,27 +46,13 @@ using boost::intrusive_ptr; Document ReplaceRootTransformation::applyTransformation(const Document& input) { // Extract subdocument in the form of a Value. Value newRoot = _newRoot->evaluate(input, &_expCtx->variables); - - // To ensure an accurate user-facing message, any user-facing syntax that uses this stage - // internally must provide an message opener that complies with its documentation. - StringData msgOpener = [&]() { - switch (_specifiedName) { - case UserSpecifiedName::kReplaceRoot: - return "'newRoot' expression "_sd; - case UserSpecifiedName::kReplaceWith: - return "'replacement document' "_sd; - default: - MONGO_UNREACHABLE; - } - }(); - // The newRoot expression, if it exists, must evaluate to an object. uassert(40228, - str::stream() << msgOpener.toString() - << "must evaluate to an object, but resulting value was: " - << newRoot.toString() << ". Type of resulting value: '" - << typeName(newRoot.getType()) - << "'. Input document: " << input.toString(), + fmt::format(kErrorTemplate.rawData(), + _errMsgContextForNonObject, + newRoot.toString(), + typeName(newRoot.getType()), + input.toString()), newRoot.getType() == BSONType::Object); // Turn the value into a document. @@ -119,10 +106,21 @@ intrusive_ptr<DocumentSource> DocumentSourceReplaceRoot::createFromBson( std::make_unique<ReplaceRootTransformation>( expCtx, newRootExpression, - (stageName == kStageName) ? ReplaceRootTransformation::UserSpecifiedName::kReplaceRoot - : ReplaceRootTransformation::UserSpecifiedName::kReplaceWith), + (stageName == kStageName) ? "'newRoot' expression " : "'replacement document' "), kStageName.rawData(), isIndependentOfAnyCollection); } +boost::intrusive_ptr<DocumentSource> DocumentSourceReplaceRoot::create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const boost::intrusive_ptr<Expression>& newRootExpression, + std::string errMsgContextForNonObjects) { + const bool isIndependentOfAnyCollection = false; + return new DocumentSourceSingleDocumentTransformation( + expCtx, + std::make_unique<ReplaceRootTransformation>( + expCtx, newRootExpression, std::move(errMsgContextForNonObjects)), + kStageName.rawData(), + isIndependentOfAnyCollection); +} } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_replace_root.h b/src/mongo/db/pipeline/document_source_replace_root.h index e0d8482d554..e302ce5d88c 100644 --- a/src/mongo/db/pipeline/document_source_replace_root.h +++ b/src/mongo/db/pipeline/document_source_replace_root.h @@ -44,8 +44,10 @@ public: ReplaceRootTransformation(const boost::intrusive_ptr<ExpressionContext>& expCtx, boost::intrusive_ptr<Expression> newRootExpression, - UserSpecifiedName specifiedName) - : _expCtx(expCtx), _newRoot(std::move(newRootExpression)), _specifiedName(specifiedName) {} + std::string errMsgContextForNonObject) + : _expCtx(expCtx), + _newRoot(std::move(newRootExpression)), + _errMsgContextForNonObject(std::move(errMsgContextForNonObject)) {} TransformerType getType() const final { return TransformerType::kReplaceRoot; @@ -82,7 +84,14 @@ public: private: const boost::intrusive_ptr<ExpressionContext> _expCtx; boost::intrusive_ptr<Expression> _newRoot; - UserSpecifiedName _specifiedName; + + // A string for additional context for the user about where/why we were expecting an object. + // This can be helpful if you are using $replaceRoot as part of an alias expansion as we do in + // $documents for example. Goes first in the template error message below. + std::string _errMsgContextForNonObject; + static constexpr StringData kErrorTemplate = + "{} must evaluate to an object, but resulting value was: {}. Type of resulting value: " + "'{}'. Input document: {}"_sd; }; /* @@ -106,6 +115,11 @@ public: static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); + static boost::intrusive_ptr<DocumentSource> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const boost::intrusive_ptr<Expression>& newRootExpression, + std::string errMsgContextForNonObjects); + private: // It is illegal to construct a DocumentSourceReplaceRoot directly, use createFromBson() // instead. diff --git a/src/mongo/db/pipeline/document_source_union_with.cpp b/src/mongo/db/pipeline/document_source_union_with.cpp index c948c1f57e7..7d1e4ea5568 100644 --- a/src/mongo/db/pipeline/document_source_union_with.cpp +++ b/src/mongo/db/pipeline/document_source_union_with.cpp @@ -26,6 +26,7 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ + #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery #include "mongo/platform/basic.h" @@ -35,6 +36,7 @@ #include "mongo/db/commands/test_commands_enabled.h" #include "mongo/db/pipeline/document_source_documents.h" #include "mongo/db/pipeline/document_source_match.h" +#include "mongo/db/pipeline/document_source_queue.h" #include "mongo/db/pipeline/document_source_single_document_transformation.h" #include "mongo/db/pipeline/document_source_union_with.h" #include "mongo/db/pipeline/document_source_union_with_gen.h" @@ -85,12 +87,24 @@ DocumentSourceUnionWith::~DocumentSourceUnionWith() { void validateUnionWithCollectionlessPipeline( const boost::optional<std::vector<mongo::BSONObj>>& pipeline) { + const auto errMsg = + "$unionWith stage without explicit collection must have a pipeline with $documents as " + "first stage"; + + uassert(ErrorCodes::FailedToParse, errMsg, pipeline && pipeline->size() > 0); + const auto firstStageBson = (*pipeline)[0]; + LOGV2_DEBUG(5909700, + 4, + "$unionWith validating collectionless pipeline", + "pipeline"_attr = pipeline, + "first"_attr = firstStageBson); uassert(ErrorCodes::FailedToParse, - "$unionWith stage without explicit collection must have a pipeline with $documents as " - "first stage", - pipeline && pipeline->size() > 0 && - // TODO SERVER-59628 replace with constraints check - !(*pipeline)[0].getField(DocumentSourceDocuments::kStageName).eoo()); + errMsg, + // TODO SERVER-59628 replace with constraints check + (firstStageBson.hasField(DocumentSourceDocuments::kStageName) || + firstStageBson.hasField(DocumentSourceQueue::kStageName)) + + ); } boost::intrusive_ptr<DocumentSource> DocumentSourceUnionWith::clone() const { diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index f04c4c1bd23..486b2e3ac7d 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -27,7 +27,10 @@ * it in the license file. */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault + #include "mongo/db/pipeline/pipeline.h" +#include "mongo/logv2/log.h" #include <algorithm> diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 4936c95df07..dd52df4ab9e 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -272,6 +272,11 @@ public: std::vector<Value> serialize() const; std::vector<BSONObj> serializeToBson() const; + /** + * Serializes the pipeline into BSON for explain/debug logging purposes. + */ + std::vector<BSONObj> serializeToBSONForDebug() const; + // The initial source is special since it varies between mongos and mongod. void addInitialSource(boost::intrusive_ptr<DocumentSource> source); diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp index e5e0131eda3..dd600dd1456 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp @@ -298,8 +298,14 @@ CommonMongodProcessInterface::attachCursorSourceToPipelineForLocalRead(Pipeline* std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline, PipelineDeleter(expCtx->opCtx)); - invariant(pipeline->getSources().empty() || - !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get())); + boost::optional<DocumentSource*> firstStage = pipeline->getSources().empty() + ? boost::optional<DocumentSource*>{} + : pipeline->getSources().front().get(); + invariant(!firstStage || !dynamic_cast<DocumentSourceCursor*>(*firstStage)); + if (firstStage && !(*firstStage)->constraints().requiresInputDocSource) { + // There's no need to attach a cursor here. + return pipeline; + } boost::optional<AutoGetCollectionForReadCommandMaybeLockFree> autoColl; const NamespaceStringOrUUID nsOrUUID = expCtx->uuid diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 288c59814bf..3e268b59067 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -1273,8 +1273,25 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline( auto expCtx = ownedPipeline->getContext(); std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline, PipelineDeleter(expCtx->opCtx)); - invariant(pipeline->getSources().empty() || - !dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); + boost::optional<DocumentSource*> hasFirstStage = pipeline->getSources().empty() + ? boost::optional<DocumentSource*>{} + : pipeline->getSources().front().get(); + + if (hasFirstStage) { + // Make sure the first stage isn't already a $mergeCursors, and also check if it is a stage + // which needs to actually get a cursor attached or not. + const auto* firstStage = *hasFirstStage; + invariant(!dynamic_cast<const DocumentSourceMergeCursors*>(firstStage)); + // Here we check the hostRequirment because there is at least one stage ($indexStats) which + // does not require input data, but is still expected to fan out and contact remote shards + // nonetheless. + if (auto constraints = firstStage->constraints(); !constraints.requiresInputDocSource && + (constraints.hostRequirement == StageConstraints::HostTypeRequirement::kLocalOnly)) { + // There's no need to attach a cursor here - the first stage provides its own data and + // is meant to be run locally (e.g. $documents). + return pipeline; + } + } // Helper to decide whether we should ignore the given shardTargetingPolicy for this namespace. // Certain namespaces are shard-local; that is, they exist independently on every shard. For |