summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp102
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp14
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_close_cursor.h1
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp190
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_check_invalidate.cpp113
-rw-r--r--src/mongo/db/pipeline/document_source_check_invalidate.h82
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp4
-rw-r--r--src/mongo/db/pipeline/resume_token.cpp13
-rw-r--r--src/mongo/db/pipeline/resume_token.h12
11 files changed, 434 insertions, 104 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 120996f96ff..fbdc846e838 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -271,6 +271,7 @@ pipelineeEnv.Library(
'document_source_change_stream.cpp',
'document_source_change_stream_close_cursor.cpp',
'document_source_change_stream_transform.cpp',
+ 'document_source_check_invalidate.cpp',
'document_source_check_resume_token.cpp',
'document_source_coll_stats.cpp',
'document_source_count.cpp',
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index b123820e981..6aaec438148 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/pipeline/document_path_support.h"
#include "mongo/db/pipeline/document_source_change_stream_close_cursor.h"
#include "mongo/db/pipeline/document_source_change_stream_transform.h"
+#include "mongo/db/pipeline/document_source_check_invalidate.h"
#include "mongo/db/pipeline/document_source_check_resume_token.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
@@ -318,24 +319,13 @@ void assertResumeAllowed(const intrusive_ptr<ExpressionContext>& expCtx,
expCtx->uuid && tokenData.uuid && expCtx->uuid.get() == tokenData.uuid.get());
}
-/**
- * Parses the resume options in 'spec', optionally populating the resume stage and cluster time to
- * start from. Throws an AssertionException if not running on a replica set or multiple resume
- * options are specified.
- */
-void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx,
- const DocumentSourceChangeStreamSpec& spec,
- intrusive_ptr<DocumentSource>* resumeStageOut,
- boost::optional<Timestamp>* startFromOut) {
- if (!expCtx->inMongos) {
- auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx);
- uassert(40573,
- "The $changeStream stage is only supported on replica sets",
- replCoord &&
- replCoord->getReplicationMode() ==
- repl::ReplicationCoordinator::Mode::modeReplSet);
- *startFromOut = replCoord->getMyLastAppliedOpTime().getTimestamp();
- }
+list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec spec,
+ BSONElement elem) {
+ list<intrusive_ptr<DocumentSource>> stages;
+ boost::optional<Timestamp> startFrom;
+ intrusive_ptr<DocumentSource> resumeStage = nullptr;
+ bool ignoreFirstInvalidate = false;
auto resumeAfter = spec.getResumeAfter();
auto startAfter = spec.getStartAfter();
@@ -347,29 +337,65 @@ void parseResumeOptions(const intrusive_ptr<ExpressionContext>& expCtx,
ResumeToken token = resumeAfter ? resumeAfter.get() : startAfter.get();
ResumeTokenData tokenData = token.getData();
+ // If resuming from an "invalidate" using "startAfter", set this bit to indicate to the
+ // DocumentSourceCheckInvalidate stage that a second invalidate should not be generated.
+ ignoreFirstInvalidate = startAfter && tokenData.fromInvalidate;
+
+ uassert(ErrorCodes::InvalidResumeToken,
+ "Attempting to resume a change stream using 'resumeAfter' is not allowed from an "
+ "invalidate notification.",
+ !resumeAfter || !tokenData.fromInvalidate);
// Verify that the requested resume attempt is possible based on the stream type, resume
// token UUID, and collation.
assertResumeAllowed(expCtx, tokenData);
- *startFromOut = tokenData.clusterTime;
+ startFrom = tokenData.clusterTime;
if (expCtx->needsMerge) {
- *resumeStageOut =
+ resumeStage =
DocumentSourceShardCheckResumability::create(expCtx, tokenData.clusterTime);
} else {
- *resumeStageOut =
- DocumentSourceEnsureResumeTokenPresent::create(expCtx, std::move(token));
+ resumeStage = DocumentSourceEnsureResumeTokenPresent::create(expCtx, std::move(token));
}
}
- auto startAtOperationTime = spec.getStartAtOperationTime();
-
- if (startAtOperationTime) {
+ if (auto startAtOperationTime = spec.getStartAtOperationTime()) {
uassert(40674,
"Only one type of resume option is allowed, but multiple were found.",
- !*resumeStageOut);
- *startFromOut = *startAtOperationTime;
- *resumeStageOut = DocumentSourceShardCheckResumability::create(expCtx, **startFromOut);
+ !resumeStage);
+ startFrom = *startAtOperationTime;
+ resumeStage = DocumentSourceShardCheckResumability::create(expCtx, *startFrom);
}
+
+ // There might not be a starting point if we're on mongos, otherwise we should either have a
+ // 'resumeAfter' starting point, or should start from the latest majority committed operation.
+ auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx);
+ uassert(40573,
+ "The $changeStream stage is only supported on replica sets",
+ expCtx->inMongos || (replCoord &&
+ replCoord->getReplicationMode() ==
+ repl::ReplicationCoordinator::Mode::modeReplSet));
+ if (!startFrom && !expCtx->inMongos) {
+ startFrom = replCoord->getMyLastAppliedOpTime().getTimestamp();
+ }
+
+ if (startFrom) {
+ const bool startFromInclusive = (resumeStage != nullptr);
+ stages.push_back(DocumentSourceOplogMatch::create(
+ DocumentSourceChangeStream::buildMatchFilter(expCtx, *startFrom, startFromInclusive),
+ expCtx));
+ }
+
+ stages.push_back(DocumentSourceChangeStreamTransform::create(expCtx, elem.embeddedObject()));
+ stages.push_back(DocumentSourceCheckInvalidate::create(expCtx, ignoreFirstInvalidate));
+
+ // The resume stage must come after the check invalidate stage to allow the check invalidate
+ // stage to determine whether the oplog entry matching the resume token should be followed by an
+ // "invalidate" entry.
+ if (resumeStage) {
+ stages.push_back(resumeStage);
+ }
+
+ return stages;
}
} // namespace
@@ -389,10 +415,6 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
// Make sure that it is legal to run this $changeStream before proceeding.
DocumentSourceChangeStream::assertIsLegalSpecification(expCtx, spec);
- boost::optional<Timestamp> startFrom;
- intrusive_ptr<DocumentSource> resumeStage = nullptr;
- parseResumeOptions(expCtx, spec, &resumeStage, &startFrom);
-
auto fullDocOption = spec.getFullDocument();
uassert(40575,
str::stream() << "unrecognized value for the 'fullDocument' option to the "
@@ -404,21 +426,7 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
const bool shouldLookupPostImage = (fullDocOption == "updateLookup"_sd);
- list<intrusive_ptr<DocumentSource>> stages;
-
- // There might not be a starting point if we're on mongos, otherwise we should either have a
- // 'resumeAfter' starting point, or should start from the latest majority committed operation.
- invariant(expCtx->inMongos || static_cast<bool>(startFrom));
- if (startFrom) {
- const bool startFromInclusive = (resumeStage != nullptr);
- stages.push_back(DocumentSourceOplogMatch::create(
- buildMatchFilter(expCtx, *startFrom, startFromInclusive), expCtx));
- }
-
- stages.push_back(DocumentSourceChangeStreamTransform::create(expCtx, elem.embeddedObject()));
- if (resumeStage) {
- stages.push_back(resumeStage);
- }
+ auto stages = buildPipeline(expCtx, spec, elem);
if (!expCtx->needsMerge) {
// There should only be one close cursor stage. If we're on the shards and producing input
// to be merged, do not add a close cursor stage, since the mongos will already have one.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp
index 207971dbb76..a0d87d1baa9 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.cpp
@@ -59,11 +59,6 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() {
uasserted(ErrorCodes::CloseChangeStream, "Change stream has been invalidated");
}
- if (_queuedInvalidate) {
- _shouldCloseCursor = true;
- return DocumentSource::GetNextResult(std::move(_queuedInvalidate.get()));
- }
-
auto nextInput = pSource->getNext();
if (!nextInput.isAdvanced())
return nextInput;
@@ -80,15 +75,6 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() {
_shouldCloseCursor = true;
}
- // Check if this is an invalidating command and the next entry should be an "invalidate".
- if (isInvalidatingCommand(pExpCtx, operationType)) {
- _queuedInvalidate = Document{
- {DocumentSourceChangeStream::kIdField, doc[DocumentSourceChangeStream::kIdField]},
- {DocumentSourceChangeStream::kClusterTimeField,
- doc[DocumentSourceChangeStream::kClusterTimeField]},
- {DocumentSourceChangeStream::kOperationTypeField, "invalidate"_sd}};
- }
-
return nextInput;
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
index bc60687deae..065b15e7ccc 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
@@ -101,7 +101,6 @@ private:
: DocumentSource(expCtx) {}
bool _shouldCloseCursor = false;
- boost::optional<Document> _queuedInvalidate;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 8f522b26d4e..3d3e4bbb36d 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_change_stream_transform.h"
+#include "mongo/db/pipeline/document_source_check_resume_token.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_mock.h"
@@ -155,21 +156,29 @@ public:
auto match = dynamic_cast<DocumentSourceMatch*>(stages[0].get());
ASSERT(match);
auto executableMatch = DocumentSourceMatch::create(match->getQuery(), getExpCtx());
-
- auto mock = DocumentSourceMock::create(D(entry));
- executableMatch->setSource(mock.get());
+ // Replace the original match with the executable one.
+ stages[0] = executableMatch;
// Check the oplog entry is transformed correctly.
auto transform = stages[1].get();
ASSERT(transform);
ASSERT_EQ(string(transform->getSourceName()), DSChangeStream::kStageName);
- transform->setSource(executableMatch.get());
- auto closeCursor = stages.back().get();
- ASSERT(closeCursor);
- closeCursor->setSource(transform);
-
- return {mock, executableMatch, transform, closeCursor};
+ // Create mock stage and insert at the front of the stages.
+ auto mock = DocumentSourceMock::create(D(entry));
+ stages.insert(stages.begin(), mock);
+
+ // Wire up the stages by setting the source stage.
+ auto prevStage = stages[0].get();
+ for (auto stageIt = stages.begin() + 1; stageIt != stages.end(); stageIt++) {
+ auto stage = (*stageIt).get();
+ // Do not include the check resume token stage since it will swallow the result.
+ if (dynamic_cast<DocumentSourceEnsureResumeTokenPresent*>(stage))
+ continue;
+ stage->setSource(prevStage);
+ prevStage = stage;
+ }
+ return stages;
}
vector<intrusive_ptr<DocumentSource>> makeStages(const OplogEntry& entry) {
@@ -191,10 +200,13 @@ public:
Document makeResumeToken(Timestamp ts,
ImplicitValue uuid = Value(),
- ImplicitValue docKey = Value()) {
+ ImplicitValue docKey = Value(),
+ ResumeTokenData::FromInvalidate fromInvalidate =
+ ResumeTokenData::FromInvalidate::kNotFromInvalidate) {
ResumeTokenData tokenData;
tokenData.clusterTime = ts;
tokenData.documentKey = docKey;
+ tokenData.fromInvalidate = fromInvalidate;
if (!uuid.missing())
tokenData.uuid = uuid.getUuid();
return ResumeToken(tokenData).toDocument();
@@ -422,21 +434,6 @@ TEST_F(ChangeStreamStageTestNoSetup, FailsWithNoReplicationCoordinator) {
40573);
}
-TEST_F(ChangeStreamStageTest, StagesGeneratedCorrectly) {
- const auto spec = fromjson("{$changeStream: {}}");
-
- list<intrusive_ptr<DocumentSource>> result =
- DSChangeStream::createFromBson(spec.firstElement(), getExpCtx());
- vector<intrusive_ptr<DocumentSource>> stages(std::begin(result), std::end(result));
- ASSERT_EQUALS(stages.size(), 3UL);
- ASSERT_TRUE(dynamic_cast<DocumentSourceMatch*>(stages.front().get()));
- ASSERT_EQUALS(string(stages[0]->getSourceName()), DSChangeStream::kStageName);
- ASSERT_EQUALS(string(stages[1]->getSourceName()), DSChangeStream::kStageName);
- ASSERT_EQUALS(string(stages[2]->getSourceName()), DSChangeStream::kStageName);
-
- // TODO: Check explain result.
-}
-
TEST_F(ChangeStreamStageTest, TransformInsertDocKeyXAndId) {
auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type
nss, // namespace
@@ -663,9 +660,11 @@ TEST_F(ChangeStreamStageTest, TransformDrop) {
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
Document expectedInvalidate{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
- {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kIdField,
+ makeResumeToken(
+ kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
};
checkTransformation(dropColl, expectedDrop, {}, kDefaultSpec, expectedInvalidate);
@@ -685,9 +684,11 @@ TEST_F(ChangeStreamStageTest, TransformRename) {
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
Document expectedInvalidate{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
- {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kIdField,
+ makeResumeToken(
+ kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
};
checkTransformation(rename, expectedRename, {}, kDefaultSpec, expectedInvalidate);
@@ -725,9 +726,11 @@ TEST_F(ChangeStreamStageTest, TransformRenameTarget) {
{DSChangeStream::kNamespaceField, D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}},
};
Document expectedInvalidate{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
- {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kIdField,
+ makeResumeToken(
+ kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
};
checkTransformation(rename, expectedRename, {}, kDefaultSpec, expectedInvalidate);
@@ -1014,7 +1017,7 @@ TEST_F(ChangeStreamStageTest, TransformationShouldBeAbleToReParseSerializedStage
auto originalSpec = BSON(DSChangeStream::kStageName << BSONObj());
auto result = DSChangeStream::createFromBson(originalSpec.firstElement(), expCtx);
vector<intrusive_ptr<DocumentSource>> allStages(std::begin(result), std::end(result));
- ASSERT_EQ(allStages.size(), 3UL);
+ ASSERT_EQ(allStages.size(), 4UL);
auto stage = allStages[1];
ASSERT(dynamic_cast<DocumentSourceChangeStreamTransform*>(stage.get()));
@@ -1054,9 +1057,11 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) {
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
Document expectedInvalidate{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
- {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kIdField,
+ makeResumeToken(
+ kDefaultTs, testUuid(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
};
auto next = closeCursor->getNext();
@@ -1116,6 +1121,12 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) {
expectedInsert,
{{"_id"}}, // Mock the 'collectDocumentKeyFields' response.
BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+
+ // Verify the same behavior with resuming using 'startAfter'.
+ checkTransformation(insertEntry,
+ expectedInsert,
+ {{"_id"}}, // Mock the 'collectDocumentKeyFields' response.
+ BSON("$changeStream" << BSON("startAfter" << resumeToken)));
}
TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPresentInOplogEntry) {
@@ -1152,6 +1163,12 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPres
expectedInsert,
{{"_id"}}, // Mock the 'collectDocumentKeyFields' response.
BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+
+ // Verify the same behavior with resuming using 'startAfter'.
+ checkTransformation(insertEntry,
+ expectedInsert,
+ {{"_id"}}, // Mock the 'collectDocumentKeyFields' response.
+ BSON("$changeStream" << BSON("startAfter" << resumeToken)));
}
TEST_F(ChangeStreamStageTest, ResumeAfterFailsIfResumeTokenDoesNotContainUUID) {
@@ -1207,6 +1224,29 @@ TEST_F(ChangeStreamStageTest, RenameFromUserToSystemCollectionShouldIncludeNotif
checkTransformation(rename, expectedRename);
}
+TEST_F(ChangeStreamStageTest, ResumeAfterWithTokenFromInvalidateShouldFail) {
+ auto expCtx = getExpCtx();
+
+ // Need to put the collection in the UUID catalog so the resume token is valid.
+ Collection collection(stdx::make_unique<CollectionMock>(nss));
+ UUIDCatalog::get(expCtx->opCtx).onCreateCollection(expCtx->opCtx, &collection, testUuid());
+
+ const auto resumeTokenInvalidate =
+ makeResumeToken(kDefaultTs,
+ testUuid(),
+ BSON("x" << 2 << "_id" << 1),
+ ResumeTokenData::FromInvalidate::kFromInvalidate);
+
+ ASSERT_THROWS_CODE(DSChangeStream::createFromBson(
+ BSON(DSChangeStream::kStageName << BSON(
+ "resumeAfter" << resumeTokenInvalidate << "startAtOperationTime"
+ << kDefaultTs))
+ .firstElement(),
+ expCtx),
+ AssertionException,
+ ErrorCodes::InvalidResumeToken);
+}
+
//
// Test class for change stream of a single database.
//
@@ -1448,9 +1488,11 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}}},
};
Document expectedInvalidate{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)},
- {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kIdField,
+ makeResumeToken(
+ kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kFromInvalidate)},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
};
checkTransformation(dropDB, expectedDropDatabase, {}, kDefaultSpec, expectedInvalidate);
@@ -1632,5 +1674,81 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyIfResumeToken
BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
}
+TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromInvalidateShouldFail) {
+ auto expCtx = getExpCtx();
+
+ // Need to put the collection in the UUID catalog so the resume token is valid.
+ Collection collection(stdx::make_unique<CollectionMock>(nss));
+ UUIDCatalog::get(expCtx->opCtx).onCreateCollection(expCtx->opCtx, &collection, testUuid());
+
+ const auto resumeTokenInvalidate =
+ makeResumeToken(kDefaultTs,
+ testUuid(),
+ BSON("x" << 2 << "_id" << 1),
+ ResumeTokenData::FromInvalidate::kFromInvalidate);
+
+ ASSERT_THROWS_CODE(
+ DSChangeStream::createFromBson(
+ BSON(DSChangeStream::kStageName << BSON("resumeAfter" << resumeTokenInvalidate))
+ .firstElement(),
+ expCtx),
+ AssertionException,
+ ErrorCodes::InvalidResumeToken);
+}
+
+TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromDropDatabase) {
+ const auto uuid = testUuid();
+
+ Collection collection(stdx::make_unique<CollectionMock>(nss));
+ UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid);
+
+ // Create a resume token from only the timestamp, similar to a 'dropDatabase' entry.
+ auto resumeToken = makeResumeToken(
+ kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kNotFromInvalidate);
+
+ BSONObj insertDoc = BSON("_id" << 2);
+ auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, nss, insertDoc);
+
+ Document expectedInsert{
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), insertDoc)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kFullDocumentField, D{{"_id", 2}}},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
+ };
+ checkTransformation(insertEntry,
+ expectedInsert,
+ {{"_id"}}, // Mock the 'collectDocumentKeyFields' response.
+ BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+}
+
+
+TEST_F(ChangeStreamStageDBTest, StartAfterSucceedsEvenIfResumeTokenDoesNotContainUUID) {
+ const auto uuid = testUuid();
+
+ Collection collection(stdx::make_unique<CollectionMock>(nss));
+ UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid);
+
+ // Create a resume token from only the timestamp, similar to a 'dropDatabase' entry.
+ auto resumeToken = makeResumeToken(kDefaultTs);
+
+ BSONObj insertDoc = BSON("_id" << 2);
+ auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, nss, insertDoc);
+
+ Document expectedInsert{
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, uuid, insertDoc)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kFullDocumentField, D{{"_id", 2}}},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
+ };
+ checkTransformation(insertEntry,
+ expectedInsert,
+ {{"_id"}}, // Mock the 'collectDocumentKeyFields' response.
+ BSON("$changeStream" << BSON("startAfter" << resumeToken)));
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index efeb1f69bbb..06e3c6230bf 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -86,8 +86,10 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
// If the change stream spec includes a resumeToken with a shard key, populate the document key
// cache with the field paths.
- if (auto resumeAfter = spec.getResumeAfter()) {
- ResumeToken token = resumeAfter.get();
+ auto resumeAfter = spec.getResumeAfter();
+ auto startAfter = spec.getStartAfter();
+ if (resumeAfter || startAfter) {
+ ResumeToken token = resumeAfter ? resumeAfter.get() : startAfter.get();
ResumeTokenData tokenData = token.getData();
if (!tokenData.documentKey.missing() && tokenData.uuid) {
diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.cpp b/src/mongo/db/pipeline/document_source_check_invalidate.cpp
new file mode 100644
index 00000000000..be9d47a1252
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_check_invalidate.cpp
@@ -0,0 +1,113 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/db/pipeline/document_source_check_invalidate.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+using DSCS = DocumentSourceChangeStream;
+
+namespace {
+
+// Returns true if the given 'operationType' should invalidate the change stream based on the
+// namespace in 'pExpCtx'.
+bool isInvalidatingCommand(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ StringData operationType) {
+ if (pExpCtx->isSingleNamespaceAggregation()) {
+ return operationType == DSCS::kDropCollectionOpType ||
+ operationType == DSCS::kRenameCollectionOpType ||
+ operationType == DSCS::kDropDatabaseOpType;
+ } else if (!pExpCtx->isClusterAggregation()) {
+ return operationType == DSCS::kDropDatabaseOpType;
+ } else {
+ return false;
+ }
+};
+
+} // namespace
+
+DocumentSource::GetNextResult DocumentSourceCheckInvalidate::getNext() {
+ pExpCtx->checkForInterrupt();
+
+ invariant(!pExpCtx->inMongos);
+
+ if (_queuedInvalidate) {
+ const auto res = DocumentSource::GetNextResult(std::move(_queuedInvalidate.get()));
+ _queuedInvalidate.reset();
+ return res;
+ }
+
+ auto nextInput = pSource->getNext();
+ if (!nextInput.isAdvanced())
+ return nextInput;
+
+ auto doc = nextInput.getDocument();
+ const auto& kOperationTypeField = DSCS::kOperationTypeField;
+ DSCS::checkValueType(doc[kOperationTypeField], kOperationTypeField, BSONType::String);
+ auto operationType = doc[kOperationTypeField].getString();
+
+ // If this command should invalidate the stream, generate an invalidate entry and queue it up
+ // to be returned after the notification of this command. The new entry will have a nearly
+ // identical resume token to the notification for the command, except with an extra flag
+ // indicating that the token is from an invalidate. This flag is necessary to disambiguate
+ // the two tokens, and thus preserve a total ordering on the stream.
+
+ // As a special case, if a client receives an invalidate like this one and then wants to
+ // start a new stream after the invalidate, they can use the "startAfter" option, in which
+ // case '_ignoreFirstInvalidate' will be set, and we should ignore (AKA not generate) the
+ // very first invalidation.
+ if (isInvalidatingCommand(pExpCtx, operationType) && !_ignoreFirstInvalidate) {
+ auto resumeTokenData = ResumeToken::parse(doc[DSCS::kIdField].getDocument()).getData();
+ resumeTokenData.fromInvalidate = ResumeTokenData::FromInvalidate::kFromInvalidate;
+
+ MutableDocument result(Document{{DSCS::kIdField, ResumeToken(resumeTokenData).toDocument()},
+ {DSCS::kOperationTypeField, DSCS::kInvalidateOpType},
+ {DSCS::kClusterTimeField, doc[DSCS::kClusterTimeField]}});
+
+ // If we're in a sharded environment, we'll need to merge the results by their sort key, so
+ // add that as metadata.
+ result.copyMetaDataFrom(doc);
+
+ _queuedInvalidate = result.freeze();
+ }
+
+ // Regardless of whether the first document we see is an invalidating command, we only skip the
+ // first invalidate for streams with the 'startAfter' option, so we should not skip any
+ // invalidates that come after the first one.
+ _ignoreFirstInvalidate = false;
+
+ return nextInput;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_check_invalidate.h b/src/mongo/db/pipeline/document_source_check_invalidate.h
new file mode 100644
index 00000000000..18e51524709
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_check_invalidate.h
@@ -0,0 +1,82 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+
+namespace mongo {
+
+/**
+ * This stage is used internally for change stream notifications to artifically generate an
+ * "invalidate" entry for commands that should invalidate the change stream (e.g. collection drop
+ * for a single-collection change stream). It is not intended to be created by the user.
+ */
+class DocumentSourceCheckInvalidate final : public DocumentSource {
+public:
+ GetNextResult getNext() final;
+
+ const char* getSourceName() const final {
+ // This is used in error reporting.
+ return "$_checkInvalidate";
+ }
+
+ StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ return {StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed,
+ ChangeStreamRequirement::kChangeStreamStage};
+ }
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
+ // This stage is created by the DocumentSourceChangeStream stage, so serializing it here
+ // would result in it being created twice.
+ return Value();
+ }
+
+ static boost::intrusive_ptr<DocumentSourceCheckInvalidate> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, bool ignoreFirstInvalidate) {
+ return new DocumentSourceCheckInvalidate(expCtx, ignoreFirstInvalidate);
+ }
+
+private:
+ /**
+ * Use the create static method to create a DocumentSourceCheckInvalidate.
+ */
+ DocumentSourceCheckInvalidate(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ bool ignoreFirstInvalidate)
+ : DocumentSource(expCtx), _ignoreFirstInvalidate(ignoreFirstInvalidate) {}
+
+ boost::optional<Document> _queuedInvalidate;
+ bool _ignoreFirstInvalidate;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index ed71a826072..6df9e9fb41f 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -1536,7 +1536,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupSwapsWithIndependentMatch) {
auto spec = BSON("$changeStream" << BSON("fullDocument"
<< "updateLookup"));
auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx);
- ASSERT_EQ(stages.size(), 4UL);
+ ASSERT_EQ(stages.size(), 5UL);
// Make sure the change lookup is at the end.
ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(stages.back().get()));
@@ -1561,7 +1561,7 @@ TEST(PipelineOptimizationTest, ChangeStreamLookupDoesNotSwapWithMatchOnPostImage
auto spec = BSON("$changeStream" << BSON("fullDocument"
<< "updateLookup"));
auto stages = DocumentSourceChangeStream::createFromBson(spec.firstElement(), expCtx);
- ASSERT_EQ(stages.size(), 4UL);
+ ASSERT_EQ(stages.size(), 5UL);
// Make sure the change lookup is at the end.
ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(stages.back().get()));
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
index 8ed5e353bf9..32b688c179a 100644
--- a/src/mongo/db/pipeline/resume_token.cpp
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -70,13 +70,14 @@ ResumeToken::ResumeToken(const Document& resumeDoc) {
}
// We encode the resume token as a KeyString with the sequence:
-// clusterTime, version, applyOpsIndex, uuid, documentKey
-// Only the clusterTime, version, and applyOpsIndex are required.
+// clusterTime, version, applyOpsIndex, fromInvalidate, uuid, documentKey
+// Only the clusterTime, version, applyOpsIndex, and fromInvalidate are required.
ResumeToken::ResumeToken(const ResumeTokenData& data) {
BSONObjBuilder builder;
builder.append("", data.clusterTime);
builder.append("", data.version);
builder.appendNumber("", data.applyOpsIndex);
+ builder.appendBool("", data.fromInvalidate);
uassert(50788,
"Unexpected resume token with a documentKey but no UUID",
data.uuid || data.documentKey.missing());
@@ -148,6 +149,13 @@ ResumeTokenData ResumeToken::getData() const {
uassert(50794, "Invalid Resume Token: applyOpsIndex should be non-negative", applyOpsInd >= 0);
result.applyOpsIndex = applyOpsInd;
+ uassert(50872, "Resume Token does not contain fromInvalidate", i.more());
+ auto fromInvalidate = i.next();
+ uassert(50870,
+ "Resume Token fromInvalidate is not a boolean.",
+ fromInvalidate.type() == BSONType::Bool);
+ result.fromInvalidate = ResumeTokenData::FromInvalidate(fromInvalidate.boolean());
+
// The UUID and documentKey are not required.
if (!i.more()) {
return result;
@@ -158,6 +166,7 @@ ResumeTokenData ResumeToken::getData() const {
if (i.more()) {
result.documentKey = Value(i.next());
}
+
uassert(40646, "invalid oversized resume token", !i.more());
return result;
}
diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h
index 42b5b2b9f03..25318719e67 100644
--- a/src/mongo/db/pipeline/resume_token.h
+++ b/src/mongo/db/pipeline/resume_token.h
@@ -40,6 +40,14 @@
namespace mongo {
struct ResumeTokenData {
+ /**
+ * Flag to indicate if the resume token is from an invalidate notification.
+ */
+ enum FromInvalidate : bool {
+ kFromInvalidate = true,
+ kNotFromInvalidate = false,
+ };
+
ResumeTokenData(){};
ResumeTokenData(Timestamp clusterTimeIn,
int versionIn,
@@ -62,6 +70,10 @@ struct ResumeTokenData {
size_t applyOpsIndex = 0;
Value documentKey;
boost::optional<UUID> uuid;
+ // Flag to indicate that this resume token is from an "invalidate" entry. This will not be set
+ // on a token from a command that *would* invalidate a change stream, but rather the invalidate
+ // notification itself.
+ FromInvalidate fromInvalidate = FromInvalidate::kNotFromInvalidate;
};
std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData);