summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/change_streams/metadata_notifications.js31
-rw-r--r--jstests/change_streams/whole_db_resumability.js16
-rw-r--r--jstests/libs/change_stream_util.js7
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp116
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h5
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp14
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_close_cursor.h1
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp111
-rw-r--r--src/mongo/db/pipeline/document_source_check_invalidate.cpp97
-rw-r--r--src/mongo/db/pipeline/document_source_check_invalidate.h80
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp4
12 files changed, 343 insertions, 140 deletions
diff --git a/jstests/change_streams/metadata_notifications.js b/jstests/change_streams/metadata_notifications.js
index ef17b2b4249..a0f074f1357 100644
--- a/jstests/change_streams/metadata_notifications.js
+++ b/jstests/change_streams/metadata_notifications.js
@@ -79,30 +79,21 @@
coll = assertCreateCollection(db, collName);
assert.writeOK(coll.insert({_id: 0}));
- // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here to
- // be sure that it doesn't crash the server, but the ability to resume a change stream after an
- // invalidate is a bug, not a feature.
-
// Test resuming the change stream from the collection drop.
- assert.doesNotThrow(function() {
- const resumeTokenDrop = changes[2]._id;
- const resumeCursor =
- coll.watch([], {resumeAfter: resumeTokenDrop, collation: {locale: "simple"}});
- assert.soon(() => resumeCursor.hasNext());
- // Not checking the contents of the document returned, because we do not technically
- // support this behavior.
- resumeCursor.next();
+ cursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: changes[2]._id}}],
+ collection: collName,
+ aggregateOptions: {cursor: {batchSize: 0}, collation: {locale: "simple"}},
});
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [{operationType: "invalidate"}]});
+
// Test resuming the change stream from the invalidate after the drop.
- assert.doesNotThrow(function() {
- const resumeTokenInvalidate = changes[3]._id;
- const resumeCursor =
- coll.watch([], {resumeAfter: resumeTokenInvalidate, collation: {locale: "simple"}});
- assert.soon(() => resumeCursor.hasNext());
- // Not checking the contents of the document returned, because we do not technically
- // support this behavior.
- resumeCursor.next();
+ cursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: changes[3]._id}}],
+ collection: collName,
+ aggregateOptions: {cursor: {batchSize: 0}, collation: {locale: "simple"}},
});
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [{operationType: "invalidate"}]});
// Test that renaming a collection being watched generates a "rename" entry followed by an
// "invalidate". This is true if the change stream is on the source or target collection of the
diff --git a/jstests/change_streams/whole_db_resumability.js b/jstests/change_streams/whole_db_resumability.js
index 25f03943112..f8f8d3fb64a 100644
--- a/jstests/change_streams/whole_db_resumability.js
+++ b/jstests/change_streams/whole_db_resumability.js
@@ -97,30 +97,22 @@
assert.writeOK(coll.insert({_id: 0}));
// Test resuming from the 'dropDatabase' entry.
- // TODO SERVER-34789: Resuming from the 'dropDatabase' should return a single invalidate
- // notification.
resumeCursor = cst.startWatchingChanges({
pipeline: [{$changeStream: {resumeAfter: dropDbChanges[1]._id}}],
collection: 1,
aggregateOptions: {cursor: {batchSize: 0}},
});
- let change = cst.getOneChange(resumeCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.fullDocument, {_id: 0}, tojson(change));
- assert.eq(change.ns, {db: testDB.getName(), coll: coll.getName()}, tojson(change));
+ cst.assertNextChangesEqual(
+ {cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]});
// Test resuming from the 'invalidate' entry.
- // TODO SERVER-34789: Resuming from an invalidate should error or return an invalidate
- // notification.
resumeCursor = cst.startWatchingChanges({
pipeline: [{$changeStream: {resumeAfter: dropDbChanges[2]._id}}],
collection: 1,
aggregateOptions: {cursor: {batchSize: 0}},
});
- change = cst.getOneChange(resumeCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.fullDocument, {_id: 0}, tojson(change));
- assert.eq(change.ns, {db: testDB.getName(), coll: coll.getName()}, tojson(change));
+ cst.assertNextChangesEqual(
+ {cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]});
cst.cleanUp();
})();
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js
index 71a56f3ffe8..8c779cbf0f5 100644
--- a/jstests/libs/change_stream_util.js
+++ b/jstests/libs/change_stream_util.js
@@ -193,6 +193,13 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
"Expected change's size must match expected number of changes");
}
+ // Convert 'expectedChanges' to an array, even if it contains just a single element.
+ if (expectedChanges !== undefined && !(expectedChanges instanceof Array)) {
+ let arrayVersion = new Array;
+ arrayVersion.push(expectedChanges);
+ expectedChanges = arrayVersion;
+ }
+
// Set the expected number of changes based on the size of the expected change list.
if (expectedNumChanges === undefined) {
assert.neq(expectedChanges, undefined);
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 3d73e724af4..2508af7eda6 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -272,6 +272,7 @@ pipelineeEnv.Library(
'document_source_change_stream.cpp',
'document_source_change_stream_close_cursor.cpp',
'document_source_change_stream_transform.cpp',
+ 'document_source_check_invalidate.cpp',
'document_source_check_resume_token.cpp',
'document_source_coll_stats.cpp',
'document_source_count.cpp',
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 9edf48c4e4d..9dee6a5db97 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/pipeline/document_path_support.h"
#include "mongo/db/pipeline/document_source_change_stream_close_cursor.h"
#include "mongo/db/pipeline/document_source_change_stream_transform.h"
+#include "mongo/db/pipeline/document_source_check_invalidate.h"
#include "mongo/db/pipeline/document_source_check_resume_token.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
@@ -318,25 +319,25 @@ void assertResumeAllowed(const intrusive_ptr<ExpressionContext>& expCtx,
expCtx->uuid && tokenData.uuid && expCtx->uuid.get() == tokenData.uuid.get());
}
-/**
- * Parses the resume options in 'spec', optionally populating the resume stage and cluster time to
- * start from. Throws an AssertionException if not running on a replica set or multiple resume
- * options are specified.
- */
-void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx,
- const DocumentSourceChangeStreamSpec& spec,
- ServerGlobalParams::FeatureCompatibility::Version fcv,
- intrusive_ptr<DocumentSource>* resumeStageOut,
- boost::optional<Timestamp>* startFromOut) {
- if (!expCtx->inMongos) {
- auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx);
- uassert(40573,
- "The $changeStream stage is only supported on replica sets",
- replCoord &&
- replCoord->getReplicationMode() ==
- repl::ReplicationCoordinator::Mode::modeReplSet);
- *startFromOut = replCoord->getMyLastAppliedOpTime().getTimestamp();
- }
+intrusive_ptr<DocumentSource> createTransformationStage(
+ const intrusive_ptr<ExpressionContext>& expCtx,
+ BSONObj changeStreamSpec,
+ ServerGlobalParams::FeatureCompatibility::Version fcv) {
+ // Mark the transformation stage as independent of any collection if the change stream is
+ // watching all collections in the database.
+ const bool isIndependentOfAnyCollection = expCtx->ns.isCollectionlessAggregateNS();
+ return intrusive_ptr<DocumentSource>(new DocumentSourceChangeStreamTransform(
+ expCtx, changeStreamSpec, fcv, isIndependentOfAnyCollection));
+}
+
+list<intrusive_ptr<DocumentSource>> buildPipeline(
+ const intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec spec,
+ ServerGlobalParams::FeatureCompatibility::Version fcv,
+ BSONElement elem) {
+ list<intrusive_ptr<DocumentSource>> stages;
+ boost::optional<Timestamp> startFrom;
+ intrusive_ptr<DocumentSource> resumeStage = nullptr;
if (auto resumeAfter = spec.getResumeAfter()) {
ResumeToken token = resumeAfter.get();
@@ -346,13 +347,12 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx,
// token UUID, and collation.
assertResumeAllowed(expCtx, tokenData);
- *startFromOut = tokenData.clusterTime;
+ startFrom = tokenData.clusterTime;
if (expCtx->needsMerge) {
- *resumeStageOut =
+ resumeStage =
DocumentSourceShardCheckResumability::create(expCtx, tokenData.clusterTime);
} else {
- *resumeStageOut =
- DocumentSourceEnsureResumeTokenPresent::create(expCtx, std::move(token));
+ resumeStage = DocumentSourceEnsureResumeTokenPresent::create(expCtx, std::move(token));
}
}
@@ -361,14 +361,14 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx,
uassert(40674,
"Only one type of resume option is allowed, but multiple were found.",
- !(*resumeStageOut) || (!resumeAfterClusterTime && !startAtOperationTime));
+ !resumeStage || (!resumeAfterClusterTime && !startAtOperationTime));
if (resumeAfterClusterTime) {
if (fcv >= ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40) {
warning() << "The '$_resumeAfterClusterTime' option is deprecated, please use "
"'startAtOperationTime' instead.";
}
- *startFromOut = resumeAfterClusterTime->getTimestamp();
+ startFrom = resumeAfterClusterTime->getTimestamp();
}
// New field name starting in 4.0 is 'startAtOperationTime'.
@@ -381,9 +381,41 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx,
<< DocumentSourceChangeStreamSpec::kResumeAfterClusterTimeDeprecatedFieldName
<< " in a $changeStream stage.",
!resumeAfterClusterTime);
- *startFromOut = *startAtOperationTime;
- *resumeStageOut = DocumentSourceShardCheckResumability::create(expCtx, **startFromOut);
+ startFrom = *startAtOperationTime;
+ resumeStage = DocumentSourceShardCheckResumability::create(expCtx, *startFrom);
+ }
+
+ // There might not be a starting point if we're on mongos, otherwise we should either have a
+ // 'resumeAfter' starting point, or should start from the latest majority committed operation.
+ auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx);
+ uassert(40573,
+ "The $changeStream stage is only supported on replica sets",
+ expCtx->inMongos || (replCoord &&
+ replCoord->getReplicationMode() ==
+ repl::ReplicationCoordinator::Mode::modeReplSet));
+ if (!startFrom && !expCtx->inMongos) {
+ startFrom = replCoord->getMyLastAppliedOpTime().getTimestamp();
}
+
+ if (startFrom) {
+ const bool startFromInclusive = (resumeStage != nullptr);
+ stages.push_back(DocumentSourceOplogMatch::create(
+ DocumentSourceChangeStream::buildMatchFilter(expCtx, *startFrom, startFromInclusive),
+ expCtx));
+ }
+
+ stages.push_back(createTransformationStage(expCtx, elem.embeddedObject(), fcv));
+ stages.push_back(DocumentSourceCheckInvalidate::create(expCtx));
+
+ // The resume stage must come after the check invalidate stage to ensure that resuming from an
+ // invalidate or an invalidating command will not ignore the invalidation. Placing the check
+ // invalidate stage first lets it see the resume token before the resume stage swallows it,
+ // thereby remembering that the stream cannot continue.
+ if (resumeStage) {
+ stages.push_back(resumeStage);
+ }
+
+ return stages;
}
} // namespace
@@ -404,10 +436,6 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
// Make sure that it is legal to run this $changeStream before proceeding.
DocumentSourceChangeStream::assertIsLegalSpecification(expCtx, spec, fcv);
- boost::optional<Timestamp> startFrom;
- intrusive_ptr<DocumentSource> resumeStage = nullptr;
- parseResumeOptions(expCtx, spec, fcv, &resumeStage, &startFrom);
-
auto fullDocOption = spec.getFullDocument();
uassert(40575,
str::stream() << "unrecognized value for the 'fullDocument' option to the "
@@ -419,21 +447,7 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
const bool shouldLookupPostImage = (fullDocOption == "updateLookup"_sd);
- list<intrusive_ptr<DocumentSource>> stages;
-
- // There might not be a starting point if we're on mongos, otherwise we should either have a
- // 'resumeAfter' starting point, or should start from the latest majority committed operation.
- invariant(expCtx->inMongos || static_cast<bool>(startFrom));
- if (startFrom) {
- const bool startFromInclusive = (resumeStage != nullptr);
- stages.push_back(DocumentSourceOplogMatch::create(
- buildMatchFilter(expCtx, *startFrom, startFromInclusive), expCtx));
- }
-
- stages.push_back(createTransformationStage(expCtx, elem.embeddedObject(), fcv));
- if (resumeStage) {
- stages.push_back(resumeStage);
- }
+ auto stages = buildPipeline(expCtx, spec, fcv, elem);
if (!expCtx->needsMerge) {
// There should only be one close cursor stage. If we're on the shards and producing input
// to be merged, do not add a close cursor stage, since the mongos will already have one.
@@ -507,14 +521,4 @@ void DocumentSourceChangeStream::assertIsLegalSpecification(
!expCtx->ns.isSystem());
}
-intrusive_ptr<DocumentSource> DocumentSourceChangeStream::createTransformationStage(
- const intrusive_ptr<ExpressionContext>& expCtx,
- BSONObj changeStreamSpec,
- ServerGlobalParams::FeatureCompatibility::Version fcv) {
- // Mark the transformation stage as independent of any collection if the change stream is
- // watching all collections in the database.
- const bool isIndependentOfAnyCollection = expCtx->ns.isCollectionlessAggregateNS();
- return intrusive_ptr<DocumentSource>(new DocumentSourceChangeStreamTransform(
- expCtx, changeStreamSpec, fcv, isIndependentOfAnyCollection));
-}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 0d6e84c509a..16175fdcee2 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -175,11 +175,6 @@ public:
static std::list<boost::intrusive_ptr<DocumentSource>> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
- static boost::intrusive_ptr<DocumentSource> createTransformationStage(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- BSONObj changeStreamSpec,
- ServerGlobalParams::FeatureCompatibility::Version fcv);
-
/**
* Given a BSON object containing an aggregation command with a $changeStream stage, and a
* resume token, returns a new BSON object with the same command except with the addition of a
diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp
index 207971dbb76..a0d87d1baa9 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp
@@ -59,11 +59,6 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() {
uasserted(ErrorCodes::CloseChangeStream, "Change stream has been invalidated");
}
- if (_queuedInvalidate) {
- _shouldCloseCursor = true;
- return DocumentSource::GetNextResult(std::move(_queuedInvalidate.get()));
- }
-
auto nextInput = pSource->getNext();
if (!nextInput.isAdvanced())
return nextInput;
@@ -80,15 +75,6 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() {
_shouldCloseCursor = true;
}
- // Check if this is an invalidating command and the next entry should be an "invalidate".
- if (isInvalidatingCommand(pExpCtx, operationType)) {
- _queuedInvalidate = Document{
- {DocumentSourceChangeStream::kIdField, doc[DocumentSourceChangeStream::kIdField]},
- {DocumentSourceChangeStream::kClusterTimeField,
- doc[DocumentSourceChangeStream::kClusterTimeField]},
- {DocumentSourceChangeStream::kOperationTypeField, "invalidate"_sd}};
- }
-
return nextInput;
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
index bc60687deae..065b15e7ccc 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
@@ -101,7 +101,6 @@ private:
: DocumentSource(expCtx) {}
bool _shouldCloseCursor = false;
- boost::optional<Document> _queuedInvalidate;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 49738f02cfe..b8734cc8d26 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_change_stream_transform.h"
+#include "mongo/db/pipeline/document_source_check_resume_token.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_mock.h"
@@ -155,21 +156,29 @@ public:
auto match = dynamic_cast<DocumentSourceMatch*>(stages[0].get());
ASSERT(match);
auto executableMatch = DocumentSourceMatch::create(match->getQuery(), getExpCtx());
-
- auto mock = DocumentSourceMock::create(D(entry));
- executableMatch->setSource(mock.get());
+ // Replace the original match with the executable one.
+ stages[0] = executableMatch;
// Check the oplog entry is transformed correctly.
auto transform = stages[1].get();
ASSERT(transform);
ASSERT_EQ(string(transform->getSourceName()), DSChangeStream::kStageName);
- transform->setSource(executableMatch.get());
- auto closeCursor = stages.back().get();
- ASSERT(closeCursor);
- closeCursor->setSource(transform);
-
- return {mock, executableMatch, transform, closeCursor};
+ // Create mock stage and insert at the front of the stages.
+ auto mock = DocumentSourceMock::create(D(entry));
+ stages.insert(stages.begin(), mock);
+
+ // Wire up the stages by setting the source stage.
+ auto prevStage = stages[0].get();
+ for (auto stageIt = stages.begin() + 1; stageIt != stages.end(); stageIt++) {
+ auto stage = (*stageIt).get();
+ // Do not include the check resume token stage since it will swallow the result.
+ if (dynamic_cast<DocumentSourceEnsureResumeTokenPresent*>(stage))
+ continue;
+ stage->setSource(prevStage);
+ prevStage = stage;
+ }
+ return stages;
}
vector<intrusive_ptr<DocumentSource>> makeStages(const OplogEntry& entry) {
@@ -395,21 +404,6 @@ TEST_F(ChangeStreamStageTestNoSetup, FailsWithNoReplicationCoordinator) {
40573);
}
-TEST_F(ChangeStreamStageTest, StagesGeneratedCorrectly) {
- const auto spec = fromjson("{$changeStream: {}}");
-
- list<intrusive_ptr<DocumentSource>> result =
- DSChangeStream::createFromBson(spec.firstElement(), getExpCtx());
- vector<intrusive_ptr<DocumentSource>> stages(std::begin(result), std::end(result));
- ASSERT_EQUALS(stages.size(), 3UL);
- ASSERT_TRUE(dynamic_cast<DocumentSourceMatch*>(stages.front().get()));
- ASSERT_EQUALS(string(stages[0]->getSourceName()), DSChangeStream::kStageName);
- ASSERT_EQUALS(string(stages[1]->getSourceName()), DSChangeStream::kStageName);
- ASSERT_EQUALS(string(stages[2]->getSourceName()), DSChangeStream::kStageName);
-
- // TODO: Check explain result.
-}
-
TEST_F(ChangeStreamStageTest, TransformInsertDocKeyXAndId) {
auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type
nss, // namespace
@@ -637,8 +631,8 @@ TEST_F(ChangeStreamStageTest, TransformDrop) {
};
Document expectedInvalidate{
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
- {DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
};
checkTransformation(dropColl, expectedDrop, {}, kDefaultSpec, expectedInvalidate);
@@ -659,8 +653,8 @@ TEST_F(ChangeStreamStageTest, TransformRename) {
};
Document expectedInvalidate{
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
- {DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
};
checkTransformation(rename, expectedRename, {}, kDefaultSpec, expectedInvalidate);
@@ -699,8 +693,8 @@ TEST_F(ChangeStreamStageTest, TransformRenameTarget) {
};
Document expectedInvalidate{
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
- {DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
};
checkTransformation(rename, expectedRename, {}, kDefaultSpec, expectedInvalidate);
@@ -987,7 +981,7 @@ TEST_F(ChangeStreamStageTest, TransformationShouldBeAbleToReParseSerializedStage
auto originalSpec = BSON(DSChangeStream::kStageName << BSONObj());
auto result = DSChangeStream::createFromBson(originalSpec.firstElement(), expCtx);
vector<intrusive_ptr<DocumentSource>> allStages(std::begin(result), std::end(result));
- ASSERT_EQ(allStages.size(), 3UL);
+ ASSERT_EQ(allStages.size(), 4UL);
auto stage = allStages[1];
ASSERT(dynamic_cast<DocumentSourceChangeStreamTransform*>(stage.get()));
@@ -1028,8 +1022,8 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) {
};
Document expectedInvalidate{
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
- {DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
};
auto next = closeCursor->getNext();
@@ -1180,6 +1174,34 @@ TEST_F(ChangeStreamStageTest, RenameFromUserToSystemCollectionShouldIncludeNotif
checkTransformation(rename, expectedRename);
}
+TEST_F(ChangeStreamStageTest, ResumeAfterWithTokenFromDropShouldReturnInvalidate) {
+ auto expCtx = getExpCtx();
+
+ // Need to put the collection in the UUID catalog so the resume token is valid.
+ Collection collection(stdx::make_unique<CollectionMock>(nss));
+ UUIDCatalog::get(expCtx->opCtx).onCreateCollection(expCtx->opCtx, &collection, testUuid());
+
+ OplogEntry dropColl = createCommand(BSON("drop" << nss.coll()), testUuid());
+ const auto resumeTokenDrop = makeResumeToken(kDefaultTs, testUuid());
+
+ Document expectedDrop{
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ };
+ Document expectedInvalidate{
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ };
+ checkTransformation(dropColl,
+ expectedDrop,
+ {{"_id"}}, // Mock the 'collectDocumentKeyFields' response.
+ BSON("$changeStream" << BSON("resumeAfter" << resumeTokenDrop)),
+ expectedInvalidate);
+}
+
//
// Test class for change stream of a single database.
//
@@ -1422,8 +1444,8 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
};
Document expectedInvalidate{
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs)},
- {DSChangeStream::kClusterTimeField, kDefaultTs},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
};
checkTransformation(dropDB, expectedDropDatabase, {}, kDefaultSpec, expectedInvalidate);
@@ -1605,5 +1627,34 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyIfResumeToken
BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
}
+TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromDropDatabaseShouldReturnInvalidate) {
+ const auto uuid = testUuid();
+
+ Collection collection(stdx::make_unique<CollectionMock>(nss));
+ UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid);
+
+ // Create a resume token from only the timestamp, similar to a 'dropDatabase' entry.
+ auto resumeToken = makeResumeToken(kDefaultTs);
+ OplogEntry dropDB = createCommand(BSON("dropDatabase" << 1), boost::none, false);
+
+ Document expectedDropDatabase{
+ {DSChangeStream::kIdField, resumeToken},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kDropDatabaseOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}}},
+ };
+ Document expectedInvalidate{
+ {DSChangeStream::kIdField, resumeToken},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ };
+
+ checkTransformation(dropDB,
+ expectedDropDatabase,
+ {{"_id"}}, // Mock the 'collectDocumentKeyFields' response.
+ BSON("$changeStream" << BSON("resumeAfter" << resumeToken)),
+ expectedInvalidate);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_check_invalidate.cpp
new file mode 100644
index 00000000000..edfb16d7bae
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_check_invalidate.cpp
@@ -0,0 +1,97 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/db/pipeline/document_source_check_invalidate.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+using DSCS = DocumentSourceChangeStream;
+
+namespace {
+
+// Returns true if the given 'operationType' should invalidate the change stream based on the
+// namespace in 'pExpCtx'.
+bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ StringData operationType) {
+ if (pExpCtx->isSingleNamespaceAggregation()) {
+ return operationType == DSCS::kDropCollectionOpType ||
+ operationType == DSCS::kRenameCollectionOpType ||
+ operationType == DSCS::kDropDatabaseOpType;
+ } else if (!pExpCtx->isClusterAggregation()) {
+ return operationType == DSCS::kDropDatabaseOpType;
+ } else {
+ return false;
+ }
+};
+
+} // namespace
+
+DocumentSource::GetNextResult DocumentSourceCheckInvalidate::getNext() {
+ pExpCtx->checkForInterrupt();
+
+ invariant(!pExpCtx->inMongos);
+
+ if (_queuedInvalidate) {
+ const auto res = DocumentSource::GetNextResult(std::move(_queuedInvalidate.get()));
+ _queuedInvalidate.reset();
+ return res;
+ }
+
+ auto nextInput = pSource->getNext();
+ if (!nextInput.isAdvanced())
+ return nextInput;
+
+ auto doc = nextInput.getDocument();
+ const auto& kOperationTypeField = DSCS::kOperationTypeField;
+ DSCS::checkValueType(doc[kOperationTypeField], kOperationTypeField, BSONType::String);
+ auto operationType = doc[kOperationTypeField].getString();
+
+ // If this command should invalidate the stream, generate an invalidate entry and queue it up
+ // to be returned after the notification of this command.
+ if (isInvalidatingCommand(pExpCtx, operationType)) {
+ MutableDocument result(Document{{DSCS::kIdField, doc[DSCS::kIdField]},
+ {DSCS::kOperationTypeField, DSCS::kInvalidateOpType},
+ {DSCS::kClusterTimeField, doc[DSCS::kClusterTimeField]}});
+
+ // If we're in a sharded environment, we'll need to merge the results by their sort key, so
+ // add that as metadata.
+ result.copyMetaDataFrom(doc);
+
+ _queuedInvalidate = result.freeze();
+ }
+
+ return nextInput;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.h b/src/mongo/db/pipeline/document_source_check_invalidate.h
new file mode 100644
index 00000000000..b689ea6f510
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_check_invalidate.h
@@ -0,0 +1,80 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+/**
+ * This stage is used internally for change stream notifications to artificially generate an
+ * "invalidate" entry for commands that should invalidate the change stream (e.g. collection drop
+ * for a single-collection change stream). It is not intended to be created by the user.
+ */
+class DocumentSourceCheckInvalidate final : public DocumentSource {
+public:
+ GetNextResult getNext() final;
+
+ const char* getSourceName() const final {
+ // This is used in error reporting.
+ return "$_checkInvalidate";
+ }
+
+ StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage};
+ }
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
+ // This stage is created by the DocumentSourceChangeStream stage, so serializing it here
+ // would result in it being created twice.
+ return Value();
+ }
+
+ static boost::intrusive_ptr<DocumentSourceCheckInvalidate> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ return new DocumentSourceCheckInvalidate(expCtx);
+ }
+
+private:
+ /**
+ * Use the create static method to create a DocumentSourceCheckInvalidate.
+ */
+ DocumentSourceCheckInvalidate(const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSource(expCtx) {}
+
+ boost::optional<Document> _queuedInvalidate;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index c53262cfc47..acb80b6b00c 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -1536,7 +1536,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupSwapsWithIndependentMatch) {
auto spec = BSON("$changeStream" << BSON("fullDocument"
<< "updateLookup"));
auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx);
- ASSERT_EQ(stages.size(), 4UL);
+ ASSERT_EQ(stages.size(), 5UL);
// Make sure the change lookup is at the end.
ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(stages.back().get()));
@@ -1561,7 +1561,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupDoesNotSwapWithMatchOnPostImage
auto spec = BSON("$changeStream" << BSON("fullDocument"
<< "updateLookup"));
auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx);
- ASSERT_EQ(stages.size(), 4UL);
+ ASSERT_EQ(stages.size(), 5UL);
// Make sure the change lookup is at the end.
ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(stages.back().get()));