author     Rui Liu <rui.liu@mongodb.com>                      2022-01-21 18:38:18 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2022-01-21 19:25:45 +0000
commit     acd1c72c25fdeff22ad6c0fa8c8ac735d8cc9da8 (patch)
tree       c4ee3359777f276918b7d064529c86a92159a96f
parent     fe264a7777c9199304f7e28c25a5b5dd2fba47e0 (diff)
download   mongo-acd1c72c25fdeff22ad6c0fa8c8ac735d8cc9da8.tar.gz
SERVER-61892 Read document key from oplog instead of cache
17 files changed, 478 insertions, 355 deletions
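
In brief: with this change, an insert event's documentKey is read straight from the
oplog entry's "o2" field, which the server now records at write time, instead of being
rebuilt from a cached list of shard-key fields fetched from the sharding catalog. A
rough mongo-shell sketch of the observable behaviour (database, collection, and field
names below are illustrative, not taken from the patch):

    // Assumes a collection sharded on {shard: 1}; all names here are hypothetical.
    const coll = db.getSiblingDB("test").resume_demo;
    const cs = coll.watch();
    assert.commandWorked(coll.insert({_id: 1, shard: 1}));
    assert.soon(() => cs.hasNext());
    const event = cs.next();
    // The shard key now comes from the oplog's "o2" field, so it appears in the
    // event even when no catalog lookup was possible.
    printjson(event.documentKey);  // e.g. {shard: 1, _id: 1}
    // Tokens minted by an older server, whose documentKey is just {_id: ...}, can
    // still be used to resume; the fields seen in the token seed a legacy cache.
    const resumed = coll.watch([], {resumeAfter: event._id});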
diff --git a/jstests/multiVersion/change_stream_resume.js b/jstests/multiVersion/change_stream_resume.js
new file mode 100644
index 00000000000..be160250a7e
--- /dev/null
+++ b/jstests/multiVersion/change_stream_resume.js
@@ -0,0 +1,95 @@
+// Verify that we can successfully resume a change stream using a token generated on an older
+// version of the server from an insert oplog entry that does not have the documentKey embedded in
+// its "o2" field. Also verify that we can resume on a downgraded cluster using a token generated on
+// the latest version of the server.
+//
+// @tags: [uses_change_streams, requires_replication]
+
+(function() {
+"use strict";
+
+load("jstests/multiVersion/libs/multi_cluster.js");  // For upgradeCluster.
+
+function checkNextDoc({changeStream, doc, docKeyFields}) {
+    assert.soon(() => changeStream.hasNext());
+    const change = changeStream.next();
+    assert.docEq(change.fullDocument, doc);
+    assert.eq(Object.keys(change.documentKey), docKeyFields);
+    return changeStream.getResumeToken();
+}
+
+function runTest(oldVersion) {
+    const dbName = "test";
+    const collName = "change_streams_resume";
+    const st = new ShardingTest({
+        shards: 2,
+        rs: {
+            nodes: 2,
+            binVersion: oldVersion,
+            setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}
+        },
+        other: {mongosOptions: {binVersion: oldVersion}}
+    });
+    st.shardColl(collName,
+                 {shard: 1} /* Shard key */,
+                 {shard: 1} /* Split at */,
+                 {shard: 1} /* Move the chunk containing {shard: 1} to its own shard */,
+                 dbName,
+                 true /* Wait until documents orphaned by the move get deleted */);
+
+    // Establish a resume token before anything actually happens in the test.
+    let coll = st.s.getDB(dbName).getCollection(collName);
+    let changeStream = coll.watch();
+    const startOfTestResumeToken = changeStream.getResumeToken();
+
+    const docs = [{_id: 0, shard: 0}, {_id: 1, shard: 1}];
+    assert.commandWorked(coll.insert(docs));
+
+    // Verify that we see the first inserted document, and obtain its resume token.
+    const resumeTokenWithShardKey =
+        checkNextDoc({changeStream, doc: docs[0], docKeyFields: ["shard", "_id"]});
+
+    // Upgrade the cluster to the latest.
+    st.upgradeCluster(
+        "latest",
+        {upgradeShards: true, upgradeConfigs: true, upgradeMongos: true, waitUntilStable: true});
+    coll = st.s.getDB(dbName).getCollection(collName);
+
+    // Confirm that we can use the resume token with shard keys generated on the old version to
+    // resume the new stream. This is true even though the documentKey is not embedded in the oplog,
+    // and this would usually result in a resume token without any shard key fields.
+    checkNextDoc({
+        changeStream: coll.watch([], {resumeAfter: resumeTokenWithShardKey}),
+        doc: docs[1],
+        docKeyFields: ["shard", "_id"]
+    });
+
+    // Now start a new stream on "latest" from the start-of-test resume point. Confirm that we see
+    // the first insert, and that this time the documentKey does not have any shard key fields.
+    const resumeTokenNoShardKey = checkNextDoc({
+        changeStream: coll.watch([], {resumeAfter: startOfTestResumeToken}),
+        doc: docs[0],
+        docKeyFields: ["_id"]
+    });
+
+    // Downgrade the cluster again.
+    st.upgradeCluster(
+        oldVersion,
+        {upgradeShards: true, upgradeConfigs: true, upgradeMongos: true, waitUntilStable: true});
+    coll = st.s.getDB(dbName).getCollection(collName);
+
+    // Confirm that we can resume the stream from the resume token generated on "latest",
+    // even though the token only contains _id while the resumed stream will produce a token
+    // that includes the shard key fields.
+    checkNextDoc({
+        changeStream: coll.watch([], {resumeAfter: resumeTokenNoShardKey}),
+        doc: docs[1],
+        docKeyFields: ["shard", "_id"]
+    });
+
+    st.stop();
+}
+
+runTest('last-continuous');
+runTest('last-lts');
+}());
diff --git a/jstests/sharding/change_stream_resume_shard_key_change.js b/jstests/sharding/change_stream_resume_shard_key_change.js
new file mode 100644
index 00000000000..945af3e2337
--- /dev/null
+++ b/jstests/sharding/change_stream_resume_shard_key_change.js
@@ -0,0 +1,87 @@
+// Tests resuming change streams when shard key is changed.
+//
+// @tags: [
+//   requires_majority_read_concern,
+//   uses_change_streams,
+//   requires_fcv_53,
+// ]
+
+(function() {
+"use strict";
+
+const dbName = 'testDB';
+const collName = 'testColl';
+
+const st = new ShardingTest({
+    shards: 2,
+    rs: {
+        nodes: 1,
+        enableMajorityReadConcern: '',
+        // Use the noop writer with a higher frequency for periodic noops to speed up the test.
+        setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}
+    }
+});
+
+const db = st.s.getDB(dbName);
+const coll = db.getCollection(collName);
+const changeStream = coll.watch();
+
+const docs = [
+    {_id: 0, shardField1: 0, shardField2: 0},
+    {_id: 1, shardField1: 1, shardField2: 0},
+    {_id: 2, shardField1: 1, shardField2: 1},
+    {_id: 3, shardField1: 0, shardField2: 1},
+];
+const docKeys = [
+    {_id: 0},
+    {shardField1: 1, _id: 1},
+    {shardField1: 1, shardField2: 1, _id: 2},
+    {shardField1: 0, shardField2: 1, _id: 3},
+];
+
+// Document inserted in unsharded collection.
+assert.commandWorked(coll.insert(docs[0]));
+
+// Document inserted in sharded collection.
+assert.commandWorked(coll.createIndex({shardField1: 1}));
+st.shardColl(collName,
+             {shardField1: 1} /* Shard key */,
+             {shardField1: 1} /* Split at */,
+             {shardField1: 1} /* Move the chunk containing {shardField1: 1} to its own shard */,
+             dbName,
+             true /* Wait until documents orphaned by the move get deleted */);
+assert.commandWorked(coll.insert(docs[1]));
+
+// Document inserted in shard key refined collection.
+assert.commandWorked(coll.createIndex({shardField1: 1, shardField2: 1}));
+assert.commandWorked(db.adminCommand({
+    refineCollectionShardKey: `${dbName}.${collName}`,
+    key: {shardField1: 1, shardField2: 1},
+}));
+assert.commandWorked(coll.insert(docs[2]));
+
+// Insert one more document as a final sentinel, to ensure that there is always at least one visible
+// event following the resume points we wish to test.
+assert.commandWorked(coll.insert(docs[3]));
+
+const verifyChanges = (changeStream, startingIndex) => {
+    const changes = [];
+    assert.soon(() => {
+        while (changeStream.hasNext()) {
+            changes.push(changeStream.next());
+        }
+        return changes.length === docs.length - startingIndex;
+    });
+    assert.docEq(changes.map(x => x.fullDocument), docs.slice(startingIndex));
+    assert.docEq(changes.map(x => x.documentKey), docKeys.slice(startingIndex));
+    return changes;
+};
+
+// Verify that we can resume from each change point.
+const changes = verifyChanges(changeStream, 0);
+changes.forEach((change, i) => {
+    verifyChanges(coll.watch([], {resumeAfter: change._id}), i + 1);
+});
+
+st.stop();
+})();
diff --git a/jstests/sharding/change_streams_unsharded_becomes_sharded.js b/jstests/sharding/change_streams_unsharded_becomes_sharded.js
index b44fb6c1f88..9998ef9bcda 100644
--- a/jstests/sharding/change_streams_unsharded_becomes_sharded.js
+++ b/jstests/sharding/change_streams_unsharded_becomes_sharded.js
@@ -155,19 +155,19 @@ function testUnshardedBecomesSharded(collToWatch) {
     assert.commandWorked(mongosColl.insert({_id: -1, z: -1}));
     assert.commandWorked(mongosColl.insert({_id: 1, z: 1}));
 
-    // Verify that the change stream picks up the inserts, however the shard key is missing
-    // since the collection has since been dropped and recreated.
+    // Verify that the change stream picks up the inserts. The shard keys are present since they are
+    // recorded in the oplog.
     cst.assertNextChangesEqual({
         cursor: cursor,
         expectedChanges: [
             {
-                documentKey: {_id: -2},
+                documentKey: {x: -2, _id: -2},
                 fullDocument: {_id: -2, x: -2},
                 ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
                 operationType: "insert",
             },
             {
-                documentKey: {_id: -3},
+                documentKey: {x: -3, _id: -3},
                 fullDocument: {_id: -3, x: -3},
                 ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
                 operationType: "insert",
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
index 861f86f0992..24f06821a9b 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
@@ -122,5 +122,22 @@ boost::optional<Document> legacyLookupPreImage(boost::intrusive_ptr<ExpressionCo
     return Document{opLogEntry.getObject().getOwned()};
 }
 
+boost::optional<std::pair<UUID, std::vector<FieldPath>>> buildDocumentKeyCache(
+    const ResumeTokenData& tokenData) {
+    if (!tokenData.documentKey.missing() && tokenData.uuid) {
+        std::vector<FieldPath> docKeyFields;
+        auto docKey = tokenData.documentKey.getDocument();
+
+        auto iter = docKey.fieldIterator();
+        while (iter.more()) {
+            auto fieldPair = iter.next();
+            docKeyFields.push_back(fieldPair.first);
+        }
+
+        return std::make_pair(tokenData.uuid.get(), docKeyFields);
+    }
+    return {};
+}
+
 }  // namespace change_stream_legacy
 }  // namespace mongo
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.h b/src/mongo/db/pipeline/change_stream_helpers_legacy.h
index 38c86bd120a..0ac51bab7ac 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.h
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.h
@@ -47,4 +47,12 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
 boost::optional<Document> legacyLookupPreImage(boost::intrusive_ptr<ExpressionContext> pExpCtx,
                                                const Document& preImageId);
 
+/**
+ * Builds document key cache from the resume token. The cache will be used when the insert oplog
+ * entry does not contain the documentKey. This can happen when reading an oplog entry written by an
+ * older version of the server.
+ */
+boost::optional<std::pair<UUID, std::vector<FieldPath>>> buildDocumentKeyCache(
+    const ResumeTokenData& data);
+
 }  // namespace mongo::change_stream_legacy
diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
index b68edc81899..abac48c6ae8 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
@@ -47,19 +47,8 @@ REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamCheckResumability,
 
 // Returns ResumeStatus::kFoundToken if the document retrieved from the resumed pipeline satisfies
 // the client's resume token, ResumeStatus::kCheckNextDoc if it is older than the client's token,
-// and ResumeToken::kCannotResume if it is more recent than the client's resume token (indicating
-// that we will never see the token). If the resume token's documentKey contains only the _id field
-// while the pipeline documentKey contains additional fields, then the collection has become sharded
-// since the resume token was generated. In that case, we relax the requirements such that only the
-// timestamp, version, txnOpIndex, UUID and documentKey._id need match. This remains correct, since
-// the only circumstances under which the resume token omits the shard key is if it was generated
-// either (1) before the collection was sharded, (2) after the collection was sharded but before the
-// primary shard became aware of that fact, implying that it was before the first chunk moved off
-// the shard, or (3) by a malicious client who has constructed their own resume token. In the first
-// two cases, we can be guaranteed that the _id is unique and the stream can therefore be resumed
-// seamlessly; in the third case, the worst that can happen is that some entries are missed or
-// duplicated. Note that the simple collation is used to compare the resume tokens, and that we
-// purposefully avoid the user's requested collation if present.
+// and ResumeToken::kSurpassedToken if it is more recent than the client's resume token (indicating
+// that we will never see the token).
 DocumentSourceChangeStreamCheckResumability::ResumeStatus
 DocumentSourceChangeStreamCheckResumability::compareAgainstClientResumeToken(
     const intrusive_ptr<ExpressionContext>& expCtx,
@@ -127,50 +116,11 @@ DocumentSourceChangeStreamCheckResumability::compareAgainstClientResumeToken(
 
     // At this point, we know that the tokens differ only by documentKey. The status we return will
     // depend on whether the stream token is logically before or after the client token. If the
-    // latter, then we will never see the resume token and the stream cannot be resumed. However,
-    // before we can return this value, we need to check the possibility that the resumed stream is
-    // on a sharded collection and the client token is from before the collection was sharded.
-    const auto defaultResumeStatus =
-        ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.documentKey >
-                                            tokenDataFromClient.documentKey)
+    // latter, then we will never see the resume token and the stream cannot be resumed.
+    return ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.documentKey >
+                                               tokenDataFromClient.documentKey)
         ? ResumeStatus::kSurpassedToken
         : ResumeStatus::kCheckNextDoc;
-
-    // If we're not running in a sharded context, we don't need to proceed any further.
-    if (!expCtx->needsMerge && !expCtx->inMongos) {
-        return defaultResumeStatus;
-    }
-
-    // If we reach here, we still need to check the possibility that the collection has become
-    // sharded in the time since the client's resume token was generated. If so, then the client
-    // token will only have an _id field, while the token from the new pipeline may have additional
-    // shard key fields.
-
-    // We expect the documentKey to be an object in both the client and stream tokens. If either is
-    // not, then we cannot compare the embedded _id values in each, and so the stream token does not
-    // satisfy the client token.
-    if (tokenDataFromClient.documentKey.getType() != BSONType::Object ||
-        tokenDataFromResumedStream.documentKey.getType() != BSONType::Object) {
-        return defaultResumeStatus;
-    }
-
-    auto documentKeyFromResumedStream = tokenDataFromResumedStream.documentKey.getDocument();
-    auto documentKeyFromClient = tokenDataFromClient.documentKey.getDocument();
-
-    // In order for the relaxed comparison to be applicable, the client token must have a single _id
-    // field, and the resumed stream token must have additional fields beyond _id.
-    if (!(documentKeyFromClient.computeSize() == 1 &&
-          documentKeyFromResumedStream.computeSize() > 1)) {
-        return defaultResumeStatus;
-    }
-
-    // If the resume token's documentKey only contains the _id field while the pipeline's
-    // documentKey contains additional fields, we require only that the _ids match.
-    return (!documentKeyFromClient["_id"].missing() &&
-                    ValueComparator::kInstance.evaluate(documentKeyFromResumedStream["_id"] ==
-                                                        documentKeyFromClient["_id"])
-                ? ResumeStatus::kFoundToken
-                : defaultResumeStatus);
 }
 
 DocumentSourceChangeStreamCheckResumability::DocumentSourceChangeStreamCheckResumability(
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 7456d4370e2..eaf44eb21aa 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -127,11 +127,9 @@ struct MockMongoInterface final : public StubMongoProcessInterface {
         std::vector<repl::OplogEntry>::const_iterator mockEntriesIt;
     };
 
-    MockMongoInterface(std::vector<FieldPath> fields,
-                       std::vector<repl::OplogEntry> transactionEntries = {},
+    MockMongoInterface(std::vector<repl::OplogEntry> transactionEntries = {},
                        std::vector<Document> documentsForLookup = {})
-        : _fields(std::move(fields)),
-          _transactionEntries(std::move(transactionEntries)),
+        : _transactionEntries(std::move(transactionEntries)),
           _documentsForLookup{std::move(documentsForLookup)} {}
 
     // For tests of transactions that involve multiple oplog entries.
@@ -173,14 +171,6 @@ struct MockMongoInterface final : public StubMongoProcessInterface {
         return (it != _documentsForLookup.end() ? *it : boost::optional<Document>{});
     }
 
-    // For "insert" tests.
-    std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection(
-        OperationContext*, const NamespaceString&, UUID) const final {
-        return {_fields, false};
-    }
-
-    std::vector<FieldPath> _fields;
-
     // Stores oplog entries associated with a commit operation, including the oplog entries that a
     // real DocumentSourceChangeStream would not see, because they are marked with a "prepare" or
    // "partialTxn" flag.
When the DocumentSourceChangeStream sees the commit for the transaction, @@ -212,7 +202,6 @@ public: void checkTransformation(const OplogEntry& entry, const boost::optional<Document> expectedDoc, - std::vector<FieldPath> docKeyFields = {}, const BSONObj& spec = kDefaultSpec, const boost::optional<Document> expectedInvalidate = {}, const std::vector<repl::OplogEntry> transactionEntries = {}, @@ -220,8 +209,8 @@ public: vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry.getEntry().toBSON(), spec); auto lastStage = stages.back(); - getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>( - docKeyFields, transactionEntries, std::move(documentsForLookup)); + getExpCtx()->mongoProcessInterface = + std::make_unique<MockMongoInterface>(transactionEntries, std::move(documentsForLookup)); auto next = lastStage->getNext(); // Match stage should pass the doc down if expectedDoc is given. @@ -248,8 +237,7 @@ public: list<intrusive_ptr<DocumentSource>> result = DSChangeStream::createFromBson(spec.firstElement(), getExpCtx()); vector<intrusive_ptr<DocumentSource>> stages(std::begin(result), std::end(result)); - getExpCtx()->mongoProcessInterface = - std::make_unique<MockMongoInterface>(std::vector<FieldPath>{}); + getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>(); // This match stage is a DocumentSourceChangeStreamOplogMatch, which we explicitly disallow // from executing as a safety mechanism, since it needs to use the collection-default @@ -708,12 +696,12 @@ TEST_F(ChangeStreamStageTest, ShowMigrationsFailsOnMongos) { } TEST_F(ChangeStreamStageTest, TransformInsertDocKeyXAndId) { - auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type - nss, // namespace - BSON("_id" << 1 << "x" << 2), // o - testUuid(), // uuid - boost::none, // fromMigrate - boost::none); // o2 + auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + BSON("_id" << 1 << "x" << 2), // o + testUuid(), // uuid + boost::none, // fromMigrate + BSON("x" << 2 << "_id" << 1)); // o2 Document expectedInsert{ {DSChangeStream::kIdField, @@ -724,7 +712,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyXAndId) { {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal. 
}; - checkTransformation(insert, expectedInsert, {{"x"}, {"_id"}}); + checkTransformation(insert, expectedInsert); bool fromMigrate = false; // also check actual "fromMigrate: false" not filtered auto insert2 = makeOplogEntry(insert.getOpType(), // op type insert.getNss(), // namespace @@ -732,16 +720,16 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyXAndId) { insert.getUuid(), // uuid fromMigrate, // fromMigrate insert.getObject2()); // o2 - checkTransformation(insert2, expectedInsert, {{"x"}, {"_id"}}); + checkTransformation(insert2, expectedInsert); } TEST_F(ChangeStreamStageTest, TransformInsertDocKeyIdAndX) { - auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type - nss, // namespace - BSON("x" << 2 << "_id" << 1), // o - testUuid(), // uuid - boost::none, // fromMigrate - boost::none); // o2 + auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + BSON("x" << 2 << "_id" << 1), // o + testUuid(), // uuid + boost::none, // fromMigrate + BSON("_id" << 1 << "x" << 2)); // o2 Document expectedInsert{ {DSChangeStream::kIdField, @@ -752,7 +740,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyIdAndX) { {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, // _id first }; - checkTransformation(insert, expectedInsert, {{"_id"}, {"x"}}); + checkTransformation(insert, expectedInsert); } TEST_F(ChangeStreamStageTest, TransformInsertDocKeyJustId) { @@ -761,7 +749,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyJustId) { BSON("_id" << 1 << "x" << 2), // o testUuid(), // uuid boost::none, // fromMigrate - boost::none); // o2 + BSON("_id" << 1)); // o2 Document expectedInsert{ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1))}, @@ -771,7 +759,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyJustId) { {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}}}, }; - checkTransformation(insert, expectedInsert, {{"_id"}}); + checkTransformation(insert, expectedInsert); } TEST_F(ChangeStreamStageTest, TransformInsertFromMigrate) { @@ -788,12 +776,12 @@ TEST_F(ChangeStreamStageTest, TransformInsertFromMigrate) { TEST_F(ChangeStreamStageTest, TransformInsertFromMigrateShowMigrations) { bool fromMigrate = true; - auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type - nss, // namespace - BSON("x" << 2 << "_id" << 1), // o - testUuid(), // uuid - fromMigrate, // fromMigrate - boost::none); // o2 + auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + BSON("x" << 2 << "_id" << 1), // o + testUuid(), // uuid + fromMigrate, // fromMigrate + BSON("_id" << 1 << "x" << 2)); // o2 auto spec = fromjson("{$changeStream: {showMigrationEvents: true}}"); Document expectedInsert{ @@ -805,7 +793,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertFromMigrateShowMigrations) { {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, // _id first }; - checkTransformation(insert, expectedInsert, {{"_id"}, {"x"}}, spec); + checkTransformation(insert, expectedInsert, spec); } TEST_F(ChangeStreamStageTest, TransformUpdateFields) { @@ -1082,7 +1070,7 @@ TEST_F(ChangeStreamStageTest, TransformDeleteFromMigrateShowMigrations) { o, // o testUuid(), // uuid fromMigrate, // fromMigrate - boost::none); // o2 + BSON("_id" << 1)); // o2 auto spec = fromjson("{$changeStream: 
{showMigrationEvents: true}}"); Document expectedDelete{ @@ -1093,7 +1081,7 @@ TEST_F(ChangeStreamStageTest, TransformDeleteFromMigrateShowMigrations) { {DSChangeStream::kDocumentKeyField, D{{"_id", 1}}}, }; - checkTransformation(deleteEntry, expectedDelete, {}, spec); + checkTransformation(deleteEntry, expectedDelete, spec); } TEST_F(ChangeStreamStageTest, TransformDrop) { @@ -1113,7 +1101,7 @@ TEST_F(ChangeStreamStageTest, TransformDrop) { {DSChangeStream::kClusterTimeField, kDefaultTs}, }; - checkTransformation(dropColl, expectedDrop, {}, kDefaultSpec, expectedInvalidate); + checkTransformation(dropColl, expectedDrop, kDefaultSpec, expectedInvalidate); } TEST_F(ChangeStreamStageTest, TransformRename) { @@ -1137,7 +1125,7 @@ TEST_F(ChangeStreamStageTest, TransformRename) { {DSChangeStream::kClusterTimeField, kDefaultTs}, }; - checkTransformation(rename, expectedRename, {}, kDefaultSpec, expectedInvalidate); + checkTransformation(rename, expectedRename, kDefaultSpec, expectedInvalidate); } TEST_F(ChangeStreamStageTest, TransformInvalidateFromMigrate) { @@ -1179,7 +1167,7 @@ TEST_F(ChangeStreamStageTest, TransformRenameTarget) { {DSChangeStream::kClusterTimeField, kDefaultTs}, }; - checkTransformation(rename, expectedRename, {}, kDefaultSpec, expectedInvalidate); + checkTransformation(rename, expectedRename, kDefaultSpec, expectedInvalidate); } TEST_F(ChangeStreamStageTest, MatchFiltersDropDatabaseCommand) { @@ -1228,7 +1216,7 @@ TEST_F(ChangeStreamStageTest, TransformReshardBegin) { {DSChangeStream::kOperationTypeField, DSChangeStream::kReshardBeginOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, }; - checkTransformation(reshardingBegin, expectedReshardingBegin, {}, spec); + checkTransformation(reshardingBegin, expectedReshardingBegin, spec); } TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUp) { @@ -1258,7 +1246,7 @@ TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUp) { {DSChangeStream::kClusterTimeField, kDefaultTs}, }; - checkTransformation(reshardDoneCatchUp, expectedReshardingDoneCatchUp, {}, spec); + checkTransformation(reshardDoneCatchUp, expectedReshardingDoneCatchUp, spec); } TEST_F(ChangeStreamStageTest, TransformEmptyApplyOps) { @@ -1380,7 +1368,11 @@ TEST_F(ChangeStreamStageTest, CommitCommandReturnsOperationsFromPreparedTransact Document preparedApplyOps{ {"applyOps", Value{std::vector<Document>{ - D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 123}}}}}, + D{{"op", "i"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", V{D{{"_id", 123}}}}, + {"o2", V{D{}}}}, }}}, {"prepare", true}, }; @@ -1434,7 +1426,7 @@ TEST_F(ChangeStreamStageTest, CommitCommandReturnsOperationsFromPreparedTransact {DSChangeStream::kDocumentKeyField, D{}}, }; - checkTransformation(oplogEntry, expectedResult, {}, kDefaultSpec, {}, {preparedTransaction}); + checkTransformation(oplogEntry, expectedResult, kDefaultSpec, {}, {preparedTransaction}); } TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) { @@ -1450,11 +1442,13 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) { D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, - {"o", V{Document{{"_id", 123}}}}}, + {"o", V{Document{{"_id", 123}}}}, + {"o2", V{Document{{"_id", 123}}}}}, D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, - {"o", V{Document{{"_id", 456}}}}}, + {"o", V{Document{{"_id", 456}}}}, + {"o2", V{Document{{"_id", 456}}}}}, }}}, {"partialTxn", true}, }; @@ -1473,7 +1467,11 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) { 
Document applyOps2{ {"applyOps", V{std::vector<Document>{ - D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 789}}}}}, + D{{"op", "i"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", V{D{{"_id", 789}}}}, + {"o2", V{D{{"_id", 789}}}}}, }}}, /* The absence of the "partialTxn" and "prepare" fields indicates that this command commits the transaction. */ @@ -1497,7 +1495,6 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) { // Populate the MockTransactionHistoryEditor in reverse chronological order. getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>( - std::vector<FieldPath>{}, std::vector<repl::OplogEntry>{transactionEntry2, transactionEntry1}); // We should get three documents from the change stream, based on the documents in the two @@ -1515,7 +1512,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) { ASSERT_DOCUMENT_EQ(resumeToken, makeResumeToken(applyOpsOpTime2.getTimestamp(), testUuid(), - V{D{}}, + V{D{{"_id", 123}}}, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 0)); @@ -1532,7 +1529,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) { ASSERT_DOCUMENT_EQ(resumeToken, makeResumeToken(applyOpsOpTime2.getTimestamp(), testUuid(), - V{D{}}, + V{D{{"_id", 456}}}, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 1)); @@ -1549,7 +1546,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) { ASSERT_DOCUMENT_EQ(resumeToken, makeResumeToken(applyOpsOpTime2.getTimestamp(), testUuid(), - V{D{}}, + V{D{{"_id", 789}}}, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 2)); } @@ -1587,7 +1584,8 @@ TEST_F(ChangeStreamStageTest, TransactionWithEmptyOplogEntries) { D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, - {"o", V{Document{{"_id", 123}}}}}, + {"o", V{Document{{"_id", 123}}}}, + {"o2", V{Document{{"_id", 123}}}}}, }}}, {"partialTxn", true}, }; @@ -1624,7 +1622,8 @@ TEST_F(ChangeStreamStageTest, TransactionWithEmptyOplogEntries) { V{std::vector<Document>{D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, - {"o", V{Document{{"_id", 456}}}}}}}}, + {"o", V{Document{{"_id", 456}}}}, + {"o2", V{Document{{"_id", 456}}}}}}}}, {"partialTxn", true}, }; @@ -1663,8 +1662,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithEmptyOplogEntries) { // Populate the MockTransactionHistoryEditor in reverse chronological order. getExpCtx()->mongoProcessInterface = - std::make_unique<MockMongoInterface>(std::vector<FieldPath>{}, - std::vector<repl::OplogEntry>{transactionEntry5, + std::make_unique<MockMongoInterface>(std::vector<repl::OplogEntry>{transactionEntry5, transactionEntry4, transactionEntry3, transactionEntry2, @@ -1685,7 +1683,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithEmptyOplogEntries) { ASSERT_DOCUMENT_EQ(resumeToken, makeResumeToken(applyOpsOpTime5.getTimestamp(), testUuid(), - V{D{}}, + V{D{{"_id", 123}}}, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 0)); @@ -1702,7 +1700,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithEmptyOplogEntries) { ASSERT_DOCUMENT_EQ(resumeToken, makeResumeToken(applyOpsOpTime5.getTimestamp(), testUuid(), - V{D{}}, + V{D{{"_id", 456}}}, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 1)); } @@ -1755,7 +1753,6 @@ TEST_F(ChangeStreamStageTest, TransactionWithOnlyEmptyOplogEntries) { // Populate the MockTransactionHistoryEditor in reverse chronological order. 
getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>( - std::vector<FieldPath>{}, std::vector<repl::OplogEntry>{transactionEntry2, transactionEntry1}); // We should get three documents from the change stream, based on the documents in the two @@ -1774,8 +1771,16 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) { Document applyOps1{ {"applyOps", V{std::vector<Document>{ - D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 123}}}}}, - D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 456}}}}}, + D{{"op", "i"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", V{D{{"_id", 123}}}}, + {"o2", V{D{{"_id", 123}}}}}, + D{{"op", "i"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", V{D{{"_id", 456}}}}, + {"o2", V{D{{"_id", 456}}}}}, }}}, {"partialTxn", true}, }; @@ -1794,7 +1799,11 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) { Document applyOps2{ {"applyOps", V{std::vector<Document>{ - D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 789}}}}}, + D{{"op", "i"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", V{D{{"_id", 789}}}}, + {"o2", V{D{{"_id", 789}}}}}, }}}, {"prepare", true}, }; @@ -1839,7 +1848,6 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) { // Populate the MockTransactionHistoryEditor in reverse chronological order. getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>( - std::vector<FieldPath>{}, std::vector<repl::OplogEntry>{commitEntry, transactionEntry2, transactionEntry1}); // We should get three documents from the change stream, based on the documents in the two @@ -1858,7 +1866,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) { resumeToken, makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand. testUuid(), - V{D{}}, + V{D{{"_id", 123}}}, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 0)); @@ -1876,7 +1884,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) { resumeToken, makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand. testUuid(), - V{D{}}, + V{D{{"_id", 456}}}, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 1)); @@ -1894,7 +1902,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) { resumeToken, makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand. testUuid(), - V{D{}}, + V{D{{"_id", 789}}}, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 2)); @@ -1912,8 +1920,16 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionEndingWithEmptyApplyOps) { Document applyOps1{ {"applyOps", V{std::vector<Document>{ - D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 123}}}}}, - D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 456}}}}}, + D{{"op", "i"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", V{D{{"_id", 123}}}}, + {"o2", V{D{{"_id", 123}}}}}, + D{{"op", "i"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", V{D{{"_id", 456}}}}, + {"o2", V{D{{"_id", 456}}}}}, }}}, {"partialTxn", true}, }; @@ -1975,7 +1991,6 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionEndingWithEmptyApplyOps) { // Populate the MockTransactionHistoryEditor in reverse chronological order. 
getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>( - std::vector<FieldPath>{}, std::vector<repl::OplogEntry>{commitEntry, transactionEntry2, transactionEntry1}); // We should get two documents from the change stream, based on the documents in the non-empty @@ -1994,7 +2009,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionEndingWithEmptyApplyOps) { resumeToken, makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand. testUuid(), - V{D{}}, + V{D{{"_id", 123}}}, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 0)); @@ -2012,7 +2027,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionEndingWithEmptyApplyOps) { resumeToken, makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand. testUuid(), - V{D{}}, + V{D{{"_id", 456}}}, ResumeTokenData::FromInvalidate::kNotFromInvalidate, 1)); @@ -2374,7 +2389,7 @@ TEST_F(ChangeStreamStageTest, CloseCursorEvenIfInvalidateEntriesGetFilteredOut) ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::ChangeStreamInvalidated>); } -TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) { +TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeTokenWhenNoO2FieldInOplog) { const Timestamp ts(3, 45); const long long term = 4; const auto opTime = repl::OpTime(ts, term); @@ -2387,8 +2402,8 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) { }); - BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2); - auto resumeToken = makeResumeToken(ts, uuid, o2); + BSONObj docKey = BSON("_id" << 1 << "shardKey" << 2); + auto resumeToken = makeResumeToken(ts, uuid, docKey); BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3); auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type @@ -2410,17 +2425,55 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) { // Although the chunk manager and sharding catalog are not aware of the shard key in this test, // the expectation is for the $changeStream stage to infer the shard key from the resume token. checkTransformation( - insertEntry, - expectedInsert, - {{"_id"}}, // Mock the 'collectDocumentKeyFieldsForHostedCollection' response. - BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); + insertEntry, expectedInsert, BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); // Verify the same behavior with resuming using 'startAfter'. checkTransformation( - insertEntry, - expectedInsert, - {{"_id"}}, // Mock the 'collectDocumentKeyFieldsForHostedCollection' response. 
- BSON("$changeStream" << BSON("startAfter" << resumeToken))); + insertEntry, expectedInsert, BSON("$changeStream" << BSON("startAfter" << resumeToken))); +} + +TEST_F(ChangeStreamStageTest, DocumentKeyShouldPrioritizeO2FieldOverDocumentKeyCache) { + const Timestamp ts(3, 45); + const long long term = 4; + const auto opTime = repl::OpTime(ts, term); + const auto uuid = testUuid(); + + std::shared_ptr<Collection> collection = + std::make_shared<CollectionMock>(TenantNamespace(boost::none, nss)); + CollectionCatalog::write(getExpCtx()->opCtx, [&](CollectionCatalog& catalog) { + catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection)); + }); + + + BSONObj docKey = BSON("_id" << 1); + auto resumeToken = makeResumeToken(ts, uuid, docKey); + + BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3); + BSONObj o2 = BSON("_id" << 2 << "shardKey" << 3); + auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + insertDoc, // o + uuid, // uuid + boost::none, // fromMigrate + o2, // o2 + opTime); // opTime + + Document expectedInsert{ + {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}}, + }; + // When o2 is present in the oplog entry, we should use its value for the document key, even if + // the resume token doesn't contain shard key. + checkTransformation( + insertEntry, expectedInsert, BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); + + // Verify the same behavior with resuming using 'startAfter'. + checkTransformation( + insertEntry, expectedInsert, BSON("$changeStream" << BSON("startAfter" << resumeToken))); } TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPresentInOplogEntry) { @@ -2435,8 +2488,8 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPres catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection)); }); - BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2); - auto resumeToken = makeResumeToken(ts, uuid, o2); + BSONObj docKey = BSON("_id" << 1 << "shardKey" << 2); + auto resumeToken = makeResumeToken(ts, uuid, docKey); // Note that the 'o' field in the oplog entry does not contain the shard key field. BSONObj insertDoc = BSON("_id" << 2); @@ -2457,17 +2510,11 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPres {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}}, }; checkTransformation( - insertEntry, - expectedInsert, - {{"_id"}}, // Mock the 'collectDocumentKeyFieldsForHostedCollection' response. - BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); + insertEntry, expectedInsert, BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); // Verify the same behavior with resuming using 'startAfter'. checkTransformation( - insertEntry, - expectedInsert, - {{"_id"}}, // Mock the 'collectDocumentKeyFieldsForHostedCollection' response. 
- BSON("$changeStream" << BSON("startAfter" << resumeToken))); + insertEntry, expectedInsert, BSON("$changeStream" << BSON("startAfter" << resumeToken))); } TEST_F(ChangeStreamStageTest, ResumeAfterFailsIfResumeTokenDoesNotContainUUID) { @@ -2552,17 +2599,16 @@ TEST_F(ChangeStreamStageTest, ResumeAfterWithTokenFromInvalidateShouldFail) { } TEST_F(ChangeStreamStageTest, UsesResumeTokenAsSortKeyIfNeedsMergeIsFalse) { - auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type - nss, // namespace - BSON("x" << 2 << "_id" << 1), // o - testUuid(), // uuid - boost::none, // fromMigrate - boost::none); // o2 + auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + BSON("x" << 2 << "_id" << 1), // o + testUuid(), // uuid + boost::none, // fromMigrate + BSON("x" << 2 << "_id" << 1)); // o2 auto stages = makeStages(insert.getEntry().toBSON(), kDefaultSpec); - getExpCtx()->mongoProcessInterface = - std::make_unique<MockMongoInterface>(std::vector<FieldPath>{{"x"}, {"_id"}}); + getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>(); getExpCtx()->needsMerge = false; @@ -2584,7 +2630,12 @@ public: }; TEST_F(ChangeStreamStageDBTest, TransformInsert) { - auto insert = makeOplogEntry(OpTypeEnum::kInsert, nss, BSON("_id" << 1 << "x" << 2)); + auto insert = makeOplogEntry(OpTypeEnum::kInsert, + nss, + BSON("_id" << 1 << "x" << 2), + testUuid(), + boost::none, + BSON("x" << 2 << "_id" << 1)); Document expectedInsert{ {DSChangeStream::kIdField, @@ -2595,13 +2646,17 @@ TEST_F(ChangeStreamStageDBTest, TransformInsert) { {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal. }; - checkTransformation(insert, expectedInsert, {{"x"}, {"_id"}}); + checkTransformation(insert, expectedInsert); } TEST_F(ChangeStreamStageDBTest, InsertOnOtherCollections) { NamespaceString otherNss("unittests.other_collection."); - auto insertOtherColl = - makeOplogEntry(OpTypeEnum::kInsert, otherNss, BSON("_id" << 1 << "x" << 2)); + auto insertOtherColl = makeOplogEntry(OpTypeEnum::kInsert, + otherNss, + BSON("_id" << 1 << "x" << 2), + testUuid(), + boost::none, + BSON("x" << 2 << "_id" << 1)); // Insert on another collection in the same database. Document expectedInsert{ @@ -2613,7 +2668,7 @@ TEST_F(ChangeStreamStageDBTest, InsertOnOtherCollections) { {DSChangeStream::kNamespaceField, D{{"db", otherNss.db()}, {"coll", otherNss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal. 
}; - checkTransformation(insertOtherColl, expectedInsert, {{"x"}, {"_id"}}); + checkTransformation(insertOtherColl, expectedInsert); } TEST_F(ChangeStreamStageDBTest, MatchFiltersChangesOnOtherDatabases) { @@ -2661,7 +2716,8 @@ TEST_F(ChangeStreamStageDBTest, TransformsEntriesForLegalClientCollectionsWithSy }; for (auto& ns : allowedNamespaces) { - auto insert = makeOplogEntry(OpTypeEnum::kInsert, ns, BSON("_id" << 1)); + auto insert = makeOplogEntry( + OpTypeEnum::kInsert, ns, BSON("_id" << 1), testUuid(), boost::none, BSON("_id" << 1)); Document expectedInsert{ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1))}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, @@ -2670,7 +2726,7 @@ TEST_F(ChangeStreamStageDBTest, TransformsEntriesForLegalClientCollectionsWithSy {DSChangeStream::kNamespaceField, D{{"db", ns.db()}, {"coll", ns.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}}}, }; - checkTransformation(insert, expectedInsert, {{"_id"}}); + checkTransformation(insert, expectedInsert); } } @@ -2798,7 +2854,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDeleteFromMigrateShowMigrations) { {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, }; - checkTransformation(deleteEntry, expectedDelete, {}, spec); + checkTransformation(deleteEntry, expectedDelete, spec); } TEST_F(ChangeStreamStageDBTest, TransformDrop) { @@ -2846,7 +2902,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) { {DSChangeStream::kClusterTimeField, kDefaultTs}, }; - checkTransformation(dropDB, expectedDropDatabase, {}, kDefaultSpec, expectedInvalidate); + checkTransformation(dropDB, expectedDropDatabase, kDefaultSpec, expectedInvalidate); } TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) { @@ -2900,7 +2956,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) { {DSChangeStream::kDocumentKeyField, documentKey}, }; checkTransformation( - deleteEntry, expectedDeleteNoPreImage, {}, spec, boost::none, {}, documentsForLookup); + deleteEntry, expectedDeleteNoPreImage, spec, boost::none, {}, documentsForLookup); // When run with {fullDocumentBeforeChange: "whenAvailable"}, we see the pre-image. spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" @@ -2914,13 +2970,13 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) { {DSChangeStream::kFullDocumentBeforeChangeField, preImageObj}, }; checkTransformation( - deleteEntry, expectedDeleteWithPreImage, {}, spec, boost::none, {}, documentsForLookup); + deleteEntry, expectedDeleteWithPreImage, spec, boost::none, {}, documentsForLookup); // When run with {fullDocumentBeforeChange: "required"}, we see the pre-image. spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" << "required")); checkTransformation( - deleteEntry, expectedDeleteWithPreImage, {}, spec, boost::none, {}, documentsForLookup); + deleteEntry, expectedDeleteWithPreImage, spec, boost::none, {}, documentsForLookup); // When run with {fullDocumentBeforeChange: "whenAvailable"} but no pre-image is available, the // output 'fullDocumentBeforeChange' field is explicitly set to 'null'. 
@@ -2929,13 +2985,13 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) { MutableDocument expectedDeleteWithNullPreImage(expectedDeleteNoPreImage); expectedDeleteWithNullPreImage.addField(DSChangeStream::kFullDocumentBeforeChangeField, Value(BSONNULL)); - checkTransformation(deleteEntry, expectedDeleteWithNullPreImage.freeze(), {}, spec); + checkTransformation(deleteEntry, expectedDeleteWithNullPreImage.freeze(), spec); // When run with {fullDocumentBeforeChange: "required"} but we cannot find the pre-image, we // throw NoMatchingDocument. spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" << "required")); - ASSERT_THROWS_CODE(checkTransformation(deleteEntry, boost::none, {}, spec), + ASSERT_THROWS_CODE(checkTransformation(deleteEntry, boost::none, spec), AssertionException, ErrorCodes::NoMatchingDocument); } @@ -2996,7 +3052,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) { }, }; checkTransformation( - updateEntry, expectedUpdateNoPreImage, {}, spec, boost::none, {}, documentsForLookup); + updateEntry, expectedUpdateNoPreImage, spec, boost::none, {}, documentsForLookup); // When run with {fullDocumentBeforeChange: "whenAvailable"}, we see the pre-image. spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" @@ -3014,13 +3070,13 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) { {DSChangeStream::kFullDocumentBeforeChangeField, preImageObj}, }; checkTransformation( - updateEntry, expectedUpdateWithPreImage, {}, spec, boost::none, {}, documentsForLookup); + updateEntry, expectedUpdateWithPreImage, spec, boost::none, {}, documentsForLookup); // When run with {fullDocumentBeforeChange: "required"}, we see the pre-image. spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" << "required")); checkTransformation( - updateEntry, expectedUpdateWithPreImage, {}, spec, boost::none, {}, documentsForLookup); + updateEntry, expectedUpdateWithPreImage, spec, boost::none, {}, documentsForLookup); // When run with {fullDocumentBeforeChange: "whenAvailable"} but no pre-image is available, the // output 'fullDocumentBeforeChange' field is explicitly set to 'null'. @@ -3029,13 +3085,13 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) { MutableDocument expectedUpdateWithNullPreImage(expectedUpdateNoPreImage); expectedUpdateWithNullPreImage.addField(DSChangeStream::kFullDocumentBeforeChangeField, Value(BSONNULL)); - checkTransformation(updateEntry, expectedUpdateWithNullPreImage.freeze(), {}, spec); + checkTransformation(updateEntry, expectedUpdateWithNullPreImage.freeze(), spec); // When run with {fullDocumentBeforeChange: "required"} but we cannot find the pre-image, we // throw NoMatchingDocument. spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" << "required")); - ASSERT_THROWS_CODE(checkTransformation(updateEntry, boost::none, {}, spec), + ASSERT_THROWS_CODE(checkTransformation(updateEntry, boost::none, spec), AssertionException, ErrorCodes::NoMatchingDocument); } @@ -3093,7 +3149,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) { {DSChangeStream::kDocumentKeyField, documentKey}, }; checkTransformation( - replaceEntry, expectedReplaceNoPreImage, {}, spec, boost::none, {}, documentsForLookup); + replaceEntry, expectedReplaceNoPreImage, spec, boost::none, {}, documentsForLookup); // When run with {fullDocumentBeforeChange: "whenAvailable"}, we see the pre-image. 
spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" @@ -3108,13 +3164,13 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) { {DSChangeStream::kFullDocumentBeforeChangeField, preImageObj}, }; checkTransformation( - replaceEntry, expectedReplaceWithPreImage, {}, spec, boost::none, {}, documentsForLookup); + replaceEntry, expectedReplaceWithPreImage, spec, boost::none, {}, documentsForLookup); // When run with {fullDocumentBeforeChange: "required"}, we see the pre-image. spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" << "required")); checkTransformation( - replaceEntry, expectedReplaceWithPreImage, {}, spec, boost::none, {}, documentsForLookup); + replaceEntry, expectedReplaceWithPreImage, spec, boost::none, {}, documentsForLookup); // When run with {fullDocumentBeforeChange: "whenAvailable"} but no pre-image is available, the // output 'fullDocumentBeforeChange' field is explicitly set to 'null'. @@ -3123,13 +3179,13 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) { MutableDocument expectedReplaceWithNullPreImage(expectedReplaceNoPreImage); expectedReplaceWithNullPreImage.addField(DSChangeStream::kFullDocumentBeforeChangeField, Value(BSONNULL)); - checkTransformation(replaceEntry, expectedReplaceWithNullPreImage.freeze(), {}, spec); + checkTransformation(replaceEntry, expectedReplaceWithNullPreImage.freeze(), spec); // When run with {fullDocumentBeforeChange: "required"} but we cannot find the pre-image, we // throw NoMatchingDocument. spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" << "required")); - ASSERT_THROWS_CODE(checkTransformation(replaceEntry, boost::none, {}, spec), + ASSERT_THROWS_CODE(checkTransformation(replaceEntry, boost::none, spec), AssertionException, ErrorCodes::NoMatchingDocument); } @@ -3195,7 +3251,8 @@ TEST_F(ChangeStreamStageDBTest, MatchFiltersNoOp) { checkTransformation(noOp, boost::none); } -TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) { +TEST_F(ChangeStreamStageDBTest, + DocumentKeyShouldIncludeShardKeyFromResumeTokenWhenNoO2FieldInOplog) { const Timestamp ts(3, 45); const long long term = 4; const auto opTime = repl::OpTime(ts, term); @@ -3207,8 +3264,8 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection)); }); - BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2); - auto resumeToken = makeResumeToken(ts, uuid, o2); + BSONObj docKey = BSON("_id" << 1 << "shardKey" << 2); + auto resumeToken = makeResumeToken(ts, uuid, docKey); BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3); auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type @@ -3227,11 +3284,49 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}}, }; + // Although the chunk manager and sharding catalog are not aware of the shard key in this test, + // the expectation is for the $changeStream stage to infer the shard key from the resume token. 
+ checkTransformation( + insertEntry, expectedInsert, BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); +} + +TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldPrioritizeO2FieldOverDocumentKeyCache) { + const Timestamp ts(3, 45); + const long long term = 4; + const auto opTime = repl::OpTime(ts, term); + const auto uuid = testUuid(); + + std::shared_ptr<Collection> collection = + std::make_shared<CollectionMock>(TenantNamespace(boost::none, nss)); + CollectionCatalog::write(getExpCtx()->opCtx, [&](CollectionCatalog& catalog) { + catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection)); + }); + + BSONObj docKey = BSON("_id" << 1); + auto resumeToken = makeResumeToken(ts, uuid, docKey); + + BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3); + BSONObj o2 = BSON("_id" << 2 << "shardKey" << 3); + auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type + nss, // namespace + insertDoc, // o + uuid, // uuid + boost::none, // fromMigrate + o2, // o2 + opTime); // opTime + + Document expectedInsert{ + {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}}, + }; + // When o2 is present in the oplog entry, we should use its value for the document key, even if + // the resume token doesn't contain shard key. checkTransformation( - insertEntry, - expectedInsert, - {{"_id"}}, // Mock the 'collectDocumentKeyFieldsForHostedCollection' response. - BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); + insertEntry, expectedInsert, BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); } TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPresentInOplogEntry) { @@ -3246,8 +3341,8 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPr catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection)); }); - BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2); - auto resumeToken = makeResumeToken(ts, uuid, o2); + BSONObj docKey = BSON("_id" << 1 << "shardKey" << 2); + auto resumeToken = makeResumeToken(ts, uuid, docKey); // Note that the 'o' field in the oplog entry does not contain the shard key field. BSONObj insertDoc = BSON("_id" << 2); @@ -3268,10 +3363,7 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPr {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}}, }; checkTransformation( - insertEntry, - expectedInsert, - {{"_id"}}, // Mock the 'collectDocumentKeyFieldsForHostedCollection' response. - BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); + insertEntry, expectedInsert, BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); } TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyIfResumeTokenDoesntContainUUID) { @@ -3309,10 +3401,7 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyIfResumeToken {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}}, }; checkTransformation( - insertEntry, - expectedInsert, - {{"_id"}}, // Mock the 'collectDocumentKeyFieldsForHostedCollection' response. 
- BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); + insertEntry, expectedInsert, BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); } TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromInvalidateShouldFail) { @@ -3354,7 +3443,8 @@ TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromDropDatabase) { kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kNotFromInvalidate); BSONObj insertDoc = BSON("_id" << 2); - auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, nss, insertDoc); + auto insertEntry = + makeOplogEntry(OpTypeEnum::kInsert, nss, insertDoc, testUuid(), boost::none, insertDoc); Document expectedInsert{ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), insertDoc)}, @@ -3365,10 +3455,7 @@ TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromDropDatabase) { {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}}, }; checkTransformation( - insertEntry, - expectedInsert, - {{"_id"}}, // Mock the 'collectDocumentKeyFieldsForHostedCollection' response. - BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); + insertEntry, expectedInsert, BSON("$changeStream" << BSON("resumeAfter" << resumeToken))); } @@ -3385,7 +3472,8 @@ TEST_F(ChangeStreamStageDBTest, StartAfterSucceedsEvenIfResumeTokenDoesNotContai auto resumeToken = makeResumeToken(kDefaultTs); BSONObj insertDoc = BSON("_id" << 2); - auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, nss, insertDoc); + auto insertEntry = + makeOplogEntry(OpTypeEnum::kInsert, nss, insertDoc, uuid, boost::none, insertDoc); Document expectedInsert{ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, uuid, insertDoc)}, @@ -3396,10 +3484,7 @@ TEST_F(ChangeStreamStageDBTest, StartAfterSucceedsEvenIfResumeTokenDoesNotContai {DSChangeStream::kDocumentKeyField, D{{"_id", 2}}}, }; checkTransformation( - insertEntry, - expectedInsert, - {{"_id"}}, // Mock the 'collectDocumentKeyFieldsForHostedCollection' response. - BSON("$changeStream" << BSON("startAfter" << resumeToken))); + insertEntry, expectedInsert, BSON("$changeStream" << BSON("startAfter" << resumeToken))); } TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleMatch) { diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 0e43a47ef0d..148e0c9a385 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -35,6 +35,7 @@ #include "mongo/db/catalog/collection_catalog.h" #include "mongo/db/pipeline/change_stream_document_diff_parser.h" +#include "mongo/db/pipeline/change_stream_helpers_legacy.h" #include "mongo/db/pipeline/change_stream_preimage_gen.h" #include "mongo/db/pipeline/document_path_support.h" #include "mongo/db/pipeline/document_source_change_stream_add_post_image.h" @@ -112,22 +113,7 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( // If the change stream spec includes a resumeToken with a shard key, populate the document key // cache with the field paths. - if (!tokenData.documentKey.missing() && tokenData.uuid) { - std::vector<FieldPath> docKeyFields; - auto docKey = tokenData.documentKey.getDocument(); - - auto iter = docKey.fieldIterator(); - while (iter.more()) { - auto fieldPair = iter.next(); - docKeyFields.push_back(fieldPair.first); - } - - // If the document key from the resume token has more than one field, that means it - // includes the shard key and thus should never change. 
-        const bool isFinal = docKeyFields.size() > 1;
-
-        _documentKeyCache[tokenData.uuid.get()] = DocumentKeyCacheEntry({docKeyFields, isFinal});
-    }
+    _documentKeyCache = change_stream_legacy::buildDocumentKeyCache(tokenData);
 }
 
 StageConstraints DocumentSourceChangeStreamTransform::constraints(
@@ -196,31 +182,11 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
     Value ns = input[repl::OplogEntry::kNssFieldName];
     checkValueType(ns, repl::OplogEntry::kNssFieldName, BSONType::String);
     Value uuid = input[repl::OplogEntry::kUuidFieldName];
-    std::vector<FieldPath> documentKeyFields;
 
     // Deal with CRUD operations and commands.
     auto opType = repl::OpType_parse(IDLParserErrorContext("ChangeStreamEntry.op"), op);
     NamespaceString nss(ns.getString());
 
-    // Ignore commands in the oplog when looking up the document key fields since a command implies
-    // that the change stream is about to be invalidated (e.g. collection drop).
-    if (!uuid.missing() && opType != repl::OpTypeEnum::kCommand) {
-        checkValueType(uuid, repl::OplogEntry::kUuidFieldName, BSONType::BinData);
-        // We need to retrieve the document key fields if our cache does not have an entry for this
-        // UUID or if the cache entry is not definitively final, indicating that the collection was
-        // unsharded when the entry was last populated.
-        auto it = _documentKeyCache.find(uuid.getUuid());
-        if (it == _documentKeyCache.end() || !it->second.isFinal) {
-            auto docKeyFields =
-                pExpCtx->mongoProcessInterface->collectDocumentKeyFieldsForHostedCollection(
-                    pExpCtx->opCtx, nss, uuid.getUuid());
-            if (it == _documentKeyCache.end() || docKeyFields.second) {
-                _documentKeyCache[uuid.getUuid()] = DocumentKeyCacheEntry(docKeyFields);
-            }
-        }
-
-        documentKeyFields = _documentKeyCache.find(uuid.getUuid())->second.documentKeyFields;
-    }
 
     Value id = input.getNestedField("o._id");
     // Non-replace updates have the _id in field "o2".
     StringData operationType;
@@ -232,8 +198,23 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
         case repl::OpTypeEnum::kInsert: {
             operationType = DocumentSourceChangeStream::kInsertOpType;
             fullDocument = input[repl::OplogEntry::kObjectFieldName];
-            documentKey = Value(document_path_support::extractPathsFromDoc(
-                fullDocument.getDocument(), documentKeyFields));
+            documentKey = input[repl::OplogEntry::kObject2FieldName];
+            // For oplog entries written on an older version of the server, the documentKey may be
+            // missing.
+            if (documentKey.missing()) {
+                // If we are resuming from an 'insert' oplog entry that does not have a documentKey,
+                // it may have been generated on an older version of the server that populated the
+                // documentKey fields from the sharding catalog. We populate the fields we observed
+                // in the resume token in order to retain consistent event ordering around the
+                // resume point during upgrade. Otherwise, we default to _id as the only document
+                // key field.
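+                // For example, if the resume token's documentKey was {_id: ..., shardKey: ...},
+                // the cached field paths ["_id", "shardKey"] are applied to the inserted document
+                // below, so pre-upgrade events keep reporting both fields in their documentKey.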
+                if (_documentKeyCache && _documentKeyCache->first == uuid.getUuid()) {
+                    documentKey = Value(document_path_support::extractPathsFromDoc(
+                        fullDocument.getDocument(), _documentKeyCache->second));
+                } else {
+                    documentKey = Value(Document{{"_id", id}});
+                }
+            }
             break;
         }
         case repl::OpTypeEnum::kDelete: {
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index 51d3fdc793c..d9dcede8598 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -73,22 +73,6 @@ private:
     DocumentSourceChangeStreamTransform(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                                         DocumentSourceChangeStreamSpec spec);
 
-    struct DocumentKeyCacheEntry {
-        DocumentKeyCacheEntry() = default;
-
-        DocumentKeyCacheEntry(std::pair<std::vector<FieldPath>, bool> documentKeyFieldsIn)
-            : documentKeyFields(documentKeyFieldsIn.first), isFinal(documentKeyFieldsIn.second){};
-        // Fields of the document key, in order, including "_id" and the shard key if the
-        // collection is sharded. Empty until the first oplog entry with a uuid is encountered.
-        // Needed for transforming 'insert' oplog entries.
-        std::vector<FieldPath> documentKeyFields;
-
-        // Set to true if the document key fields for this entry are definitively known and will
-        // not change. This implies that either the collection has become sharded or has been
-        // dropped.
-        bool isFinal;
-    };
-
     /**
      * Helper used for determining what resume token to return.
      */
@@ -96,8 +80,8 @@ private:
 
     DocumentSourceChangeStreamSpec _changeStreamSpec;
 
-    // Map of collection UUID to document key fields.
-    std::map<UUID, DocumentKeyCacheEntry> _documentKeyCache;
+    // Records the documentKey fields from the client's resume token, if present.
+    boost::optional<std::pair<UUID, std::vector<FieldPath>>> _documentKeyCache;
 
     // Set to true if this transformation stage can be run on the collectionless namespace.
     bool _isIndependentOfAnyCollection;
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index 66b0b6c62c1..67f8007fe2d 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -501,32 +501,6 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithBinaryCollation) {
     ASSERT_TRUE(checkResumeToken->getNext().isEOF());
 }
 
-TEST_F(CheckResumeTokenTest, UnshardedTokenSucceedsForShardedResumeOnMongosIfIdMatchesFirstDoc) {
-    // Verify that a resume token whose documentKey only contains _id can be used to resume a stream
-    // on a sharded collection as long as its _id matches the first document. We set 'inMongos'
-    // since this behaviour is only applicable when DSCSEnsureResumeTokenPresent is running on
-    // mongoS.
-    Timestamp resumeTimestamp(100, 1);
-    getExpCtx()->inMongos = true;
-
-    auto checkResumeToken =
-        createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}});
-
-    Timestamp doc1Timestamp(100, 1);
-    addOplogEntryOnTestNS(doc1Timestamp, {{"x"_sd, 0}, {"_id"_sd, 1}});
-    Timestamp doc2Timestamp(100, 2);
-    Document doc2DocKey{{"x"_sd, 0}, {"_id"_sd, 2}};
-    addOplogEntryOnTestNS(doc2Timestamp, doc2DocKey);
-
-    // We should skip doc1 since it satisfies the resume token, and retrieve doc2.
-    const auto firstDocAfterResume = checkResumeToken->getNext();
-    const auto tokenFromFirstDocAfterResume =
-        ResumeToken::parse(firstDocAfterResume.getDocument()["_id"].getDocument()).getData();
-
-    ASSERT_EQ(tokenFromFirstDocAfterResume.clusterTime, doc2Timestamp);
-    ASSERT_DOCUMENT_EQ(tokenFromFirstDocAfterResume.documentKey.getDocument(), doc2DocKey);
-}
-
 TEST_F(CheckResumeTokenTest, UnshardedTokenFailsForShardedResumeOnMongosIfIdDoesNotMatchFirstDoc) {
     Timestamp resumeTimestamp(100, 1);
     getExpCtx()->inMongos = true;
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
index 937e3c01df7..fba8fcfea3d 100644
--- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
@@ -330,16 +330,6 @@ public:
     virtual std::string getHostAndPort(OperationContext* opCtx) const = 0;
 
     /**
-     * Returns the fields of the document key (in order) for the collection corresponding to 'uuid',
-     * including the shard key and _id. If _id is not in the shard key, it is added last. If the
-     * collection is not sharded or no longer exists, returns only _id. Also returns a boolean that
-     * indicates whether the returned fields of the document key are final and will never change for
-     * the given collection, either because the collection was dropped or has become sharded.
-     */
-    virtual std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection(
-        OperationContext* opCtx, const NamespaceString&, UUID) const = 0;
-
-    /**
      * Returns the fields of the document key (in order) for the collection 'nss', according to the
      * CatalogCache. The document key fields are the shard key (if sharded) and the _id (if not
      * already in the shard key). If _id is not in the shard key, it is added last. If the
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
index c10c9ceef80..e08a8169ed7 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
@@ -165,11 +165,6 @@ public:
         MONGO_UNREACHABLE;
     }
 
-    std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection(
-        OperationContext* opCtx, const NamespaceString&, UUID) const final {
-        MONGO_UNREACHABLE;
-    }
-
     /**
      * The following methods only make sense for data-bearing nodes and should never be called on
      * a mongos.
diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
index b54f190ae0b..682c0075340 100644
--- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
@@ -59,12 +59,6 @@ std::list<BSONObj> NonShardServerProcessInterface::getIndexSpecs(OperationContex
         opCtx, ns, includeBuildUUIDs ? ListIndexesInclude::BuildUUID : ListIndexesInclude::Nothing);
 }
 
-std::pair<std::vector<FieldPath>, bool>
-NonShardServerProcessInterface::collectDocumentKeyFieldsForHostedCollection(
-    OperationContext* opCtx, const NamespaceString& nss, UUID uuid) const {
-    return {{"_id"}, false};  // Nothing is sharded.
-}
-
 std::vector<FieldPath> NonShardServerProcessInterface::collectDocumentKeyFieldsActingAsRouter(
     OperationContext* opCtx, const NamespaceString& nss) const {
     return {"_id"};  // Nothing is sharded.
diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
index 663fa4531bf..b2b4f731f4e 100644
--- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
@@ -75,8 +75,6 @@ public:
                                       ChunkVersion targetCollectionVersion) const override {
         uasserted(51020, "unexpected request to consult sharding catalog on non-shardsvr");
     }
-    std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection(
-        OperationContext* opCtx, const NamespaceString&, UUID) const final;
     std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter(
         OperationContext*, const NamespaceString&) const final;
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
index 6597e7791af..00d0ae8c2de 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
@@ -89,33 +89,6 @@ void ShardServerProcessInterface::checkRoutingInfoEpochOrThrow(
             foundVersion.isSameCollection(targetCollectionVersion));
 }
 
-std::pair<std::vector<FieldPath>, bool>
-ShardServerProcessInterface::collectDocumentKeyFieldsForHostedCollection(OperationContext* opCtx,
-                                                                         const NamespaceString& nss,
-                                                                         UUID uuid) const {
-    invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
-
-    auto* const catalogCache = Grid::get(opCtx)->catalogCache();
-    auto swCM = catalogCache->getCollectionRoutingInfo(opCtx, nss);
-    if (swCM.isOK()) {
-        const auto& cm = swCM.getValue();
-        if (cm.isSharded() && cm.uuidMatches(uuid)) {
-            // Unpack the shard key. Collection is now sharded so the document key fields will never
-            // change, mark as final.
-            return {_shardKeyToDocumentKeyFields(cm.getShardKeyPattern().getKeyPatternFields()),
-                    true};
-        }
-    } else if (swCM != ErrorCodes::NamespaceNotFound) {
-        uassertStatusOK(std::move(swCM));
-    }
-
-    // An unsharded collection can still become sharded so is not final. If the uuid doesn't match
-    // the one stored in the ScopedCollectionDescription, this implies that the collection has been
-    // dropped and recreated as sharded. We don't know what the old document key fields might have
-    // been in this case so we return just _id.
-    return {{"_id"}, false};
-}
-
 boost::optional<Document> ShardServerProcessInterface::lookupSingleDocument(
     const boost::intrusive_ptr<ExpressionContext>& expCtx,
     const NamespaceString& nss,
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
index dc4b26a971a..d16ae72c444 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
@@ -53,9 +53,6 @@ public:
                                        const NamespaceString& nss,
                                        ChunkVersion targetCollectionVersion) const final;
 
-    std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection(
-        OperationContext* opCtx, const NamespaceString&, UUID) const final;
-
     std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter(
         OperationContext*, const NamespaceString&) const final {
         // We don't expect anyone to use this method on the shard itself (yet). This is currently
diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
index 68e85245cd9..5fe7d78a26b 100644
--- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
@@ -184,11 +184,6 @@ public:
         MONGO_UNREACHABLE;
     }
 
-    std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection(
-        OperationContext*, const NamespaceString&, UUID) const override {
-        MONGO_UNREACHABLE;
-    }
-
     std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter(
         OperationContext*, const NamespaceString&) const override {
         MONGO_UNREACHABLE;
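
Taken together, the patch makes the oplog entry's own 'o2' field the authoritative source of the
documentKey for 'insert' events, with the resume-token cache and _id as fallbacks, which is why
every collectDocumentKeyFieldsForHostedCollection override above can be deleted. The following
condensed sketch shows the resulting lookup order; it is illustrative only, not the server
implementation: the free-function form and the name 'deriveInsertDocumentKey' are inventions here,
while the field constants, cache type, and extractPathsFromDoc helper are taken from the diff.

    // Illustrative sketch, not part of the commit. In the real stage this logic
    // lives inline in DocumentSourceChangeStreamTransform::applyTransformation().
    Value deriveInsertDocumentKey(
        const Document& input,       // the oplog entry being transformed
        const UUID& entryUuid,       // UUID from the entry's 'ui' field
        const boost::optional<std::pair<UUID, std::vector<FieldPath>>>& documentKeyCache) {
        // 1. Prefer the documentKey that newer servers record in the entry's 'o2'
        //    field; it already includes the shard key for sharded collections.
        Value documentKey = input[repl::OplogEntry::kObject2FieldName];
        if (!documentKey.missing()) {
            return documentKey;
        }
        const Document fullDocument = input[repl::OplogEntry::kObjectFieldName].getDocument();
        // 2. For pre-upgrade entries with no 'o2', fall back to the field paths
        //    captured from the client's resume token, provided they were recorded
        //    for this same collection UUID.
        if (documentKeyCache && documentKeyCache->first == entryUuid) {
            return Value(document_path_support::extractPathsFromDoc(fullDocument,
                                                                    documentKeyCache->second));
        }
        // 3. Otherwise _id alone is the document key.
        return Value(Document{{"_id", fullDocument["_id"]}});
    }

In the stage itself the cache is the new _documentKeyCache member, built once in the constructor
via change_stream_legacy::buildDocumentKeyCache(tokenData), so the per-event round trip through
the sharding catalog disappears entirely.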