diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-05-09 12:25:35 -0400 |
---|---|---|
committer | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-06-04 12:09:50 -0400 |
commit | 753cedd024b9f4fe1a83632db792f29d3a7e4454 (patch) | |
tree | 0cd094ebb24806358e02d111cc47dadf113484ff /src/mongo/db | |
parent | 851c59e7bc5b54c0cf5feb683398a0eb6dffc20f (diff) | |
download | mongo-753cedd024b9f4fe1a83632db792f29d3a7e4454.tar.gz |
SERVER-34705: Whole-DB or whole-cluster change streams may not provide a total ordering if resumed after a drop
(cherry picked from commit 55f4dbf94a1cce9d8642af9bba9ac4cc77627293)
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_test.cpp | 189 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_transform.cpp | 31 |
2 files changed, 213 insertions, 7 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index a8412e0d8bf..1cb939b573a 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -73,6 +73,7 @@ using DSChangeStream = DocumentSourceChangeStream; static const Timestamp kDefaultTs(100, 1); static const repl::OpTime kDefaultOpTime(kDefaultTs, 1); static const NamespaceString nss("unittests.change_stream"); +static const BSONObj kDefaultSpec = fromjson("{$changeStream: {}}"); class ChangeStreamStageTestNoSetup : public AggregationContextFixture { public: @@ -88,7 +89,7 @@ struct MockMongoInterface final : public StubMongoProcessInterface { std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(OperationContext*, UUID) const final { - return {_fields, true}; + return {_fields, false}; } std::vector<FieldPath> _fields; @@ -107,8 +108,9 @@ public: void checkTransformation(const OplogEntry& entry, const boost::optional<Document> expectedDoc, - std::vector<FieldPath> docKeyFields = {}) { - vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry); + std::vector<FieldPath> docKeyFields, + const BSONObj& spec) { + vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry.toBSON(), spec); auto transform = stages[2].get(); getExpCtx()->mongoProcessInterface = stdx::make_unique<MockMongoInterface>(docKeyFields); @@ -121,12 +123,17 @@ public: } } + void checkTransformation(const OplogEntry& entry, + const boost::optional<Document> expectedDoc, + std::vector<FieldPath> docKeyFields = {}) { + return checkTransformation(entry, expectedDoc, docKeyFields, kDefaultSpec); + } + /** * Returns a list of stages expanded from a $changStream specification, starting with a * DocumentSourceMock which contains a single document representing 'entry'. */ - vector<intrusive_ptr<DocumentSource>> makeStages(const BSONObj& entry) { - const auto spec = fromjson("{$changeStream: {}}"); + vector<intrusive_ptr<DocumentSource>> makeStages(const BSONObj& entry, const BSONObj& spec) { list<intrusive_ptr<DocumentSource>> result = DSChangeStream::createFromBson(spec.firstElement(), getExpCtx()); vector<intrusive_ptr<DocumentSource>> stages(std::begin(result), std::end(result)); @@ -160,7 +167,7 @@ public: } vector<intrusive_ptr<DocumentSource>> makeStages(const OplogEntry& entry) { - return makeStages(entry.toBSON()); + return makeStages(entry.toBSON(), kDefaultSpec); } OplogEntry createCommand(const BSONObj& oField, @@ -207,7 +214,7 @@ public: BSONObj oplogEntry = builder.done(); // Create the stages and check that the documents produced matched those in the applyOps. - vector<intrusive_ptr<DocumentSource>> stages = makeStages(oplogEntry); + vector<intrusive_ptr<DocumentSource>> stages = makeStages(oplogEntry, kDefaultSpec); auto transform = stages[2].get(); invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr); @@ -994,6 +1001,91 @@ TEST_F(ChangeStreamStageTest, CloseCursorEvenIfInvalidateEntriesGetFilteredOut) ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>); } +TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) { + const Timestamp ts(3, 45); + const long long term = 4; + const auto opTime = repl::OpTime(ts, term); + const auto uuid = testUuid(); + + Collection collection(stdx::make_unique<CollectionMock>(nss)); + UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid); + + BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2); + auto resumeToken = makeResumeToken(ts, uuid, o2); + + BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3); + auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + insertDoc, // o + uuid, // uuid + boost::none, // fromMigrate + boost::none, // o2 + opTime); // opTime + + Document expectedInsert{ + {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}}, + }; + // Although the chunk manager and sharding catalog are not aware of the shard key in this test, + // the expectation is for the $changeStream stage to infer the shard key from the resume token. + checkTransformation(insertEntry, + expectedInsert, + {}, + BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); +} + +TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPresentInOplogEntry) { + const Timestamp ts(3, 45); + const long long term = 4; + const auto opTime = repl::OpTime(ts, term); + const auto uuid = testUuid(); + + Collection collection(stdx::make_unique<CollectionMock>(nss)); + UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid); + + BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2); + auto resumeToken = makeResumeToken(ts, uuid, o2); + + // Note that the 'o' field in the oplog entry does not contain the shard key field. + BSONObj insertDoc = BSON("_id" << 2); + auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + insertDoc, // o + uuid, // uuid + boost::none, // fromMigrate + boost::none, // o2 + opTime); // opTime + + Document expectedInsert{ + {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kFullDocumentField, D{{"_id", 2}}}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}}, + }; + checkTransformation(insertEntry, + expectedInsert, + {}, + BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); +} + +TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyIfResumeTokenDoesntContainUUID) { + auto resumeToken = makeResumeToken(Timestamp(3, 45)); + + // TODO SERVER-34710: Allow resuming from an "invalidate" will change this behavior. + ASSERT_THROWS_CODE( + DSChangeStream::createFromBson( + BSON(DSChangeStream::kStageName << BSON("resumeAfter" << resumeToken)).firstElement(), + getExpCtx()), + AssertionException, + 40645); +} + // // Test class for change stream of a single database. // @@ -1257,5 +1349,88 @@ TEST_F(ChangeStreamStageDBTest, MatchFiltersCreateIndex) { checkTransformation(createIndex, boost::none); } +TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) { + const Timestamp ts(3, 45); + const long long term = 4; + const auto opTime = repl::OpTime(ts, term); + const auto uuid = testUuid(); + + Collection collection(stdx::make_unique<CollectionMock>(nss)); + UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid); + + BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2); + auto resumeToken = makeResumeToken(ts, uuid, o2); + + BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3); + auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + insertDoc, // o + uuid, // uuid + boost::none, // fromMigrate + boost::none, // o2 + opTime); // opTime + + Document expectedInsert{ + {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}}, + }; + checkTransformation(insertEntry, + expectedInsert, + {}, + BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); +} + +TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPresentInOplogEntry) { + const Timestamp ts(3, 45); + const long long term = 4; + const auto opTime = repl::OpTime(ts, term); + const auto uuid = testUuid(); + + Collection collection(stdx::make_unique<CollectionMock>(nss)); + UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid); + + BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2); + auto resumeToken = makeResumeToken(ts, uuid, o2); + + // Note that the 'o' field in the oplog entry does not contain the shard key field. + BSONObj insertDoc = BSON("_id" << 2); + auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + insertDoc, // o + uuid, // uuid + boost::none, // fromMigrate + boost::none, // o2 + opTime); // opTime + + Document expectedInsert{ + {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kFullDocumentField, D{{"_id", 2}}}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}}, + }; + checkTransformation(insertEntry, + expectedInsert, + {}, + BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); +} + +TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyIfResumeTokenDoesntContainUUID) { + auto resumeToken = makeResumeToken(Timestamp(3, 45)); + + // TODO SERVER-34710: Allow resuming from an "invalidate" will change this behavior. + ASSERT_THROWS_CODE( + DSChangeStream::createFromBson( + BSON(DSChangeStream::kStageName << BSON("resumeAfter" << resumeToken)).firstElement(), + getExpCtx()), + AssertionException, + 40645); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index bb98f3bfffe..9e7e623eeab 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -81,6 +81,37 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( _isIndependentOfAnyCollection(isIndependentOfAnyCollection) { _nsRegex.emplace(DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns)); + + auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"), + _changeStreamSpec); + + // If the change stream spec includes a resumeToken with a shard key, populate the document key + // cache with the field paths. + if (auto resumeAfter = spec.getResumeAfter()) { + ResumeToken token = resumeAfter.get(); + ResumeTokenData tokenData = token.getData(); + + // TODO SERVER-34710: Resuming from an "invalidate" means that the resume token may not + // always contain a UUID. + invariant(tokenData.uuid); + if (!tokenData.documentKey.missing()) { + std::vector<FieldPath> docKeyFields; + auto docKey = tokenData.documentKey.getDocument(); + + auto iter = docKey.fieldIterator(); + while (iter.more()) { + auto fieldPair = iter.next(); + docKeyFields.push_back(fieldPair.first); + } + + // If the document key from the resume token has more than one field, that means it + // includes the shard key and thus should never change. + auto isFinal = docKey.size() > 1; + + _documentKeyCache[tokenData.uuid.get()] = + DocumentKeyCacheEntry({docKeyFields, isFinal}); + } + } } DocumentSource::StageConstraints DocumentSourceChangeStreamTransform::constraints( |