commit metadata
author:    Nick Zolnierz <nicholas.zolnierz@mongodb.com>  2018-05-09 12:25:35 -0400
committer: Nick Zolnierz <nicholas.zolnierz@mongodb.com>  2018-06-04 12:09:50 -0400
commit:    753cedd024b9f4fe1a83632db792f29d3a7e4454 (patch)
tree:      0cd094ebb24806358e02d111cc47dadf113484ff
parent:    851c59e7bc5b54c0cf5feb683398a0eb6dffc20f (diff)
download:  mongo-753cedd024b9f4fe1a83632db792f29d3a7e4454.tar.gz
SERVER-34705: Whole-DB or whole-cluster change streams may not provide a total ordering if resumed after a drop
(cherry picked from commit 55f4dbf94a1cce9d8642af9bba9ac4cc77627293)
5 files changed, 265 insertions, 25 deletions
diff --git a/jstests/sharding/change_stream_invalidation.js b/jstests/sharding/change_stream_invalidation.js index 396db586c4f..eb51724f8ea 100644 --- a/jstests/sharding/change_stream_invalidation.js +++ b/jstests/sharding/change_stream_invalidation.js @@ -30,50 +30,53 @@ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL()); - // Shard the test collection on _id. + // Shard the test collection on a field called 'shardKey'. assert.commandWorked( - mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}})); + mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {shardKey: 1}})); // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey]. assert.commandWorked( - mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}})); + mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {shardKey: 0}})); // Move the [0, MaxKey] chunk to st.shard1.shardName. assert.commandWorked(mongosDB.adminCommand( - {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()})); + {moveChunk: mongosColl.getFullName(), find: {shardKey: 1}, to: st.rs1.getURL()})); // Write a document to each chunk. - assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}})); - assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert({shardKey: -1, _id: -1}, {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert({shardKey: 1, _id: 1}, {writeConcern: {w: "majority"}})); - let changeStream = mongosColl.aggregate([{$changeStream: {}}]); + let changeStream = mongosColl.watch(); // We awaited the replication of the first writes, so the change stream shouldn't return them. 
- assert.writeOK(mongosColl.update({_id: -1}, {$set: {updated: true}})); - assert.writeOK(mongosColl.update({_id: 1}, {$set: {updated: true}})); + assert.writeOK(mongosColl.update({shardKey: -1, _id: -1}, {$set: {updated: true}})); + assert.writeOK(mongosColl.update({shardKey: 1, _id: 1}, {$set: {updated: true}})); + assert.writeOK(mongosColl.insert({shardKey: 2, _id: 2})); // Drop the collection and test that we return "invalidate" entry and close the cursor. mongosColl.drop(); - st.rs0.awaitReplication(); - st.rs1.awaitReplication(); // Test that we see the two writes that happened before the invalidation. assert.soon(() => changeStream.hasNext()); let next = changeStream.next(); assert.eq(next.operationType, "update"); - assert.eq(next.documentKey._id, -1); + assert.eq(next.documentKey.shardKey, -1); const resumeTokenFromFirstUpdate = next._id; assert.soon(() => changeStream.hasNext()); next = changeStream.next(); assert.eq(next.operationType, "update"); - assert.eq(next.documentKey._id, 1); + assert.eq(next.documentKey.shardKey, 1); assert.soon(() => changeStream.hasNext()); next = changeStream.next(); - assert.eq(next.operationType, "invalidate"); + assert.eq(next.operationType, "insert"); + assert.eq(next.documentKey, {_id: 2}); - assert(!changeStream.hasNext(), "expected invalidation to cause the cursor to be closed"); + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "invalidate"); + assert(changeStream.isExhausted()); // Test that it is not possible to resume a change stream after a collection has been dropped. // Once it's been dropped, we won't be able to figure out the shard key. 
@@ -82,7 +85,6 @@ assert.commandFailedWithCode(mongosDB.runCommand({ aggregate: mongosColl.getName(), pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}], - readConcern: {level: "majority"}, cursor: {} }), 40615); diff --git a/jstests/sharding/change_streams_primary_shard_unaware.js b/jstests/sharding/change_streams_primary_shard_unaware.js index ec4027bb800..89f8d0ebf6d 100644 --- a/jstests/sharding/change_streams_primary_shard_unaware.js +++ b/jstests/sharding/change_streams_primary_shard_unaware.js @@ -168,7 +168,7 @@ cstMongos2.assertNextChangesEqual({ cursor: cursorMongos2, expectedChanges: [{ - documentKey: {_id: 2}, + documentKey: {_id: 2, a: 2}, fullDocument: {_id: 2, a: 2}, ns: {db: mongos2DB.getName(), coll: mongos2Coll.getName()}, operationType: "insert", diff --git a/jstests/sharding/change_streams_whole_db.js b/jstests/sharding/change_streams_whole_db.js index d43294e7772..bc7d559610a 100644 --- a/jstests/sharding/change_streams_whole_db.js +++ b/jstests/sharding/change_streams_whole_db.js @@ -5,6 +5,7 @@ load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest. load('jstests/aggregation/extras/utils.js'); // For assertErrorCode(). load('jstests/libs/change_stream_util.js'); // For ChangeStreamTest. + load("jstests/libs/collection_drop_recreate.js"); // For assertDropCollection. // For supportsMajorityReadConcern(). load("jstests/multiVersion/libs/causal_consistency_helpers.js"); @@ -151,7 +152,38 @@ operationType: "insert", }, ]; - cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + + const results = cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + // Store the resume token of the first insert to use after dropping the collection. + const resumeTokenBeforeDrop = results[0]._id; + + // Write one more document to the collection that will be dropped, to be returned after + // resuming. 
+ assert.writeOK(mongosCollShardedOnX.insert({_id: 4, x: 4})); + + // Drop the collection, invalidating the open change stream. + assertDropCollection(mongosDB, mongosCollShardedOnX.getName()); + + // Resume the change stream from before the collection drop, and verify that the documentKey + // field contains the extracted shard key from the resume token. + cursor = cst.startWatchingChanges({ + pipeline: [ + {$changeStream: {resumeAfter: resumeTokenBeforeDrop}}, + {$match: {"ns.coll": mongosCollShardedOnX.getName()}} + ], + collection: 1 + }); + cst.assertNextChangesEqual({ + cursor: cursor, + expectedChanges: [ + { + documentKey: {_id: 4, x: 4}, + fullDocument: {_id: 4, x: 4}, + ns: {db: mongosDB.getName(), coll: mongosCollShardedOnX.getName()}, + operationType: "insert", + }, + ] + }); cst.cleanUp(); diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index a8412e0d8bf..1cb939b573a 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -73,6 +73,7 @@ using DSChangeStream = DocumentSourceChangeStream; static const Timestamp kDefaultTs(100, 1); static const repl::OpTime kDefaultOpTime(kDefaultTs, 1); static const NamespaceString nss("unittests.change_stream"); +static const BSONObj kDefaultSpec = fromjson("{$changeStream: {}}"); class ChangeStreamStageTestNoSetup : public AggregationContextFixture { public: @@ -88,7 +89,7 @@ struct MockMongoInterface final : public StubMongoProcessInterface { std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(OperationContext*, UUID) const final { - return {_fields, true}; + return {_fields, false}; } std::vector<FieldPath> _fields; @@ -107,8 +108,9 @@ public: void checkTransformation(const OplogEntry& entry, const boost::optional<Document> expectedDoc, - std::vector<FieldPath> docKeyFields = {}) { - vector<intrusive_ptr<DocumentSource>> stages = 
makeStages(entry); + std::vector<FieldPath> docKeyFields, + const BSONObj& spec) { + vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry.toBSON(), spec); auto transform = stages[2].get(); getExpCtx()->mongoProcessInterface = stdx::make_unique<MockMongoInterface>(docKeyFields); @@ -121,12 +123,17 @@ public: } } + void checkTransformation(const OplogEntry& entry, + const boost::optional<Document> expectedDoc, + std::vector<FieldPath> docKeyFields = {}) { + return checkTransformation(entry, expectedDoc, docKeyFields, kDefaultSpec); + } + /** * Returns a list of stages expanded from a $changStream specification, starting with a * DocumentSourceMock which contains a single document representing 'entry'. */ - vector<intrusive_ptr<DocumentSource>> makeStages(const BSONObj& entry) { - const auto spec = fromjson("{$changeStream: {}}"); + vector<intrusive_ptr<DocumentSource>> makeStages(const BSONObj& entry, const BSONObj& spec) { list<intrusive_ptr<DocumentSource>> result = DSChangeStream::createFromBson(spec.firstElement(), getExpCtx()); vector<intrusive_ptr<DocumentSource>> stages(std::begin(result), std::end(result)); @@ -160,7 +167,7 @@ public: } vector<intrusive_ptr<DocumentSource>> makeStages(const OplogEntry& entry) { - return makeStages(entry.toBSON()); + return makeStages(entry.toBSON(), kDefaultSpec); } OplogEntry createCommand(const BSONObj& oField, @@ -207,7 +214,7 @@ public: BSONObj oplogEntry = builder.done(); // Create the stages and check that the documents produced matched those in the applyOps. 
- vector<intrusive_ptr<DocumentSource>> stages = makeStages(oplogEntry); + vector<intrusive_ptr<DocumentSource>> stages = makeStages(oplogEntry, kDefaultSpec); auto transform = stages[2].get(); invariant(dynamic_cast<DocumentSourceChangeStreamTransform*>(transform) != nullptr); @@ -994,6 +1001,91 @@ TEST_F(ChangeStreamStageTest, CloseCursorEvenIfInvalidateEntriesGetFilteredOut) ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>); } +TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) { + const Timestamp ts(3, 45); + const long long term = 4; + const auto opTime = repl::OpTime(ts, term); + const auto uuid = testUuid(); + + Collection collection(stdx::make_unique<CollectionMock>(nss)); + UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid); + + BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2); + auto resumeToken = makeResumeToken(ts, uuid, o2); + + BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3); + auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + insertDoc, // o + uuid, // uuid + boost::none, // fromMigrate + boost::none, // o2 + opTime); // opTime + + Document expectedInsert{ + {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}}, + }; + // Although the chunk manager and sharding catalog are not aware of the shard key in this test, + // the expectation is for the $changeStream stage to infer the shard key from the resume token. 
+ checkTransformation(insertEntry, + expectedInsert, + {}, + BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); +} + +TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPresentInOplogEntry) { + const Timestamp ts(3, 45); + const long long term = 4; + const auto opTime = repl::OpTime(ts, term); + const auto uuid = testUuid(); + + Collection collection(stdx::make_unique<CollectionMock>(nss)); + UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid); + + BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2); + auto resumeToken = makeResumeToken(ts, uuid, o2); + + // Note that the 'o' field in the oplog entry does not contain the shard key field. + BSONObj insertDoc = BSON("_id" << 2); + auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + insertDoc, // o + uuid, // uuid + boost::none, // fromMigrate + boost::none, // o2 + opTime); // opTime + + Document expectedInsert{ + {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kFullDocumentField, D{{"_id", 2}}}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}}, + }; + checkTransformation(insertEntry, + expectedInsert, + {}, + BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); +} + +TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyIfResumeTokenDoesntContainUUID) { + auto resumeToken = makeResumeToken(Timestamp(3, 45)); + + // TODO SERVER-34710: Allow resuming from an "invalidate" will change this behavior. + ASSERT_THROWS_CODE( + DSChangeStream::createFromBson( + BSON(DSChangeStream::kStageName << BSON("resumeAfter" << resumeToken)).firstElement(), + getExpCtx()), + AssertionException, + 40645); +} + // // Test class for change stream of a single database. 
// @@ -1257,5 +1349,88 @@ TEST_F(ChangeStreamStageDBTest, MatchFiltersCreateIndex) { checkTransformation(createIndex, boost::none); } +TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) { + const Timestamp ts(3, 45); + const long long term = 4; + const auto opTime = repl::OpTime(ts, term); + const auto uuid = testUuid(); + + Collection collection(stdx::make_unique<CollectionMock>(nss)); + UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid); + + BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2); + auto resumeToken = makeResumeToken(ts, uuid, o2); + + BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3); + auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + insertDoc, // o + uuid, // uuid + boost::none, // fromMigrate + boost::none, // o2 + opTime); // opTime + + Document expectedInsert{ + {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}}, + }; + checkTransformation(insertEntry, + expectedInsert, + {}, + BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); +} + +TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPresentInOplogEntry) { + const Timestamp ts(3, 45); + const long long term = 4; + const auto opTime = repl::OpTime(ts, term); + const auto uuid = testUuid(); + + Collection collection(stdx::make_unique<CollectionMock>(nss)); + UUIDCatalog::get(getExpCtx()->opCtx).onCreateCollection(getExpCtx()->opCtx, &collection, uuid); + + BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2); + auto resumeToken = makeResumeToken(ts, uuid, o2); + + // Note that the 'o' field in the oplog 
entry does not contain the shard key field. + BSONObj insertDoc = BSON("_id" << 2); + auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + insertDoc, // o + uuid, // uuid + boost::none, // fromMigrate + boost::none, // o2 + opTime); // opTime + + Document expectedInsert{ + {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kFullDocumentField, D{{"_id", 2}}}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}}, + }; + checkTransformation(insertEntry, + expectedInsert, + {}, + BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); +} + +TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyIfResumeTokenDoesntContainUUID) { + auto resumeToken = makeResumeToken(Timestamp(3, 45)); + + // TODO SERVER-34710: Allow resuming from an "invalidate" will change this behavior. 
+ ASSERT_THROWS_CODE( + DSChangeStream::createFromBson( + BSON(DSChangeStream::kStageName << BSON("resumeAfter" << resumeToken)).firstElement(), + getExpCtx()), + AssertionException, + 40645); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index bb98f3bfffe..9e7e623eeab 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -81,6 +81,37 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( _isIndependentOfAnyCollection(isIndependentOfAnyCollection) { _nsRegex.emplace(DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx->ns)); + + auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"), + _changeStreamSpec); + + // If the change stream spec includes a resumeToken with a shard key, populate the document key + // cache with the field paths. + if (auto resumeAfter = spec.getResumeAfter()) { + ResumeToken token = resumeAfter.get(); + ResumeTokenData tokenData = token.getData(); + + // TODO SERVER-34710: Resuming from an "invalidate" means that the resume token may not + // always contain a UUID. + invariant(tokenData.uuid); + if (!tokenData.documentKey.missing()) { + std::vector<FieldPath> docKeyFields; + auto docKey = tokenData.documentKey.getDocument(); + + auto iter = docKey.fieldIterator(); + while (iter.more()) { + auto fieldPair = iter.next(); + docKeyFields.push_back(fieldPair.first); + } + + // If the document key from the resume token has more than one field, that means it + // includes the shard key and thus should never change. + auto isFinal = docKey.size() > 1; + + _documentKeyCache[tokenData.uuid.get()] = + DocumentKeyCacheEntry({docKeyFields, isFinal}); + } + } } DocumentSource::StageConstraints DocumentSourceChangeStreamTransform::constraints( |