diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-05-09 12:25:35 -0400 |
---|---|---|
committer | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-05-29 17:06:54 -0400 |
commit | 55f4dbf94a1cce9d8642af9bba9ac4cc77627293 (patch) | |
tree | 5c3ca284176dd15536251b76797a2c89354f83d8 | |
parent | 1e9a55f9bba4909732ba0b06bd3547df152864bf (diff) | |
download | mongo-55f4dbf94a1cce9d8642af9bba9ac4cc77627293.tar.gz |
SERVER-34705: Whole-DB or whole-cluster change streams may not provide a total ordering if resumed after a drop
5 files changed, 248 insertions, 12 deletions
diff --git a/jstests/sharding/change_stream_invalidation.js b/jstests/sharding/change_stream_invalidation.js index ebfc59cd8c6..a50d8eb88ea 100644 --- a/jstests/sharding/change_stream_invalidation.js +++ b/jstests/sharding/change_stream_invalidation.js @@ -71,8 +71,6 @@ assert.soon(() => changeStream.hasNext()); next = changeStream.next(); assert.eq(next.operationType, "insert"); - // TODO SERVER-34705: Extract the shard key from the resume token and include in the documentKey - // for inserts. assert.eq(next.documentKey, {_id: 2}); assert.soon(() => changeStream.hasNext()); @@ -93,7 +91,7 @@ assert.soon(() => changeStream.hasNext()); next = changeStream.next(); assert.eq(next.operationType, "insert"); - assert.eq(next.documentKey, {_id: 2}); + assert.eq(next.documentKey, {shardKey: 2, _id: 2}); assert.soon(() => changeStream.hasNext()); next = changeStream.next(); diff --git a/jstests/sharding/change_streams_primary_shard_unaware.js b/jstests/sharding/change_streams_primary_shard_unaware.js index f59775cacb1..6975678d18d 100644 --- a/jstests/sharding/change_streams_primary_shard_unaware.js +++ b/jstests/sharding/change_streams_primary_shard_unaware.js @@ -165,7 +165,7 @@ cstMongos2.assertNextChangesEqual({ cursor: cursorMongos2, expectedChanges: [{ - documentKey: {_id: 2}, + documentKey: {_id: 2, a: 2}, fullDocument: {_id: 2, a: 2}, ns: {db: mongos2DB.getName(), coll: mongos2Coll.getName()}, operationType: "insert", diff --git a/jstests/sharding/change_streams_whole_db.js b/jstests/sharding/change_streams_whole_db.js index d43294e7772..bc7d559610a 100644 --- a/jstests/sharding/change_streams_whole_db.js +++ b/jstests/sharding/change_streams_whole_db.js @@ -5,6 +5,7 @@ load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest. load('jstests/aggregation/extras/utils.js'); // For assertErrorCode(). load('jstests/libs/change_stream_util.js'); // For ChangeStreamTest. + load("jstests/libs/collection_drop_recreate.js"); // For assertDropCollection. // For supportsMajorityReadConcern(). load("jstests/multiVersion/libs/causal_consistency_helpers.js"); @@ -151,7 +152,38 @@ operationType: "insert", }, ]; - cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + + const results = cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + // Store the resume token of the first insert to use after dropping the collection. + const resumeTokenBeforeDrop = results[0]._id; + + // Write one more document to the collection that will be dropped, to be returned after + // resuming. + assert.writeOK(mongosCollShardedOnX.insert({_id: 4, x: 4})); + + // Drop the collection, invalidating the open change stream. + assertDropCollection(mongosDB, mongosCollShardedOnX.getName()); + + // Resume the change stream from before the collection drop, and verify that the documentKey + // field contains the extracted shard key from the resume token. + cursor = cst.startWatchingChanges({ + pipeline: [ + {$changeStream: {resumeAfter: resumeTokenBeforeDrop}}, + {$match: {"ns.coll": mongosCollShardedOnX.getName()}} + ], + collection: 1 + }); + cst.assertNextChangesEqual({ + cursor: cursor, + expectedChanges: [ + { + documentKey: {_id: 4, x: 4}, + fullDocument: {_id: 4, x: 4}, + ns: {db: mongosDB.getName(), coll: mongosCollShardedOnX.getName()}, + operationType: "insert", + }, + ] + }); cst.cleanUp(); diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 70a61705fbb..af12c25c7d8 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -73,6 +73,7 @@ using DSChangeStream = DocumentSourceChangeStream; static const Timestamp kDefaultTs(100, 1); static const repl::OpTime kDefaultOpTime(kDefaultTs, 1); static const NamespaceString nss("unittests.change_stream"); +static const BSONObj kDefaultSpec = fromjson("{$changeStream: {}}"); class ChangeStreamStageTestNoSetup : public AggregationContextFixture { public: @@ -88,7 +89,7 @@ struct MockMongoInterface final : public StubMongoProcessInterface { std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(OperationContext*, UUID) const final { - return {_fields, true}; + return {_fields, false}; } std::vector<FieldPath> _fields; @@ -110,8 +111,9 @@ public: void checkTransformation(const OplogEntry& entry, const boost::optional<Document> expectedDoc, - std::vector<FieldPath> docKeyFields = {}) { - vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry); + std::vector<FieldPath> docKeyFields, + const BSONObj& spec) { + vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry.toBSON(), spec); auto transform = stages[2].get(); getExpCtx()->mongoProcessInterface = stdx::make_unique<MockMongoInterface>(docKeyFields); @@ -124,12 +126,17 @@ public: } } + void checkTransformation(const OplogEntry& entry, + const boost::optional<Document> expectedDoc, + std::vector<FieldPath> docKeyFields = {}) { + return checkTransformation(entry, expectedDoc, docKeyFields, kDefaultSpec); + } + /** * Returns a list of stages expanded from a $changStream specification, starting with a * DocumentSourceMock which contains a single document representing 'entry'. */ - vector<intrusive_ptr<DocumentSource>> makeStages(const BSONObj& entry) { - const auto spec = fromjson("{$changeStream: {}}"); + vector<intrusive_ptr<DocumentSource>> makeStages(const BSONObj& entry, const BSONObj& spec) { list<intrusive_ptr<DocumentSource>> result = DSChangeStream::createFromBson(spec.firstElement(), getExpCtx()); vector<intrusive_ptr<DocumentSource>> stages(std::begin(result), std::end(result)); @@ -163,7 +170,7 @@ public: } vector<intrusive_ptr<DocumentSource>> makeStages(const OplogEntry& entry) { - return makeStages(entry.toBSON()); + return makeStages(entry.toBSON(), kDefaultSpec); } OplogEntry createCommand(const BSONObj& oField, @@ -210,7 +217,7 @@ public: BSONObj oplogEntry = builder.done(); // Create the stages and check that the documents produced matched those in the applyOps. - vector<intrusive_ptr<DocumentSource>> stages = makeStages(oplogEntry); + vector<intrusive_ptr<DocumentSource>> stages = makeStages(oplogEntry, kDefaultSpec); auto transform = stages[2].get(); invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr); @@ -997,6 +1004,91 @@ TEST_F(ChangeStreamStageTest, CloseCursorEvenIfInvalidateEntriesGetFilteredOut) ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>); } +TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) { + const Timestamp ts(3, 45); + const long long term = 4; + const auto opTime = repl::OpTime(ts, term); + const auto uuid = testUuid(); + + Collection collection(stdx::make_unique<CollectionMock>(nss)); + UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid); + + BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2); + auto resumeToken = makeResumeToken(ts, uuid, o2); + + BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3); + auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + insertDoc, // o + uuid, // uuid + boost::none, // fromMigrate + boost::none, // o2 + opTime); // opTime + + Document expectedInsert{ + {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}}, + }; + // Although the chunk manager and sharding catalog are not aware of the shard key in this test, + // the expectation is for the $changeStream stage to infer the shard key from the resume token. + checkTransformation(insertEntry, + expectedInsert, + {}, + BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); +} + +TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPresentInOplogEntry) { + const Timestamp ts(3, 45); + const long long term = 4; + const auto opTime = repl::OpTime(ts, term); + const auto uuid = testUuid(); + + Collection collection(stdx::make_unique<CollectionMock>(nss)); + UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid); + + BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2); + auto resumeToken = makeResumeToken(ts, uuid, o2); + + // Note that the 'o' field in the oplog entry does not contain the shard key field. + BSONObj insertDoc = BSON("_id" << 2); + auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + insertDoc, // o + uuid, // uuid + boost::none, // fromMigrate + boost::none, // o2 + opTime); // opTime + + Document expectedInsert{ + {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kFullDocumentField, D{{"_id", 2}}}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}}, + }; + checkTransformation(insertEntry, + expectedInsert, + {}, + BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); +} + +TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyIfResumeTokenDoesntContainUUID) { + auto resumeToken = makeResumeToken(Timestamp(3, 45)); + + // TODO SERVER-34710: Allow resuming from an "invalidate" will change this behavior. + ASSERT_THROWS_CODE( + DSChangeStream::createFromBson( + BSON(DSChangeStream::kStageName << BSON("resumeAfter" << resumeToken)).firstElement(), + getExpCtx()), + AssertionException, + 40645); +} + // // Test class for change stream of a single database. // @@ -1260,5 +1352,88 @@ TEST_F(ChangeStreamStageDBTest, MatchFiltersCreateIndex) { checkTransformation(createIndex, boost::none); } +TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) { + const Timestamp ts(3, 45); + const long long term = 4; + const auto opTime = repl::OpTime(ts, term); + const auto uuid = testUuid(); + + Collection collection(stdx::make_unique<CollectionMock>(nss)); + UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid); + + BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2); + auto resumeToken = makeResumeToken(ts, uuid, o2); + + BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3); + auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + insertDoc, // o + uuid, // uuid + boost::none, // fromMigrate + boost::none, // o2 + opTime); // opTime + + Document expectedInsert{ + {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}}, + }; + checkTransformation(insertEntry, + expectedInsert, + {}, + BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); +} + +TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPresentInOplogEntry) { + const Timestamp ts(3, 45); + const long long term = 4; + const auto opTime = repl::OpTime(ts, term); + const auto uuid = testUuid(); + + Collection collection(stdx::make_unique<CollectionMock>(nss)); + UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid); + + BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2); + auto resumeToken = makeResumeToken(ts, uuid, o2); + + // Note that the 'o' field in the oplog entry does not contain the shard key field. + BSONObj insertDoc = BSON("_id" << 2); + auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + insertDoc, // o + uuid, // uuid + boost::none, // fromMigrate + boost::none, // o2 + opTime); // opTime + + Document expectedInsert{ + {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kFullDocumentField, D{{"_id", 2}}}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}}, + }; + checkTransformation(insertEntry, + expectedInsert, + {}, + BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); +} + +TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyIfResumeTokenDoesntContainUUID) { + auto resumeToken = makeResumeToken(Timestamp(3, 45)); + + // TODO SERVER-34710: Allow resuming from an "invalidate" will change this behavior. + ASSERT_THROWS_CODE( + DSChangeStream::createFromBson( + BSON(DSChangeStream::kStageName << BSON("resumeAfter" << resumeToken)).firstElement(), + getExpCtx()), + AssertionException, + 40645); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index bb98f3bfffe..9e7e623eeab 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -81,6 +81,37 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( _isIndependentOfAnyCollection(isIndependentOfAnyCollection) { _nsRegex.emplace(DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns)); + + auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"), + _changeStreamSpec); + + // If the change stream spec includes a resumeToken with a shard key, populate the document key + // cache with the field paths. + if (auto resumeAfter = spec.getResumeAfter()) { + ResumeToken token = resumeAfter.get(); + ResumeTokenData tokenData = token.getData(); + + // TODO SERVER-34710: Resuming from an "invalidate" means that the resume token may not + // always contain a UUID. + invariant(tokenData.uuid); + if (!tokenData.documentKey.missing()) { + std::vector<FieldPath> docKeyFields; + auto docKey = tokenData.documentKey.getDocument(); + + auto iter = docKey.fieldIterator(); + while (iter.more()) { + auto fieldPair = iter.next(); + docKeyFields.push_back(fieldPair.first); + } + + // If the document key from the resume token has more than one field, that means it + // includes the shard key and thus should never change. + auto isFinal = docKey.size() > 1; + + _documentKeyCache[tokenData.uuid.get()] = + DocumentKeyCacheEntry({docKeyFields, isFinal}); + } + } } DocumentSource::StageConstraints DocumentSourceChangeStreamTransform::constraints( |