diff options
-rw-r--r-- | jstests/change_streams/change_stream_collation.js | 210 | ||||
-rw-r--r-- | jstests/libs/change_stream_util.js | 3 | ||||
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream.cpp | 35 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream.h | 12 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_test.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_match.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_match.h | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_context.cpp | 33 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_context.h | 41 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.h | 17 | ||||
-rw-r--r-- | src/mongo/db/query/canonical_query.cpp | 1 |
13 files changed, 323 insertions, 80 deletions
diff --git a/jstests/change_streams/change_stream_collation.js b/jstests/change_streams/change_stream_collation.js index 874875bdaf6..a3ef58a14de 100644 --- a/jstests/change_streams/change_stream_collation.js +++ b/jstests/change_streams/change_stream_collation.js @@ -1,44 +1,184 @@ /** - * Test that a $changeStream pipeline adopts either the user-specified collation, or the default of - * the target collection if no specific collation is requested. - * TODO SERVER-31443: Update these tests to verify full collation support with $changeStream. + * Tests that a change stream can use a user-specified, or collection-default collation. + * + * This test assumes that it will be able to drop and then re-create a collection with non-default + * options. + * @tags: [assumes_no_implicit_collection_creation_after_drop] */ (function() { "use strict"; - const noCollationColl = db.change_stream_no_collation; - const hasCollationColl = db.change_stream_collation; + load("jstests/libs/change_stream_util.js"); // For 'ChangeStreamTest'. - hasCollationColl.drop(); - noCollationColl.drop(); + let cst = new ChangeStreamTest(db); + const caseInsensitive = {locale: "en_US", strength: 2}; + const caseInsensitiveCollection = db.change_stream_case_insensitive; + caseInsensitiveCollection.drop(); + + // Test that you can open a change stream before the collection exists, and it will use the + // simple collation. + const simpleCollationStream = cst.startWatchingChanges( + {pipeline: [{$changeStream: {}}], collection: caseInsensitiveCollection}); + + // Create the collection with a non-default collation - this should invalidate the stream we + // opened before it existed. + assert.commandWorked( + db.runCommand({create: caseInsensitiveCollection.getName(), collation: caseInsensitive})); + cst.assertNextChangesEqual({ + cursor: simpleCollationStream, + expectedChanges: [{operationType: "invalidate"}], + expectInvalidate: true + }); + + const implicitCaseInsensitiveStream = cst.startWatchingChanges({ + pipeline: [ + {$changeStream: {}}, + {$match: {"fullDocument.text": "abc"}}, + // Be careful not to use _id in this projection, as startWatchingChanges() will exclude + // it by default, assuming it is the resume token. + {$project: {docId: "$documentKey._id"}} + ], + collection: caseInsensitiveCollection + }); + const explicitCaseInsensitiveStream = cst.startWatchingChanges({ + pipeline: [ + {$changeStream: {}}, + {$match: {"fullDocument.text": "abc"}}, + {$project: {docId: "$documentKey._id"}} + ], + collection: caseInsensitiveCollection, + aggregateOptions: {collation: caseInsensitive} + }); + + assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "aBc"})); + assert.writeOK(caseInsensitiveCollection.insert({_id: 1, text: "abc"})); + + cst.assertNextChangesEqual( + {cursor: implicitCaseInsensitiveStream, expectedChanges: [{docId: 0}, {docId: 1}]}); + cst.assertNextChangesEqual( + {cursor: explicitCaseInsensitiveStream, expectedChanges: [{docId: 0}, {docId: 1}]}); + + // Test that the collation does not apply to the scan over the oplog. + const similarNameCollection = db.cHaNgE_sTrEaM_cAsE_iNsEnSiTiVe; + similarNameCollection.drop(); assert.commandWorked( - db.runCommand({create: hasCollationColl.getName(), collation: {locale: "en_US"}})); - assert.commandWorked(db.runCommand({create: noCollationColl.getName()})); - - assert.writeOK(hasCollationColl.insert({_id: 1})); - assert.writeOK(noCollationColl.insert({_id: 1})); - - const csPipeline = [{$changeStream: {}}]; - const simpleCollation = {collation: {locale: "simple"}}; - const nonSimpleCollation = {collation: {locale: "en_US"}}; - - // Verify that we can open a $changeStream on a collection whose default collation is 'simple' - // without specifying a collation in our request. - let csCursor = assert.doesNotThrow(() => noCollationColl.aggregate(csPipeline)); - csCursor.close(); - - // Verify that we cannot open a $changeStream if we specify a non-simple collation. - let csError = assert.throws(() => noCollationColl.aggregate(csPipeline, nonSimpleCollation)); - assert.eq(csError.code, 40471); - - // Verify that we cannot open a $changeStream on a collection with a non-simple default - // collation if we omit a collation specification in the request. - csError = assert.throws(() => hasCollationColl.aggregate(csPipeline)); - assert.eq(csError.code, 40471); - - // Verify that we can open a $changeStream on a collection with a non-simple default collation - // if we explicitly request a 'simple' collator. - csCursor = assert.doesNotThrow(() => hasCollationColl.aggregate(csPipeline, simpleCollation)); - csCursor.close(); + db.runCommand({create: similarNameCollection.getName(), collation: {locale: "en_US"}})); + + assert.writeOK(similarNameCollection.insert({_id: 0, text: "aBc"})); + + assert.writeOK(caseInsensitiveCollection.insert({_id: 2, text: "ABC"})); + + // The existing stream should not see the first insert (to the other collection), but should see + // the second. + cst.assertNextChangesEqual( + {cursor: implicitCaseInsensitiveStream, expectedChanges: [{docId: 2}]}); + cst.assertNextChangesEqual( + {cursor: explicitCaseInsensitiveStream, expectedChanges: [{docId: 2}]}); + + // Test that creating a collection without a collation does not invalidate any change streams + // that were opened before the collection existed. + (function() { + const noCollationCollection = db.change_stream_no_collation; + noCollationCollection.drop(); + + const streamCreatedBeforeNoCollationCollection = cst.startWatchingChanges({ + pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}], + collection: noCollationCollection + }); + + assert.commandWorked(db.runCommand({create: noCollationCollection.getName()})); + assert.writeOK(noCollationCollection.insert({_id: 0})); + + cst.assertNextChangesEqual( + {cursor: streamCreatedBeforeNoCollationCollection, expectedChanges: [{docId: 0}]}); + }()); + + // Test that creating a collection and explicitly specifying the simple collation does not + // invalidate any change streams that were opened before the collection existed. + (function() { + const simpleCollationCollection = db.change_stream_simple_collation; + simpleCollationCollection.drop(); + + const streamCreatedBeforeSimpleCollationCollection = cst.startWatchingChanges({ + pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}], + collection: simpleCollationCollection + }); + + assert.commandWorked(db.runCommand( + {create: simpleCollationCollection.getName(), collation: {locale: "simple"}})); + assert.writeOK(simpleCollationCollection.insert({_id: 0})); + + cst.assertNextChangesEqual( + {cursor: streamCreatedBeforeSimpleCollationCollection, expectedChanges: [{docId: 0}]}); + }()); + + // Test that creating a change stream with a non-default collation, then creating a collection + // with the same collation will not invalidate the change stream. + (function() { + const frenchCollection = db.change_stream_french_collation; + frenchCollection.drop(); + + const frenchChangeStream = cst.startWatchingChanges({ + pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}], + aggregateOptions: {collation: {locale: "fr"}}, + collection: frenchCollection + }); + + assert.commandWorked( + db.runCommand({create: frenchCollection.getName(), collation: {locale: "fr"}})); + assert.writeOK(frenchCollection.insert({_id: 0})); + + cst.assertNextChangesEqual({cursor: frenchChangeStream, expectedChanges: [{docId: 0}]}); + }()); + + // Test that creating a change stream with a non-default collation, then creating a collection + // with *a different* collation will not invalidate the change stream. + (function() { + const germanCollection = db.change_stream_german_collation; + germanCollection.drop(); + + const englishCaseInsensitiveStream = cst.startWatchingChanges({ + pipeline: [ + {$changeStream: {}}, + {$match: {"fullDocument.text": "abc"}}, + {$project: {docId: "$documentKey._id"}} + ], + aggregateOptions: {collation: caseInsensitive}, + collection: germanCollection + }); + + assert.commandWorked( + db.runCommand({create: germanCollection.getName(), collation: {locale: "de"}})); + assert.writeOK(germanCollection.insert({_id: 0, text: "aBc"})); + + cst.assertNextChangesEqual( + {cursor: englishCaseInsensitiveStream, expectedChanges: [{docId: 0}]}); + }()); + + // Test that creating a change stream with a non-default collation against a collection that has + // a non-simple default collation will use the collation specified on the operation. + (function() { + const caseInsensitiveCollection = db.change_stream_case_insensitive; + caseInsensitiveCollection.drop(); + assert.commandWorked(db.runCommand( + {create: caseInsensitiveCollection.getName(), collation: caseInsensitive})); + + const englishCaseSensitiveStream = cst.startWatchingChanges({ + pipeline: [ + {$changeStream: {}}, + {$match: {"fullDocument.text": "abc"}}, + {$project: {docId: "$documentKey._id"}} + ], + aggregateOptions: {collation: {locale: "en_US"}}, + collection: caseInsensitiveCollection + }); + + assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "aBc"})); + assert.writeOK(caseInsensitiveCollection.insert({_id: 1, text: "abc"})); + + cst.assertNextChangesEqual( + {cursor: englishCaseSensitiveStream, expectedChanges: [{docId: 1}]}); + }()); + })(); diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js index 09b29a94d90..cbaf69fdedf 100644 --- a/jstests/libs/change_stream_util.js +++ b/jstests/libs/change_stream_util.js @@ -22,7 +22,8 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { * Returns the cursor returned by the 'aggregate' command. */ self.startWatchingChanges = function({pipeline, collection, includeToken, aggregateOptions}) { - aggregateOptions = aggregateOptions || {cursor: {batchSize: 1}}; + aggregateOptions = aggregateOptions || {}; + aggregateOptions.cursor = aggregateOptions.cursor || {batchSize: 1}; if (!includeToken) { // Strip the oplog fields we aren't testing. diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index f85102d1e04..976fb378ed7 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -477,9 +477,20 @@ Status runAggregate(OperationContext* opCtx, pipeline = reparsePipeline(pipeline.get(), request, expCtx); } - // This does mongod-specific stuff like creating the input PlanExecutor and adding - // it to the front of the pipeline if needed. - PipelineD::prepareCursorSource(collection, nss, &request, pipeline.get()); + // Prepare a PlanExecutor to provide input into the pipeline, if needed. + if (liteParsedPipeline.hasChangeStream()) { + // If we are using a change stream, the cursor stage should have a simple collation, + // regardless of what the user's collation was. + std::unique_ptr<CollatorInterface> collatorForCursor = nullptr; + auto collatorStash = expCtx->temporarilyChangeCollator(std::move(collatorForCursor)); + PipelineD::prepareCursorSource(collection, nss, &request, pipeline.get()); + } else { + PipelineD::prepareCursorSource(collection, nss, &request, pipeline.get()); + } + // Optimize again, since there may be additional optimizations that can be done after adding + // the initial cursor stage. Note this has to be done outside the above blocks to ensure + // this process uses the correct collation if it does any string comparisons. + pipeline->optimizePipeline(); // Transfer ownership of the Pipeline to the PipelineProxyStage. unownedPipeline = pipeline.get(); diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 46dec0b956d..5b850f8e6e3 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -86,7 +86,6 @@ const BSONObj DocumentSourceChangeStream::kSortSpec = namespace { static constexpr StringData kOplogMatchExplainName = "$_internalOplogMatch"_sd; - } // namespace intrusive_ptr<DocumentSourceOplogMatch> DocumentSourceOplogMatch::create( @@ -231,18 +230,28 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() { } // namespace -BSONObj DocumentSourceChangeStream::buildMatchFilter(const NamespaceString& nss, - Timestamp startFrom, - bool isResume) { +BSONObj DocumentSourceChangeStream::buildMatchFilter( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Timestamp startFrom, bool isResume) { + auto nss = expCtx->ns; auto target = nss.ns(); // 1) Supported commands that have the target db namespace (e.g. test.$cmd) in "ns" field. - auto dropDatabase = BSON("o.dropDatabase" << 1); - auto dropCollection = BSON("o.drop" << nss.coll()); - auto renameCollection = BSON("o.renameCollection" << target); + BSONArrayBuilder invalidatingCommands; + invalidatingCommands.append(BSON("o.dropDatabase" << 1)); + invalidatingCommands.append(BSON("o.drop" << nss.coll())); + invalidatingCommands.append(BSON("o.renameCollection" << target)); + if (expCtx->collation.isEmpty()) { + // If the user did not specify a collation, they should be using the collection's default + // collation. So a "create" command which has any collation present would invalidate the + // change stream, since that must mean the stream was created before the collection existed + // and used the simple collation, which is no longer the default. + invalidatingCommands.append( + BSON("o.create" << nss.coll() << "o.collation" << BSON("$exists" << true))); + } // 1.1) Commands that are on target db and one of the above. auto commandsOnTargetDb = - BSON("ns" << nss.getCommandNS().ns() << OR(dropDatabase, dropCollection, renameCollection)); + BSON("$and" << BSON_ARRAY(BSON("ns" << nss.getCommandNS().ns()) + << BSON("$or" << invalidatingCommands.arr()))); // 1.2) Supported commands that have arbitrary db namespaces in "ns" field. auto renameDropTarget = BSON("o.to" << target); // All supported commands that are either (1.1) or (1.2). @@ -283,11 +292,6 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( // A change stream is a tailable + awaitData cursor. expCtx->tailableMode = TailableMode::kTailableAndAwaitData; - uassert(40471, - "Only simple collation is currently allowed when using a $changeStream stage. Please " - "specify a collation of {locale: 'simple'} to open a $changeStream on this collection.", - !expCtx->getCollator()); - boost::optional<Timestamp> startFrom; if (!expCtx->inMongos) { auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx); @@ -341,7 +345,7 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( invariant(expCtx->inMongos || static_cast<bool>(startFrom)); if (startFrom) { stages.push_back(DocumentSourceOplogMatch::create( - buildMatchFilter(expCtx->ns, *startFrom, changeStreamIsResuming), expCtx)); + buildMatchFilter(expCtx, *startFrom, changeStreamIsResuming), expCtx)); } stages.push_back(createTransformationStage(elem.embeddedObject(), expCtx)); @@ -440,6 +444,8 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D break; } case repl::OpTypeEnum::kCommand: { + // Any command that makes it through our filter is an invalidating command such as a + // drop. operationType = kInvalidateOpType; // Make sure the result doesn't have a document key. documentKey = Value(); @@ -479,7 +485,6 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D // If we're in a sharded environment, we'll need to merge the results by their sort key, so add // that as metadata. if (_expCtx->needsMerge) { - auto change = doc.peek(); doc.setSortKeyMetaField(BSON("" << ts << "" << uuid << "" << documentKey)); } diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index e2d1498e482..944a50e66b0 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -148,7 +148,9 @@ public: * Produce the BSON object representing the filter for the $match stage to filter oplog entries * to only those relevant for this $changeStream stage. */ - static BSONObj buildMatchFilter(const NamespaceString& nss, Timestamp startFrom, bool isResume); + static BSONObj buildMatchFilter(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Timestamp startFrom, + bool isResume); /** * Parses a $changeStream stage from 'elem' and produces the $match and transformation @@ -177,6 +179,14 @@ public: const char* getSourceName() const final; + GetNextResult getNext() final { + // We should never execute this stage directly. We expect this stage to be absorbed into the + // cursor feeding the pipeline, and executing this stage may result in the use of the wrong + // collation. The comparisons against the oplog must use the simple collation, regardless of + // the collation on the ExpressionContext. + MONGO_UNREACHABLE; + } + StageConstraints constraints(Pipeline::SplitState pipeState) const final; Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final; diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 4fff7302a90..46115d72a0a 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -130,25 +130,30 @@ public: DSChangeStream::createFromBson(spec.firstElement(), getExpCtx()); vector<intrusive_ptr<DocumentSource>> stages(std::begin(result), std::end(result)); + // This match stage is a DocumentSourceOplogMatch, which we explicitly disallow from + // executing as a safety mechanism, since it needs to use the collection-default collation, + // even if the rest of the pipeline is using some other collation. To avoid ever executing + // that stage here, we'll up-convert it from the non-executable DocumentSourceOplogMatch to + // a fully-executable DocumentSourceMatch. This is safe because all of the unit tests will + // use the 'simple' collation. auto match = dynamic_cast<DocumentSourceMatch*>(stages[0].get()); ASSERT(match); + auto executableMatch = DocumentSourceMatch::create(match->getQuery(), getExpCtx()); + auto mock = DocumentSourceMock::create(D(entry.toBSON())); - match->setSource(mock.get()); + executableMatch->setSource(mock.get()); // Check the oplog entry is transformed correctly. auto transform = stages[1].get(); ASSERT(transform); ASSERT_EQ(string(transform->getSourceName()), DSChangeStream::kStageName); - transform->setSource(match); + transform->setSource(executableMatch.get()); auto closeCursor = stages.back().get(); ASSERT(closeCursor); closeCursor->setSource(transform); - // Include the mock stage in the "stages" so it won't get destroyed outside the function - // scope. - stages.insert(stages.begin(), mock); - return stages; + return {mock, executableMatch, transform, closeCursor}; } OplogEntry createCommand(const BSONObj& oField, diff --git a/src/mongo/db/pipeline/document_source_match.cpp b/src/mongo/db/pipeline/document_source_match.cpp index cbdf165b504..358775136cd 100644 --- a/src/mongo/db/pipeline/document_source_match.cpp +++ b/src/mongo/db/pipeline/document_source_match.cpp @@ -492,14 +492,14 @@ DocumentSource::GetDepsReturn DocumentSourceMatch::getDependencies(DepsTracker* } DocumentSourceMatch::DocumentSourceMatch(const BSONObj& query, - const intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSource(pExpCtx), + const intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSource(expCtx), _predicate(query.getOwned()), _isTextQuery(isTextQuery(query)), _dependencies(_isTextQuery ? DepsTracker::MetadataAvailable::kTextScore : DepsTracker::MetadataAvailable::kNoMetadata) { StatusWithMatchExpression status = uassertStatusOK(MatchExpressionParser::parse( - _predicate, pExpCtx, ExtensionsCallbackNoop(), Pipeline::kAllowedMatcherFeatures)); + _predicate, expCtx, ExtensionsCallbackNoop(), Pipeline::kAllowedMatcherFeatures)); _expression = std::move(status.getValue()); getDependencies(&_dependencies); } diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h index eabc4de92ae..4614ee5d50b 100644 --- a/src/mongo/db/pipeline/document_source_match.h +++ b/src/mongo/db/pipeline/document_source_match.h @@ -41,7 +41,7 @@ class DocumentSourceMatch : public DocumentSource { public: virtual ~DocumentSourceMatch() = default; - GetNextResult getNext() final; + GetNextResult getNext() override; boost::intrusive_ptr<DocumentSource> optimize() final; BSONObjSet getOutputSorts() final { return pSource ? pSource->getOutputSorts() @@ -156,7 +156,7 @@ public: protected: DocumentSourceMatch(const BSONObj& query, - const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + const boost::intrusive_ptr<ExpressionContext>& expCtx); private: std::unique_ptr<MatchExpression> _expression; diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp index 7901b89da30..7e460df4ac3 100644 --- a/src/mongo/db/pipeline/expression_context.cpp +++ b/src/mongo/db/pipeline/expression_context.cpp @@ -80,6 +80,39 @@ void ExpressionContext::checkForInterrupt() { } } +ExpressionContext::CollatorStash::CollatorStash( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + std::unique_ptr<CollatorInterface> newCollator) + : _expCtx(expCtx), + _originalCollation(_expCtx->collation), + _originalCollatorOwned(std::move(_expCtx->_ownedCollator)), + _originalCollatorUnowned(_expCtx->_collator) { + _expCtx->setCollator(std::move(newCollator)); + _expCtx->collation = + _expCtx->getCollator() ? _expCtx->getCollator()->getSpec().toBSON().getOwned() : BSONObj(); +} + +ExpressionContext::CollatorStash::~CollatorStash() { + if (_originalCollatorOwned) { + _expCtx->setCollator(std::move(_originalCollatorOwned)); + } else { + _expCtx->setCollator(_originalCollatorUnowned); + if (!_originalCollatorUnowned && _expCtx->_ownedCollator) { + // If the original collation was 'nullptr', we cannot distinguish whether it was owned + // or not. We always set '_ownedCollator' with the stash, so should reset it to null + // here. + _expCtx->_ownedCollator = nullptr; + } + } + _expCtx->collation = _originalCollation; +} + +std::unique_ptr<ExpressionContext::CollatorStash> ExpressionContext::temporarilyChangeCollator( + std::unique_ptr<CollatorInterface> newCollator) { + // This constructor of CollatorStash is private, so we can't use make_unique(). + return std::unique_ptr<CollatorStash>(new CollatorStash(this, std::move(newCollator))); +} + void ExpressionContext::setCollator(const CollatorInterface* collator) { _collator = collator; diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index a5bebd95745..1a4e757918f 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -62,6 +62,38 @@ public: }; /** + * An RAII type that will temporarily change the ExpressionContext's collator. Resets the + * collator to the previous value upon destruction. + */ + class CollatorStash { + public: + /** + * Resets the collator on '_expCtx' to the original collator present at the time this + * CollatorStash was constructed. + */ + ~CollatorStash(); + + private: + /** + * Temporarily changes the collator on 'expCtx' to be 'newCollator'. The collator will be + * set back to the original value when this CollatorStash is deleted. + * + * This constructor is private, all CollatorStashes should be created by calling + * ExpressionContext::temporarilyChangeCollator(). + */ + CollatorStash(const boost::intrusive_ptr<ExpressionContext>& expCtx, + std::unique_ptr<CollatorInterface> newCollator); + + friend class ExpressionContext; + + boost::intrusive_ptr<ExpressionContext> _expCtx; + + BSONObj _originalCollation; + std::unique_ptr<CollatorInterface> _originalCollatorOwned; + const CollatorInterface* _originalCollatorUnowned{nullptr}; + }; + + /** * Constructs an ExpressionContext to be used for Pipeline parsing and evaluation. * 'resolvedNamespaces' maps collection names (not full namespaces) to ResolvedNamespaces. */ @@ -97,6 +129,13 @@ public: } /** + * Temporarily resets the collator to be 'newCollator'. Returns a CollatorStash which will reset + * the collator back to the old value upon destruction. + */ + std::unique_ptr<CollatorStash> temporarilyChangeCollator( + std::unique_ptr<CollatorInterface> newCollator); + + /** * Returns an ExpressionContext that is identical to 'this' that can be used to execute a * separate aggregation pipeline on 'ns' with the optional 'uuid'. */ @@ -172,6 +211,8 @@ protected: setCollator(_ownedCollator.get()); } + friend class CollatorStash; + // Collator used for comparisons. This is owned in the context of a Pipeline. // TODO SERVER-31294: Move ownership of an aggregation's collator elsewhere. std::unique_ptr<CollatorInterface> _ownedCollator; diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 932b3e4f3e4..09bd1b1653f 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -257,6 +257,9 @@ public: !bool(css->getMetadata())); PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline); + // Optimize again, since there may be additional optimizations that can be done after adding + // the initial cursor stage. + pipeline->optimizePipeline(); return Status::OK(); } @@ -860,11 +863,7 @@ void PipelineD::addCursorSource(Collection* collection, pSource->setProjection(deps.toProjection(), deps.toParsedDeps()); } - - // Add the initial DocumentSourceCursor to the front of the pipeline. Then optimize again in - // case the new stage can be absorbed with the first stages of the pipeline. pipeline->addInitialSource(pSource); - pipeline->optimizePipeline(); } Timestamp PipelineD::getLatestOplogTimestamp(const Pipeline* pipeline) { diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index 0da6ebe2f20..467aa9a8d1c 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -60,17 +60,14 @@ struct DepsTracker; class PipelineD { public: /** - * Create a Cursor wrapped in a DocumentSourceCursor, which is suitable - * to be the first source for a pipeline to begin with. This source - * will feed the execution of the pipeline. + * If the first stage in the pipeline does not generate its own output documents, attaches a + * DocumentSourceCursor to the front of the pipeline which will output documents from the + * collection to feed into the pipeline. * - * This method looks for early pipeline stages that can be folded into - * the underlying cursor, and when a cursor can absorb those, they - * are removed from the head of the pipeline. For example, an - * early match can be removed and replaced with a Cursor that will - * do an index scan. - * - * The cursor is added to the front of the pipeline's sources. + * This method looks for early pipeline stages that can be folded into the underlying + * PlanExecutor, and removes those stages from the pipeline when they can be absorbed by the + * PlanExecutor. For example, an early $match can be removed and replaced with a + * DocumentSourceCursor containing a PlanExecutor that will do an index scan. * * Callers must take care to ensure that 'nss' is locked in at least IS-mode. * diff --git a/src/mongo/db/query/canonical_query.cpp b/src/mongo/db/query/canonical_query.cpp index 1cd25b4c370..f95b9aac680 100644 --- a/src/mongo/db/query/canonical_query.cpp +++ b/src/mongo/db/query/canonical_query.cpp @@ -152,6 +152,7 @@ StatusWith<std::unique_ptr<CanonicalQuery>> CanonicalQuery::canonicalize( newExpCtx.reset(new ExpressionContext(opCtx, collator.get())); } else { newExpCtx = expCtx; + invariant(CollatorInterface::collatorsMatch(collator.get(), expCtx->getCollator())); } StatusWithMatchExpression statusWithMatcher = MatchExpressionParser::parse( qr->getFilter(), newExpCtx, extensionsCallback, allowedFeatures); |