summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/change_streams/change_stream_collation.js210
-rw-r--r--jstests/libs/change_stream_util.js3
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp17
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp35
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h12
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp17
-rw-r--r--src/mongo/db/pipeline/document_source_match.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_match.h4
-rw-r--r--src/mongo/db/pipeline/expression_context.cpp33
-rw-r--r--src/mongo/db/pipeline/expression_context.h41
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp7
-rw-r--r--src/mongo/db/pipeline/pipeline_d.h17
-rw-r--r--src/mongo/db/query/canonical_query.cpp1
13 files changed, 323 insertions, 80 deletions
diff --git a/jstests/change_streams/change_stream_collation.js b/jstests/change_streams/change_stream_collation.js
index 874875bdaf6..a3ef58a14de 100644
--- a/jstests/change_streams/change_stream_collation.js
+++ b/jstests/change_streams/change_stream_collation.js
@@ -1,44 +1,184 @@
/**
- * Test that a $changeStream pipeline adopts either the user-specified collation, or the default of
- * the target collection if no specific collation is requested.
- * TODO SERVER-31443: Update these tests to verify full collation support with $changeStream.
+ * Tests that a change stream can use a user-specified, or collection-default collation.
+ *
+ * This test assumes that it will be able to drop and then re-create a collection with non-default
+ * options.
+ * @tags: [assumes_no_implicit_collection_creation_after_drop]
*/
(function() {
"use strict";
- const noCollationColl = db.change_stream_no_collation;
- const hasCollationColl = db.change_stream_collation;
+ load("jstests/libs/change_stream_util.js"); // For 'ChangeStreamTest'.
- hasCollationColl.drop();
- noCollationColl.drop();
+ let cst = new ChangeStreamTest(db);
+ const caseInsensitive = {locale: "en_US", strength: 2};
+ const caseInsensitiveCollection = db.change_stream_case_insensitive;
+ caseInsensitiveCollection.drop();
+
+ // Test that you can open a change stream before the collection exists, and it will use the
+ // simple collation.
+ const simpleCollationStream = cst.startWatchingChanges(
+ {pipeline: [{$changeStream: {}}], collection: caseInsensitiveCollection});
+
+ // Create the collection with a non-default collation - this should invalidate the stream we
+ // opened before it existed.
+ assert.commandWorked(
+ db.runCommand({create: caseInsensitiveCollection.getName(), collation: caseInsensitive}));
+ cst.assertNextChangesEqual({
+ cursor: simpleCollationStream,
+ expectedChanges: [{operationType: "invalidate"}],
+ expectInvalidate: true
+ });
+
+ const implicitCaseInsensitiveStream = cst.startWatchingChanges({
+ pipeline: [
+ {$changeStream: {}},
+ {$match: {"fullDocument.text": "abc"}},
+ // Be careful not to use _id in this projection, as startWatchingChanges() will exclude
+ // it by default, assuming it is the resume token.
+ {$project: {docId: "$documentKey._id"}}
+ ],
+ collection: caseInsensitiveCollection
+ });
+ const explicitCaseInsensitiveStream = cst.startWatchingChanges({
+ pipeline: [
+ {$changeStream: {}},
+ {$match: {"fullDocument.text": "abc"}},
+ {$project: {docId: "$documentKey._id"}}
+ ],
+ collection: caseInsensitiveCollection,
+ aggregateOptions: {collation: caseInsensitive}
+ });
+
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "aBc"}));
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 1, text: "abc"}));
+
+ cst.assertNextChangesEqual(
+ {cursor: implicitCaseInsensitiveStream, expectedChanges: [{docId: 0}, {docId: 1}]});
+ cst.assertNextChangesEqual(
+ {cursor: explicitCaseInsensitiveStream, expectedChanges: [{docId: 0}, {docId: 1}]});
+
+ // Test that the collation does not apply to the scan over the oplog.
+ const similarNameCollection = db.cHaNgE_sTrEaM_cAsE_iNsEnSiTiVe;
+ similarNameCollection.drop();
assert.commandWorked(
- db.runCommand({create: hasCollationColl.getName(), collation: {locale: "en_US"}}));
- assert.commandWorked(db.runCommand({create: noCollationColl.getName()}));
-
- assert.writeOK(hasCollationColl.insert({_id: 1}));
- assert.writeOK(noCollationColl.insert({_id: 1}));
-
- const csPipeline = [{$changeStream: {}}];
- const simpleCollation = {collation: {locale: "simple"}};
- const nonSimpleCollation = {collation: {locale: "en_US"}};
-
- // Verify that we can open a $changeStream on a collection whose default collation is 'simple'
- // without specifying a collation in our request.
- let csCursor = assert.doesNotThrow(() => noCollationColl.aggregate(csPipeline));
- csCursor.close();
-
- // Verify that we cannot open a $changeStream if we specify a non-simple collation.
- let csError = assert.throws(() => noCollationColl.aggregate(csPipeline, nonSimpleCollation));
- assert.eq(csError.code, 40471);
-
- // Verify that we cannot open a $changeStream on a collection with a non-simple default
- // collation if we omit a collation specification in the request.
- csError = assert.throws(() => hasCollationColl.aggregate(csPipeline));
- assert.eq(csError.code, 40471);
-
- // Verify that we can open a $changeStream on a collection with a non-simple default collation
- // if we explicitly request a 'simple' collator.
- csCursor = assert.doesNotThrow(() => hasCollationColl.aggregate(csPipeline, simpleCollation));
- csCursor.close();
+ db.runCommand({create: similarNameCollection.getName(), collation: {locale: "en_US"}}));
+
+ assert.writeOK(similarNameCollection.insert({_id: 0, text: "aBc"}));
+
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 2, text: "ABC"}));
+
+ // The existing stream should not see the first insert (to the other collection), but should see
+ // the second.
+ cst.assertNextChangesEqual(
+ {cursor: implicitCaseInsensitiveStream, expectedChanges: [{docId: 2}]});
+ cst.assertNextChangesEqual(
+ {cursor: explicitCaseInsensitiveStream, expectedChanges: [{docId: 2}]});
+
+ // Test that creating a collection without a collation does not invalidate any change streams
+ // that were opened before the collection existed.
+ (function() {
+ const noCollationCollection = db.change_stream_no_collation;
+ noCollationCollection.drop();
+
+ const streamCreatedBeforeNoCollationCollection = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}],
+ collection: noCollationCollection
+ });
+
+ assert.commandWorked(db.runCommand({create: noCollationCollection.getName()}));
+ assert.writeOK(noCollationCollection.insert({_id: 0}));
+
+ cst.assertNextChangesEqual(
+ {cursor: streamCreatedBeforeNoCollationCollection, expectedChanges: [{docId: 0}]});
+ }());
+
+ // Test that creating a collection and explicitly specifying the simple collation does not
+ // invalidate any change streams that were opened before the collection existed.
+ (function() {
+ const simpleCollationCollection = db.change_stream_simple_collation;
+ simpleCollationCollection.drop();
+
+ const streamCreatedBeforeSimpleCollationCollection = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}],
+ collection: simpleCollationCollection
+ });
+
+ assert.commandWorked(db.runCommand(
+ {create: simpleCollationCollection.getName(), collation: {locale: "simple"}}));
+ assert.writeOK(simpleCollationCollection.insert({_id: 0}));
+
+ cst.assertNextChangesEqual(
+ {cursor: streamCreatedBeforeSimpleCollationCollection, expectedChanges: [{docId: 0}]});
+ }());
+
+ // Test that creating a change stream with a non-default collation, then creating a collection
+ // with the same collation will not invalidate the change stream.
+ (function() {
+ const frenchCollection = db.change_stream_french_collation;
+ frenchCollection.drop();
+
+ const frenchChangeStream = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}],
+ aggregateOptions: {collation: {locale: "fr"}},
+ collection: frenchCollection
+ });
+
+ assert.commandWorked(
+ db.runCommand({create: frenchCollection.getName(), collation: {locale: "fr"}}));
+ assert.writeOK(frenchCollection.insert({_id: 0}));
+
+ cst.assertNextChangesEqual({cursor: frenchChangeStream, expectedChanges: [{docId: 0}]});
+ }());
+
+ // Test that creating a change stream with a non-default collation, then creating a collection
+ // with *a different* collation will not invalidate the change stream.
+ (function() {
+ const germanCollection = db.change_stream_german_collation;
+ germanCollection.drop();
+
+ const englishCaseInsensitiveStream = cst.startWatchingChanges({
+ pipeline: [
+ {$changeStream: {}},
+ {$match: {"fullDocument.text": "abc"}},
+ {$project: {docId: "$documentKey._id"}}
+ ],
+ aggregateOptions: {collation: caseInsensitive},
+ collection: germanCollection
+ });
+
+ assert.commandWorked(
+ db.runCommand({create: germanCollection.getName(), collation: {locale: "de"}}));
+ assert.writeOK(germanCollection.insert({_id: 0, text: "aBc"}));
+
+ cst.assertNextChangesEqual(
+ {cursor: englishCaseInsensitiveStream, expectedChanges: [{docId: 0}]});
+ }());
+
+ // Test that creating a change stream with a non-default collation against a collection that has
+ // a non-simple default collation will use the collation specified on the operation.
+ (function() {
+ const caseInsensitiveCollection = db.change_stream_case_insensitive;
+ caseInsensitiveCollection.drop();
+ assert.commandWorked(db.runCommand(
+ {create: caseInsensitiveCollection.getName(), collation: caseInsensitive}));
+
+ const englishCaseSensitiveStream = cst.startWatchingChanges({
+ pipeline: [
+ {$changeStream: {}},
+ {$match: {"fullDocument.text": "abc"}},
+ {$project: {docId: "$documentKey._id"}}
+ ],
+ aggregateOptions: {collation: {locale: "en_US"}},
+ collection: caseInsensitiveCollection
+ });
+
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "aBc"}));
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 1, text: "abc"}));
+
+ cst.assertNextChangesEqual(
+ {cursor: englishCaseSensitiveStream, expectedChanges: [{docId: 1}]});
+ }());
+
})();
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js
index 09b29a94d90..cbaf69fdedf 100644
--- a/jstests/libs/change_stream_util.js
+++ b/jstests/libs/change_stream_util.js
@@ -22,7 +22,8 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
* Returns the cursor returned by the 'aggregate' command.
*/
self.startWatchingChanges = function({pipeline, collection, includeToken, aggregateOptions}) {
- aggregateOptions = aggregateOptions || {cursor: {batchSize: 1}};
+ aggregateOptions = aggregateOptions || {};
+ aggregateOptions.cursor = aggregateOptions.cursor || {batchSize: 1};
if (!includeToken) {
// Strip the oplog fields we aren't testing.
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index f85102d1e04..976fb378ed7 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -477,9 +477,20 @@ Status runAggregate(OperationContext* opCtx,
pipeline = reparsePipeline(pipeline.get(), request, expCtx);
}
- // This does mongod-specific stuff like creating the input PlanExecutor and adding
- // it to the front of the pipeline if needed.
- PipelineD::prepareCursorSource(collection, nss, &request, pipeline.get());
+ // Prepare a PlanExecutor to provide input into the pipeline, if needed.
+ if (liteParsedPipeline.hasChangeStream()) {
+ // If we are using a change stream, the cursor stage should have a simple collation,
+ // regardless of what the user's collation was.
+ std::unique_ptr<CollatorInterface> collatorForCursor = nullptr;
+ auto collatorStash = expCtx->temporarilyChangeCollator(std::move(collatorForCursor));
+ PipelineD::prepareCursorSource(collection, nss, &request, pipeline.get());
+ } else {
+ PipelineD::prepareCursorSource(collection, nss, &request, pipeline.get());
+ }
+ // Optimize again, since there may be additional optimizations that can be done after adding
+ // the initial cursor stage. Note this has to be done outside the above blocks to ensure
+ // this process uses the correct collation if it does any string comparisons.
+ pipeline->optimizePipeline();
// Transfer ownership of the Pipeline to the PipelineProxyStage.
unownedPipeline = pipeline.get();
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 46dec0b956d..5b850f8e6e3 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -86,7 +86,6 @@ const BSONObj DocumentSourceChangeStream::kSortSpec =
namespace {
static constexpr StringData kOplogMatchExplainName = "$_internalOplogMatch"_sd;
-
} // namespace
intrusive_ptr<DocumentSourceOplogMatch> DocumentSourceOplogMatch::create(
@@ -231,18 +230,28 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() {
} // namespace
-BSONObj DocumentSourceChangeStream::buildMatchFilter(const NamespaceString& nss,
- Timestamp startFrom,
- bool isResume) {
+BSONObj DocumentSourceChangeStream::buildMatchFilter(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Timestamp startFrom, bool isResume) {
+ auto nss = expCtx->ns;
auto target = nss.ns();
// 1) Supported commands that have the target db namespace (e.g. test.$cmd) in "ns" field.
- auto dropDatabase = BSON("o.dropDatabase" << 1);
- auto dropCollection = BSON("o.drop" << nss.coll());
- auto renameCollection = BSON("o.renameCollection" << target);
+ BSONArrayBuilder invalidatingCommands;
+ invalidatingCommands.append(BSON("o.dropDatabase" << 1));
+ invalidatingCommands.append(BSON("o.drop" << nss.coll()));
+ invalidatingCommands.append(BSON("o.renameCollection" << target));
+ if (expCtx->collation.isEmpty()) {
+ // If the user did not specify a collation, they should be using the collection's default
+ // collation. So a "create" command which has any collation present would invalidate the
+ // change stream, since that must mean the stream was created before the collection existed
+ // and used the simple collation, which is no longer the default.
+ invalidatingCommands.append(
+ BSON("o.create" << nss.coll() << "o.collation" << BSON("$exists" << true)));
+ }
// 1.1) Commands that are on target db and one of the above.
auto commandsOnTargetDb =
- BSON("ns" << nss.getCommandNS().ns() << OR(dropDatabase, dropCollection, renameCollection));
+ BSON("$and" << BSON_ARRAY(BSON("ns" << nss.getCommandNS().ns())
+ << BSON("$or" << invalidatingCommands.arr())));
// 1.2) Supported commands that have arbitrary db namespaces in "ns" field.
auto renameDropTarget = BSON("o.to" << target);
// All supported commands that are either (1.1) or (1.2).
@@ -283,11 +292,6 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
// A change stream is a tailable + awaitData cursor.
expCtx->tailableMode = TailableMode::kTailableAndAwaitData;
- uassert(40471,
- "Only simple collation is currently allowed when using a $changeStream stage. Please "
- "specify a collation of {locale: 'simple'} to open a $changeStream on this collection.",
- !expCtx->getCollator());
-
boost::optional<Timestamp> startFrom;
if (!expCtx->inMongos) {
auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx);
@@ -341,7 +345,7 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
invariant(expCtx->inMongos || static_cast<bool>(startFrom));
if (startFrom) {
stages.push_back(DocumentSourceOplogMatch::create(
- buildMatchFilter(expCtx->ns, *startFrom, changeStreamIsResuming), expCtx));
+ buildMatchFilter(expCtx, *startFrom, changeStreamIsResuming), expCtx));
}
stages.push_back(createTransformationStage(elem.embeddedObject(), expCtx));
@@ -440,6 +444,8 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D
break;
}
case repl::OpTypeEnum::kCommand: {
+ // Any command that makes it through our filter is an invalidating command such as a
+ // drop.
operationType = kInvalidateOpType;
// Make sure the result doesn't have a document key.
documentKey = Value();
@@ -479,7 +485,6 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D
// If we're in a sharded environment, we'll need to merge the results by their sort key, so add
// that as metadata.
if (_expCtx->needsMerge) {
- auto change = doc.peek();
doc.setSortKeyMetaField(BSON("" << ts << "" << uuid << "" << documentKey));
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index e2d1498e482..944a50e66b0 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -148,7 +148,9 @@ public:
* Produce the BSON object representing the filter for the $match stage to filter oplog entries
* to only those relevant for this $changeStream stage.
*/
- static BSONObj buildMatchFilter(const NamespaceString& nss, Timestamp startFrom, bool isResume);
+ static BSONObj buildMatchFilter(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Timestamp startFrom,
+ bool isResume);
/**
* Parses a $changeStream stage from 'elem' and produces the $match and transformation
@@ -177,6 +179,14 @@ public:
const char* getSourceName() const final;
+ GetNextResult getNext() final {
+ // We should never execute this stage directly. We expect this stage to be absorbed into the
+ // cursor feeding the pipeline, and executing this stage may result in the use of the wrong
+ // collation. The comparisons against the oplog must use the simple collation, regardless of
+ // the collation on the ExpressionContext.
+ MONGO_UNREACHABLE;
+ }
+
StageConstraints constraints(Pipeline::SplitState pipeState) const final;
Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final;
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 4fff7302a90..46115d72a0a 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -130,25 +130,30 @@ public:
DSChangeStream::createFromBson(spec.firstElement(), getExpCtx());
vector<intrusive_ptr<DocumentSource>> stages(std::begin(result), std::end(result));
+ // This match stage is a DocumentSourceOplogMatch, which we explicitly disallow from
+ // executing as a safety mechanism, since it needs to use the collection-default collation,
+ // even if the rest of the pipeline is using some other collation. To avoid ever executing
+ // that stage here, we'll up-convert it from the non-executable DocumentSourceOplogMatch to
+ // a fully-executable DocumentSourceMatch. This is safe because all of the unit tests will
+ // use the 'simple' collation.
auto match = dynamic_cast<DocumentSourceMatch*>(stages[0].get());
ASSERT(match);
+ auto executableMatch = DocumentSourceMatch::create(match->getQuery(), getExpCtx());
+
auto mock = DocumentSourceMock::create(D(entry.toBSON()));
- match->setSource(mock.get());
+ executableMatch->setSource(mock.get());
// Check the oplog entry is transformed correctly.
auto transform = stages[1].get();
ASSERT(transform);
ASSERT_EQ(string(transform->getSourceName()), DSChangeStream::kStageName);
- transform->setSource(match);
+ transform->setSource(executableMatch.get());
auto closeCursor = stages.back().get();
ASSERT(closeCursor);
closeCursor->setSource(transform);
- // Include the mock stage in the "stages" so it won't get destroyed outside the function
- // scope.
- stages.insert(stages.begin(), mock);
- return stages;
+ return {mock, executableMatch, transform, closeCursor};
}
OplogEntry createCommand(const BSONObj& oField,
diff --git a/src/mongo/db/pipeline/document_source_match.cpp b/src/mongo/db/pipeline/document_source_match.cpp
index cbdf165b504..358775136cd 100644
--- a/src/mongo/db/pipeline/document_source_match.cpp
+++ b/src/mongo/db/pipeline/document_source_match.cpp
@@ -492,14 +492,14 @@ DocumentSource::GetDepsReturn DocumentSourceMatch::getDependencies(DepsTracker*
}
DocumentSourceMatch::DocumentSourceMatch(const BSONObj& query,
- const intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSource(pExpCtx),
+ const intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSource(expCtx),
_predicate(query.getOwned()),
_isTextQuery(isTextQuery(query)),
_dependencies(_isTextQuery ? DepsTracker::MetadataAvailable::kTextScore
: DepsTracker::MetadataAvailable::kNoMetadata) {
StatusWithMatchExpression status = uassertStatusOK(MatchExpressionParser::parse(
- _predicate, pExpCtx, ExtensionsCallbackNoop(), Pipeline::kAllowedMatcherFeatures));
+ _predicate, expCtx, ExtensionsCallbackNoop(), Pipeline::kAllowedMatcherFeatures));
_expression = std::move(status.getValue());
getDependencies(&_dependencies);
}
diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h
index eabc4de92ae..4614ee5d50b 100644
--- a/src/mongo/db/pipeline/document_source_match.h
+++ b/src/mongo/db/pipeline/document_source_match.h
@@ -41,7 +41,7 @@ class DocumentSourceMatch : public DocumentSource {
public:
virtual ~DocumentSourceMatch() = default;
- GetNextResult getNext() final;
+ GetNextResult getNext() override;
boost::intrusive_ptr<DocumentSource> optimize() final;
BSONObjSet getOutputSorts() final {
return pSource ? pSource->getOutputSorts()
@@ -156,7 +156,7 @@ public:
protected:
DocumentSourceMatch(const BSONObj& query,
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ const boost::intrusive_ptr<ExpressionContext>& expCtx);
private:
std::unique_ptr<MatchExpression> _expression;
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index 7901b89da30..7e460df4ac3 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -80,6 +80,39 @@ void ExpressionContext::checkForInterrupt() {
}
}
+ExpressionContext::CollatorStash::CollatorStash(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ std::unique_ptr<CollatorInterface> newCollator)
+ : _expCtx(expCtx),
+ _originalCollation(_expCtx->collation),
+ _originalCollatorOwned(std::move(_expCtx->_ownedCollator)),
+ _originalCollatorUnowned(_expCtx->_collator) {
+ _expCtx->setCollator(std::move(newCollator));
+ _expCtx->collation =
+ _expCtx->getCollator() ? _expCtx->getCollator()->getSpec().toBSON().getOwned() : BSONObj();
+}
+
+ExpressionContext::CollatorStash::~CollatorStash() {
+ if (_originalCollatorOwned) {
+ _expCtx->setCollator(std::move(_originalCollatorOwned));
+ } else {
+ _expCtx->setCollator(_originalCollatorUnowned);
+ if (!_originalCollatorUnowned && _expCtx->_ownedCollator) {
+ // If the original collation was 'nullptr', we cannot distinguish whether it was owned
+ // or not. We always set '_ownedCollator' with the stash, so should reset it to null
+ // here.
+ _expCtx->_ownedCollator = nullptr;
+ }
+ }
+ _expCtx->collation = _originalCollation;
+}
+
+std::unique_ptr<ExpressionContext::CollatorStash> ExpressionContext::temporarilyChangeCollator(
+ std::unique_ptr<CollatorInterface> newCollator) {
+ // This constructor of CollatorStash is private, so we can't use make_unique().
+ return std::unique_ptr<CollatorStash>(new CollatorStash(this, std::move(newCollator)));
+}
+
void ExpressionContext::setCollator(const CollatorInterface* collator) {
_collator = collator;
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index a5bebd95745..1a4e757918f 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -62,6 +62,38 @@ public:
};
/**
+ * An RAII type that will temporarily change the ExpressionContext's collator. Resets the
+ * collator to the previous value upon destruction.
+ */
+ class CollatorStash {
+ public:
+ /**
+ * Resets the collator on '_expCtx' to the original collator present at the time this
+ * CollatorStash was constructed.
+ */
+ ~CollatorStash();
+
+ private:
+ /**
+ * Temporarily changes the collator on 'expCtx' to be 'newCollator'. The collator will be
+ * set back to the original value when this CollatorStash is deleted.
+ *
+ * This constructor is private, all CollatorStashes should be created by calling
+ * ExpressionContext::temporarilyChangeCollator().
+ */
+ CollatorStash(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ std::unique_ptr<CollatorInterface> newCollator);
+
+ friend class ExpressionContext;
+
+ boost::intrusive_ptr<ExpressionContext> _expCtx;
+
+ BSONObj _originalCollation;
+ std::unique_ptr<CollatorInterface> _originalCollatorOwned;
+ const CollatorInterface* _originalCollatorUnowned{nullptr};
+ };
+
+ /**
* Constructs an ExpressionContext to be used for Pipeline parsing and evaluation.
* 'resolvedNamespaces' maps collection names (not full namespaces) to ResolvedNamespaces.
*/
@@ -97,6 +129,13 @@ public:
}
/**
+ * Temporarily resets the collator to be 'newCollator'. Returns a CollatorStash which will reset
+ * the collator back to the old value upon destruction.
+ */
+ std::unique_ptr<CollatorStash> temporarilyChangeCollator(
+ std::unique_ptr<CollatorInterface> newCollator);
+
+ /**
* Returns an ExpressionContext that is identical to 'this' that can be used to execute a
* separate aggregation pipeline on 'ns' with the optional 'uuid'.
*/
@@ -172,6 +211,8 @@ protected:
setCollator(_ownedCollator.get());
}
+ friend class CollatorStash;
+
// Collator used for comparisons. This is owned in the context of a Pipeline.
// TODO SERVER-31294: Move ownership of an aggregation's collator elsewhere.
std::unique_ptr<CollatorInterface> _ownedCollator;
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 932b3e4f3e4..09bd1b1653f 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -257,6 +257,9 @@ public:
!bool(css->getMetadata()));
PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline);
+ // Optimize again, since there may be additional optimizations that can be done after adding
+ // the initial cursor stage.
+ pipeline->optimizePipeline();
return Status::OK();
}
@@ -860,11 +863,7 @@ void PipelineD::addCursorSource(Collection* collection,
pSource->setProjection(deps.toProjection(), deps.toParsedDeps());
}
-
- // Add the initial DocumentSourceCursor to the front of the pipeline. Then optimize again in
- // case the new stage can be absorbed with the first stages of the pipeline.
pipeline->addInitialSource(pSource);
- pipeline->optimizePipeline();
}
Timestamp PipelineD::getLatestOplogTimestamp(const Pipeline* pipeline) {
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index 0da6ebe2f20..467aa9a8d1c 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -60,17 +60,14 @@ struct DepsTracker;
class PipelineD {
public:
/**
- * Create a Cursor wrapped in a DocumentSourceCursor, which is suitable
- * to be the first source for a pipeline to begin with. This source
- * will feed the execution of the pipeline.
+ * If the first stage in the pipeline does not generate its own output documents, attaches a
+ * DocumentSourceCursor to the front of the pipeline which will output documents from the
+ * collection to feed into the pipeline.
*
- * This method looks for early pipeline stages that can be folded into
- * the underlying cursor, and when a cursor can absorb those, they
- * are removed from the head of the pipeline. For example, an
- * early match can be removed and replaced with a Cursor that will
- * do an index scan.
- *
- * The cursor is added to the front of the pipeline's sources.
+ * This method looks for early pipeline stages that can be folded into the underlying
+ * PlanExecutor, and removes those stages from the pipeline when they can be absorbed by the
+ * PlanExecutor. For example, an early $match can be removed and replaced with a
+ * DocumentSourceCursor containing a PlanExecutor that will do an index scan.
*
* Callers must take care to ensure that 'nss' is locked in at least IS-mode.
*
diff --git a/src/mongo/db/query/canonical_query.cpp b/src/mongo/db/query/canonical_query.cpp
index 1cd25b4c370..f95b9aac680 100644
--- a/src/mongo/db/query/canonical_query.cpp
+++ b/src/mongo/db/query/canonical_query.cpp
@@ -152,6 +152,7 @@ StatusWith<std::unique_ptr<CanonicalQuery>> CanonicalQuery::canonicalize(
newExpCtx.reset(new ExpressionContext(opCtx, collator.get()));
} else {
newExpCtx = expCtx;
+ invariant(CollatorInterface::collatorsMatch(collator.get(), expCtx->getCollator()));
}
StatusWithMatchExpression statusWithMatcher = MatchExpressionParser::parse(
qr->getFilter(), newExpCtx, extensionsCallback, allowedFeatures);