author     Rui Liu <rui.liu@mongodb.com>                     2022-01-21 18:38:18 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-01-21 19:25:45 +0000
commit     acd1c72c25fdeff22ad6c0fa8c8ac735d8cc9da8 (patch)
tree       c4ee3359777f276918b7d064529c86a92159a96f
parent     fe264a7777c9199304f7e28c25a5b5dd2fba47e0 (diff)
SERVER-61892 Read document key from oplog instead of cache
-rw-r--r--  jstests/multiVersion/change_stream_resume.js                                |  95
-rw-r--r--  jstests/sharding/change_stream_resume_shard_key_change.js                   |  87
-rw-r--r--  jstests/sharding/change_streams_unsharded_becomes_sharded.js                |   8
-rw-r--r--  src/mongo/db/pipeline/change_stream_helpers_legacy.cpp                      |  17
-rw-r--r--  src/mongo/db/pipeline/change_stream_helpers_legacy.h                        |   8
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp |  60
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_test.cpp               | 397
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_transform.cpp          |  57
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_transform.h             |  20
-rw-r--r--  src/mongo/db/pipeline/document_source_check_resume_token_test.cpp           |  26
-rw-r--r--  src/mongo/db/pipeline/process_interface/mongo_process_interface.h           |  10
-rw-r--r--  src/mongo/db/pipeline/process_interface/mongos_process_interface.h          |   5
-rw-r--r--  src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp |   6
-rw-r--r--  src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h   |   2
-rw-r--r--  src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp      |  27
-rw-r--r--  src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h        |   3
-rw-r--r--  src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h      |   5
17 files changed, 478 insertions, 355 deletions
diff --git a/jstests/multiVersion/change_stream_resume.js b/jstests/multiVersion/change_stream_resume.js
new file mode 100644
index 00000000000..be160250a7e
--- /dev/null
+++ b/jstests/multiVersion/change_stream_resume.js
@@ -0,0 +1,95 @@
+// Verify that we can successfully resume a change stream using a token generated on an older
+// version of the server from an insert oplog entry that does not have the documentKey embedded in
+// its "o2" field. Also verify that we can resume on a downgraded cluster using a token generated on
+// the latest version of the server.
+//
+// @tags: [uses_change_streams, requires_replication]
+
+(function() {
+"use strict";
+
+load("jstests/multiVersion/libs/multi_cluster.js"); // For upgradeCluster.
+
+function checkNextDoc({changeStream, doc, docKeyFields}) {
+ assert.soon(() => changeStream.hasNext());
+ const change = changeStream.next();
+ assert.docEq(change.fullDocument, doc);
+ assert.eq(Object.keys(change.documentKey), docKeyFields);
+ return changeStream.getResumeToken();
+}
+
+function runTest(oldVersion) {
+ const dbName = "test";
+ const collName = "change_streams_resume";
+ const st = new ShardingTest({
+ shards: 2,
+ rs: {
+ nodes: 2,
+ binVersion: oldVersion,
+ setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}
+ },
+ other: {mongosOptions: {binVersion: oldVersion}}
+ });
+ st.shardColl(collName,
+ {shard: 1} /* Shard key */,
+ {shard: 1} /* Split at */,
+ {shard: 1} /* Move the chunk containing {shard: 1} to its own shard */,
+ dbName,
+ true /* Wait until documents orphaned by the move get deleted */);
+
+ // Establish a resume token before anything actually happens in the test.
+ let coll = st.s.getDB(dbName).getCollection(collName);
+ let changeStream = coll.watch();
+ const startOfTestResumeToken = changeStream.getResumeToken();
+
+ const docs = [{_id: 0, shard: 0}, {_id: 1, shard: 1}];
+ assert.commandWorked(coll.insert(docs));
+
+ // Verify that we see the first inserted document, and obtain its resume token.
+ const resumeTokenWithShardKey =
+ checkNextDoc({changeStream, doc: docs[0], docKeyFields: ["shard", "_id"]});
+
+ // Upgrade the cluster to the latest.
+ st.upgradeCluster(
+ "latest",
+ {upgradeShards: true, upgradeConfigs: true, upgradeMongos: true, waitUntilStable: true});
+ coll = st.s.getDB(dbName).getCollection(collName);
+
+ // Confirm that we can use the resume token generated on the old version, which contains shard
+ // key fields, to resume the new stream. This works even though the documentKey is not embedded
+ // in the oplog entry, which would usually result in a resume token without any shard key fields.
+ checkNextDoc({
+ changeStream: coll.watch([], {resumeAfter: resumeTokenWithShardKey}),
+ doc: docs[1],
+ docKeyFields: ["shard", "_id"]
+ });
+
+ // Now start a new stream on "latest" from the start-of-test resume point. Confirm that we see
+ // the first insert, and that this time the documentKey does not have any shard key fields.
+ const resumeTokenNoShardKey = checkNextDoc({
+ changeStream: coll.watch([], {resumeAfter: startOfTestResumeToken}),
+ doc: docs[0],
+ docKeyFields: ["_id"]
+ });
+
+ // Downgrade the cluster again.
+ st.upgradeCluster(
+ oldVersion,
+ {upgradeShards: true, upgradeConfigs: true, upgradeMongos: true, waitUntilStable: true});
+ coll = st.s.getDB(dbName).getCollection(collName);
+
+ // Confirm that we can resume the stream from the resume token generated on "latest",
+ // even though the token only contains _id while the resumed stream will produce a token
+ // that includes the shard key fields.
+ checkNextDoc({
+ changeStream: coll.watch([], {resumeAfter: resumeTokenNoShardKey}),
+ doc: docs[1],
+ docKeyFields: ["shard", "_id"]
+ });
+
+ st.stop();
+}
+
+runTest('last-continuous');
+runTest('last-lts');
+}());
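
A minimal standalone sketch (not MongoDB source) of the documentKey preference order that the test
above exercises, using plain std types in place of BSON: an embedded o2 documentKey wins, field
paths cached from the resume token are the fallback for old-format oplog entries, and _id alone is
the last resort. The names Doc and resolveDocumentKey are invented for illustration.

#include <cstdio>
#include <map>
#include <optional>
#include <string>
#include <vector>

// Flat string->int "document" standing in for an oplog entry's 'o'/'o2' fields.
// (std::map orders fields alphabetically; the real code preserves key order.)
using Doc = std::map<std::string, int>;

Doc resolveDocumentKey(const Doc& fullDoc,
                       const std::optional<Doc>& o2,
                       const std::optional<std::vector<std::string>>& cachedFields) {
    if (o2 && !o2->empty()) {
        return *o2;  // Newer servers embed the documentKey in o2; use it verbatim.
    }
    Doc key;
    if (cachedFields) {
        for (const auto& field : *cachedFields) {
            auto it = fullDoc.find(field);
            if (it != fullDoc.end()) {
                key.insert(*it);  // Old-format entry: rebuild the key from the cache.
            }
        }
        if (!key.empty()) {
            return key;
        }
    }
    if (auto idIt = fullDoc.find("_id"); idIt != fullDoc.end()) {
        key.insert(*idIt);  // Last resort: the _id field alone.
    }
    return key;
}

int main() {
    Doc inserted{{"_id", 2}, {"shardKey", 3}};
    Doc o2{{"_id", 2}, {"shardKey", 3}};
    std::vector<std::string> cachedFromToken{"_id"};

    // o2 wins even though the resume-token cache only knows about _id.
    Doc key = resolveDocumentKey(inserted, o2, cachedFromToken);
    std::printf("documentKey has %zu fields\n", key.size());  // prints 2
    return 0;
}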
diff --git a/jstests/sharding/change_stream_resume_shard_key_change.js b/jstests/sharding/change_stream_resume_shard_key_change.js
new file mode 100644
index 00000000000..945af3e2337
--- /dev/null
+++ b/jstests/sharding/change_stream_resume_shard_key_change.js
@@ -0,0 +1,87 @@
+// Tests resuming change streams when the shard key is changed.
+//
+// @tags: [
+// requires_majority_read_concern,
+// uses_change_streams,
+// requires_fcv_53,
+// ]
+
+(function() {
+"use strict";
+
+const dbName = 'testDB';
+const collName = 'testColl';
+
+const st = new ShardingTest({
+ shards: 2,
+ rs: {
+ nodes: 1,
+ enableMajorityReadConcern: '',
+ // Run the periodic noop writer at a higher frequency to speed up the test.
+ setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}
+ }
+});
+
+const db = st.s.getDB(dbName);
+const coll = db.getCollection(collName);
+const changeStream = coll.watch();
+
+const docs = [
+ {_id: 0, shardField1: 0, shardField2: 0},
+ {_id: 1, shardField1: 1, shardField2: 0},
+ {_id: 2, shardField1: 1, shardField2: 1},
+ {_id: 3, shardField1: 0, shardField2: 1},
+];
+const docKeys = [
+ {_id: 0},
+ {shardField1: 1, _id: 1},
+ {shardField1: 1, shardField2: 1, _id: 2},
+ {shardField1: 0, shardField2: 1, _id: 3},
+];
+
+// Document inserted in unsharded collection.
+assert.commandWorked(coll.insert(docs[0]));
+
+// Document inserted in sharded collection.
+assert.commandWorked(coll.createIndex({shardField1: 1}));
+st.shardColl(collName,
+ {shardField1: 1} /* Shard key */,
+ {shardField1: 1} /* Split at */,
+ {shardField1: 1} /* Move the chunk containing {shardField1: 1} to its own shard */,
+ dbName,
+ true /* Wait until documents orphaned by the move get deleted */);
+assert.commandWorked(coll.insert(docs[1]));
+
+// Document inserted after the shard key is refined.
+assert.commandWorked(coll.createIndex({shardField1: 1, shardField2: 1}));
+assert.commandWorked(db.adminCommand({
+ refineCollectionShardKey: `${dbName}.${collName}`,
+ key: {shardField1: 1, shardField2: 1},
+}));
+assert.commandWorked(coll.insert(docs[2]));
+
+// Insert one more document as a final sentinel, to ensure that there is always at least one visible
+// event following the resume points we wish to test.
+assert.commandWorked(coll.insert(docs[3]));
+
+const verifyChanges = (changeStream, startingIndex) => {
+ const changes = [];
+ assert.soon(() => {
+ while (changeStream.hasNext()) {
+ changes.push(changeStream.next());
+ }
+ return changes.length === docs.length - startingIndex;
+ });
+ assert.docEq(changes.map(x => x.fullDocument), docs.slice(startingIndex));
+ assert.docEq(changes.map(x => x.documentKey), docKeys.slice(startingIndex));
+ return changes;
+};
+
+// Verify that we can resume from each change point.
+const changes = verifyChanges(changeStream, 0);
+changes.forEach((change, i) => {
+ verifyChanges(coll.watch([], {resumeAfter: change._id}), i + 1);
+});
+
+st.stop();
+})();
diff --git a/jstests/sharding/change_streams_unsharded_becomes_sharded.js b/jstests/sharding/change_streams_unsharded_becomes_sharded.js
index b44fb6c1f88..9998ef9bcda 100644
--- a/jstests/sharding/change_streams_unsharded_becomes_sharded.js
+++ b/jstests/sharding/change_streams_unsharded_becomes_sharded.js
@@ -155,19 +155,19 @@ function testUnshardedBecomesSharded(collToWatch) {
assert.commandWorked(mongosColl.insert({_id: -1, z: -1}));
assert.commandWorked(mongosColl.insert({_id: 1, z: 1}));
- // Verify that the change stream picks up the inserts, however the shard key is missing
- // since the collection has since been dropped and recreated.
+ // Verify that the change stream picks up the inserts. The shard key fields are present since
+ // they are now recorded in the oplog.
cst.assertNextChangesEqual({
cursor: cursor,
expectedChanges: [
{
- documentKey: {_id: -2},
+ documentKey: {x: -2, _id: -2},
fullDocument: {_id: -2, x: -2},
ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
operationType: "insert",
},
{
- documentKey: {_id: -3},
+ documentKey: {x: -3, _id: -3},
fullDocument: {_id: -3, x: -3},
ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
operationType: "insert",
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
index 861f86f0992..24f06821a9b 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.cpp
@@ -122,5 +122,22 @@ boost::optional<Document> legacyLookupPreImage(boost::intrusive_ptr<ExpressionCo
return Document{opLogEntry.getObject().getOwned()};
}
+boost::optional<std::pair<UUID, std::vector<FieldPath>>> buildDocumentKeyCache(
+ const ResumeTokenData& tokenData) {
+ if (!tokenData.documentKey.missing() && tokenData.uuid) {
+ std::vector<FieldPath> docKeyFields;
+ auto docKey = tokenData.documentKey.getDocument();
+
+ auto iter = docKey.fieldIterator();
+ while (iter.more()) {
+ auto fieldPair = iter.next();
+ docKeyFields.push_back(fieldPair.first);
+ }
+
+ return std::make_pair(tokenData.uuid.get(), docKeyFields);
+ }
+ return {};
+}
+
} // namespace change_stream_legacy
} // namespace mongo
diff --git a/src/mongo/db/pipeline/change_stream_helpers_legacy.h b/src/mongo/db/pipeline/change_stream_helpers_legacy.h
index 38c86bd120a..0ac51bab7ac 100644
--- a/src/mongo/db/pipeline/change_stream_helpers_legacy.h
+++ b/src/mongo/db/pipeline/change_stream_helpers_legacy.h
@@ -47,4 +47,12 @@ std::list<boost::intrusive_ptr<DocumentSource>> buildPipeline(
boost::optional<Document> legacyLookupPreImage(boost::intrusive_ptr<ExpressionContext> pExpCtx,
const Document& preImageId);
+/**
+ * Builds a document key cache from the resume token. The cache will be used when the insert oplog
+ * entry does not contain the documentKey. This can happen when reading an oplog entry written by an
+ * older version of the server.
+ */
+boost::optional<std::pair<UUID, std::vector<FieldPath>>> buildDocumentKeyCache(
+ const ResumeTokenData& data);
+
} // namespace mongo::change_stream_legacy
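
A hedged standalone mirror of the helper declared above, with std types replacing ResumeTokenData,
UUID and FieldPath (TokenData and buildDocumentKeyCacheSketch are invented names): the cache is
built only when the token carries both a documentKey and a UUID.

#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Stand-in for the members of ResumeTokenData that the helper reads.
struct TokenData {
    std::optional<std::string> uuid;  // collection UUID, as text here
    std::optional<std::map<std::string, int>> documentKey;
};

std::optional<std::pair<std::string, std::vector<std::string>>> buildDocumentKeyCacheSketch(
    const TokenData& token) {
    if (token.documentKey && token.uuid) {
        std::vector<std::string> fields;
        for (const auto& entry : *token.documentKey) {
            fields.push_back(entry.first);  // Keep the field path, drop the value.
        }
        return std::make_pair(*token.uuid, std::move(fields));
    }
    return std::nullopt;  // No documentKey or no UUID: nothing worth caching.
}

int main() {
    TokenData token{std::string("uuid-1234"),
                    std::map<std::string, int>{{"_id", 1}, {"shardKey", 2}}};
    auto cache = buildDocumentKeyCacheSketch(token);
    return (cache && cache->second.size() == 2) ? 0 : 1;
}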
diff --git a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
index b68edc81899..abac48c6ae8 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_check_resumability.cpp
@@ -47,19 +47,8 @@ REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalChangeStreamCheckResumability,
// Returns ResumeStatus::kFoundToken if the document retrieved from the resumed pipeline satisfies
// the client's resume token, ResumeStatus::kCheckNextDoc if it is older than the client's token,
-// and ResumeToken::kCannotResume if it is more recent than the client's resume token (indicating
-// that we will never see the token). If the resume token's documentKey contains only the _id field
-// while the pipeline documentKey contains additional fields, then the collection has become sharded
-// since the resume token was generated. In that case, we relax the requirements such that only the
-// timestamp, version, txnOpIndex, UUID and documentKey._id need match. This remains correct, since
-// the only circumstances under which the resume token omits the shard key is if it was generated
-// either (1) before the collection was sharded, (2) after the collection was sharded but before the
-// primary shard became aware of that fact, implying that it was before the first chunk moved off
-// the shard, or (3) by a malicious client who has constructed their own resume token. In the first
-// two cases, we can be guaranteed that the _id is unique and the stream can therefore be resumed
-// seamlessly; in the third case, the worst that can happen is that some entries are missed or
-// duplicated. Note that the simple collation is used to compare the resume tokens, and that we
-// purposefully avoid the user's requested collation if present.
+// and ResumeStatus::kSurpassedToken if it is more recent than the client's resume token (indicating
+// that we will never see the token).
DocumentSourceChangeStreamCheckResumability::ResumeStatus
DocumentSourceChangeStreamCheckResumability::compareAgainstClientResumeToken(
const intrusive_ptr<ExpressionContext>& expCtx,
@@ -127,50 +116,11 @@ DocumentSourceChangeStreamCheckResumability::compareAgainstClientResumeToken(
// At this point, we know that the tokens differ only by documentKey. The status we return will
// depend on whether the stream token is logically before or after the client token. If the
- // latter, then we will never see the resume token and the stream cannot be resumed. However,
- // before we can return this value, we need to check the possibility that the resumed stream is
- // on a sharded collection and the client token is from before the collection was sharded.
- const auto defaultResumeStatus =
- ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.documentKey >
- tokenDataFromClient.documentKey)
+ // latter, then we will never see the resume token and the stream cannot be resumed.
+ return ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.documentKey >
+ tokenDataFromClient.documentKey)
? ResumeStatus::kSurpassedToken
: ResumeStatus::kCheckNextDoc;
-
- // If we're not running in a sharded context, we don't need to proceed any further.
- if (!expCtx->needsMerge && !expCtx->inMongos) {
- return defaultResumeStatus;
- }
-
- // If we reach here, we still need to check the possibility that the collection has become
- // sharded in the time since the client's resume token was generated. If so, then the client
- // token will only have an _id field, while the token from the new pipeline may have additional
- // shard key fields.
-
- // We expect the documentKey to be an object in both the client and stream tokens. If either is
- // not, then we cannot compare the embedded _id values in each, and so the stream token does not
- // satisfy the client token.
- if (tokenDataFromClient.documentKey.getType() != BSONType::Object ||
- tokenDataFromResumedStream.documentKey.getType() != BSONType::Object) {
- return defaultResumeStatus;
- }
-
- auto documentKeyFromResumedStream = tokenDataFromResumedStream.documentKey.getDocument();
- auto documentKeyFromClient = tokenDataFromClient.documentKey.getDocument();
-
- // In order for the relaxed comparison to be applicable, the client token must have a single _id
- // field, and the resumed stream token must have additional fields beyond _id.
- if (!(documentKeyFromClient.computeSize() == 1 &&
- documentKeyFromResumedStream.computeSize() > 1)) {
- return defaultResumeStatus;
- }
-
- // If the resume token's documentKey only contains the _id field while the pipeline's
- // documentKey contains additional fields, we require only that the _ids match.
- return (!documentKeyFromClient["_id"].missing() &&
- ValueComparator::kInstance.evaluate(documentKeyFromResumedStream["_id"] ==
- documentKeyFromClient["_id"])
- ? ResumeStatus::kFoundToken
- : defaultResumeStatus);
}
DocumentSourceChangeStreamCheckResumability::DocumentSourceChangeStreamCheckResumability(
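
The deleted branch above was the relaxed _id-only comparison for collections that became sharded
after the token was generated. What remains is a plain ordering check; here is a minimal sketch
with a scalar stand-in for the Value comparator (Token and compareDocumentKeys are invented names):

enum class ResumeStatus { kFoundToken, kCheckNextDoc, kSurpassedToken };

// By the time documentKey is compared, the higher-order token fields
// (clusterTime, version, txnOpIndex, uuid) are already known to be equal.
struct Token {
    long long documentKey;  // scalar stand-in for the real documentKey Value
};

ResumeStatus compareDocumentKeys(const Token& fromStream, const Token& fromClient) {
    if (fromStream.documentKey == fromClient.documentKey) {
        return ResumeStatus::kFoundToken;  // Exact match: the resume point is found.
    }
    // No relaxed _id-only fallback any more: a stream token that has passed the
    // client token means the resume point will never be seen.
    return fromStream.documentKey > fromClient.documentKey ? ResumeStatus::kSurpassedToken
                                                           : ResumeStatus::kCheckNextDoc;
}

int main() {
    Token client{5};
    Token stream{9};
    return compareDocumentKeys(stream, client) == ResumeStatus::kSurpassedToken ? 0 : 1;
}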
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 7456d4370e2..eaf44eb21aa 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -127,11 +127,9 @@ struct MockMongoInterface final : public StubMongoProcessInterface {
std::vector<repl::OplogEntry>::const_iterator mockEntriesIt;
};
- MockMongoInterface(std::vector<FieldPath> fields,
- std::vector<repl::OplogEntry> transactionEntries = {},
+ MockMongoInterface(std::vector<repl::OplogEntry> transactionEntries = {},
std::vector<Document> documentsForLookup = {})
- : _fields(std::move(fields)),
- _transactionEntries(std::move(transactionEntries)),
+ : _transactionEntries(std::move(transactionEntries)),
_documentsForLookup{std::move(documentsForLookup)} {}
// For tests of transactions that involve multiple oplog entries.
@@ -173,14 +171,6 @@ struct MockMongoInterface final : public StubMongoProcessInterface {
return (it != _documentsForLookup.end() ? *it : boost::optional<Document>{});
}
- // For "insert" tests.
- std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection(
- OperationContext*, const NamespaceString&, UUID) const final {
- return {_fields, false};
- }
-
- std::vector<FieldPath> _fields;
-
// Stores oplog entries associated with a commit operation, including the oplog entries that a
// real DocumentSourceChangeStream would not see, because they are marked with a "prepare" or
// "partialTxn" flag. When the DocumentSourceChangeStream sees the commit for the transaction,
@@ -212,7 +202,6 @@ public:
void checkTransformation(const OplogEntry& entry,
const boost::optional<Document> expectedDoc,
- std::vector<FieldPath> docKeyFields = {},
const BSONObj& spec = kDefaultSpec,
const boost::optional<Document> expectedInvalidate = {},
const std::vector<repl::OplogEntry> transactionEntries = {},
@@ -220,8 +209,8 @@ public:
vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry.getEntry().toBSON(), spec);
auto lastStage = stages.back();
- getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>(
- docKeyFields, transactionEntries, std::move(documentsForLookup));
+ getExpCtx()->mongoProcessInterface =
+ std::make_unique<MockMongoInterface>(transactionEntries, std::move(documentsForLookup));
auto next = lastStage->getNext();
// Match stage should pass the doc down if expectedDoc is given.
@@ -248,8 +237,7 @@ public:
list<intrusive_ptr<DocumentSource>> result =
DSChangeStream::createFromBson(spec.firstElement(), getExpCtx());
vector<intrusive_ptr<DocumentSource>> stages(std::begin(result), std::end(result));
- getExpCtx()->mongoProcessInterface =
- std::make_unique<MockMongoInterface>(std::vector<FieldPath>{});
+ getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>();
// This match stage is a DocumentSourceChangeStreamOplogMatch, which we explicitly disallow
// from executing as a safety mechanism, since it needs to use the collection-default
@@ -708,12 +696,12 @@ TEST_F(ChangeStreamStageTest, ShowMigrationsFailsOnMongos) {
}
TEST_F(ChangeStreamStageTest, TransformInsertDocKeyXAndId) {
- auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type
- nss, // namespace
- BSON("_id" << 1 << "x" << 2), // o
- testUuid(), // uuid
- boost::none, // fromMigrate
- boost::none); // o2
+ auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ BSON("_id" << 1 << "x" << 2), // o
+ testUuid(), // uuid
+ boost::none, // fromMigrate
+ BSON("x" << 2 << "_id" << 1)); // o2
Document expectedInsert{
{DSChangeStream::kIdField,
@@ -724,7 +712,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyXAndId) {
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal.
};
- checkTransformation(insert, expectedInsert, {{"x"}, {"_id"}});
+ checkTransformation(insert, expectedInsert);
bool fromMigrate = false; // also check actual "fromMigrate: false" not filtered
auto insert2 = makeOplogEntry(insert.getOpType(), // op type
insert.getNss(), // namespace
@@ -732,16 +720,16 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyXAndId) {
insert.getUuid(), // uuid
fromMigrate, // fromMigrate
insert.getObject2()); // o2
- checkTransformation(insert2, expectedInsert, {{"x"}, {"_id"}});
+ checkTransformation(insert2, expectedInsert);
}
TEST_F(ChangeStreamStageTest, TransformInsertDocKeyIdAndX) {
- auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type
- nss, // namespace
- BSON("x" << 2 << "_id" << 1), // o
- testUuid(), // uuid
- boost::none, // fromMigrate
- boost::none); // o2
+ auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ BSON("x" << 2 << "_id" << 1), // o
+ testUuid(), // uuid
+ boost::none, // fromMigrate
+ BSON("_id" << 1 << "x" << 2)); // o2
Document expectedInsert{
{DSChangeStream::kIdField,
@@ -752,7 +740,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyIdAndX) {
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, // _id first
};
- checkTransformation(insert, expectedInsert, {{"_id"}, {"x"}});
+ checkTransformation(insert, expectedInsert);
}
TEST_F(ChangeStreamStageTest, TransformInsertDocKeyJustId) {
@@ -761,7 +749,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyJustId) {
BSON("_id" << 1 << "x" << 2), // o
testUuid(), // uuid
boost::none, // fromMigrate
- boost::none); // o2
+ BSON("_id" << 1)); // o2
Document expectedInsert{
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1))},
@@ -771,7 +759,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertDocKeyJustId) {
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}}},
};
- checkTransformation(insert, expectedInsert, {{"_id"}});
+ checkTransformation(insert, expectedInsert);
}
TEST_F(ChangeStreamStageTest, TransformInsertFromMigrate) {
@@ -788,12 +776,12 @@ TEST_F(ChangeStreamStageTest, TransformInsertFromMigrate) {
TEST_F(ChangeStreamStageTest, TransformInsertFromMigrateShowMigrations) {
bool fromMigrate = true;
- auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type
- nss, // namespace
- BSON("x" << 2 << "_id" << 1), // o
- testUuid(), // uuid
- fromMigrate, // fromMigrate
- boost::none); // o2
+ auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ BSON("x" << 2 << "_id" << 1), // o
+ testUuid(), // uuid
+ fromMigrate, // fromMigrate
+ BSON("_id" << 1 << "x" << 2)); // o2
auto spec = fromjson("{$changeStream: {showMigrationEvents: true}}");
Document expectedInsert{
@@ -805,7 +793,7 @@ TEST_F(ChangeStreamStageTest, TransformInsertFromMigrateShowMigrations) {
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, // _id first
};
- checkTransformation(insert, expectedInsert, {{"_id"}, {"x"}}, spec);
+ checkTransformation(insert, expectedInsert, spec);
}
TEST_F(ChangeStreamStageTest, TransformUpdateFields) {
@@ -1082,7 +1070,7 @@ TEST_F(ChangeStreamStageTest, TransformDeleteFromMigrateShowMigrations) {
o, // o
testUuid(), // uuid
fromMigrate, // fromMigrate
- boost::none); // o2
+ BSON("_id" << 1)); // o2
auto spec = fromjson("{$changeStream: {showMigrationEvents: true}}");
Document expectedDelete{
@@ -1093,7 +1081,7 @@ TEST_F(ChangeStreamStageTest, TransformDeleteFromMigrateShowMigrations) {
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}}},
};
- checkTransformation(deleteEntry, expectedDelete, {}, spec);
+ checkTransformation(deleteEntry, expectedDelete, spec);
}
TEST_F(ChangeStreamStageTest, TransformDrop) {
@@ -1113,7 +1101,7 @@ TEST_F(ChangeStreamStageTest, TransformDrop) {
{DSChangeStream::kClusterTimeField, kDefaultTs},
};
- checkTransformation(dropColl, expectedDrop, {}, kDefaultSpec, expectedInvalidate);
+ checkTransformation(dropColl, expectedDrop, kDefaultSpec, expectedInvalidate);
}
TEST_F(ChangeStreamStageTest, TransformRename) {
@@ -1137,7 +1125,7 @@ TEST_F(ChangeStreamStageTest, TransformRename) {
{DSChangeStream::kClusterTimeField, kDefaultTs},
};
- checkTransformation(rename, expectedRename, {}, kDefaultSpec, expectedInvalidate);
+ checkTransformation(rename, expectedRename, kDefaultSpec, expectedInvalidate);
}
TEST_F(ChangeStreamStageTest, TransformInvalidateFromMigrate) {
@@ -1179,7 +1167,7 @@ TEST_F(ChangeStreamStageTest, TransformRenameTarget) {
{DSChangeStream::kClusterTimeField, kDefaultTs},
};
- checkTransformation(rename, expectedRename, {}, kDefaultSpec, expectedInvalidate);
+ checkTransformation(rename, expectedRename, kDefaultSpec, expectedInvalidate);
}
TEST_F(ChangeStreamStageTest, MatchFiltersDropDatabaseCommand) {
@@ -1228,7 +1216,7 @@ TEST_F(ChangeStreamStageTest, TransformReshardBegin) {
{DSChangeStream::kOperationTypeField, DSChangeStream::kReshardBeginOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
};
- checkTransformation(reshardingBegin, expectedReshardingBegin, {}, spec);
+ checkTransformation(reshardingBegin, expectedReshardingBegin, spec);
}
TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUp) {
@@ -1258,7 +1246,7 @@ TEST_F(ChangeStreamStageTest, TransformReshardDoneCatchUp) {
{DSChangeStream::kClusterTimeField, kDefaultTs},
};
- checkTransformation(reshardDoneCatchUp, expectedReshardingDoneCatchUp, {}, spec);
+ checkTransformation(reshardDoneCatchUp, expectedReshardingDoneCatchUp, spec);
}
TEST_F(ChangeStreamStageTest, TransformEmptyApplyOps) {
@@ -1380,7 +1368,11 @@ TEST_F(ChangeStreamStageTest, CommitCommandReturnsOperationsFromPreparedTransact
Document preparedApplyOps{
{"applyOps",
Value{std::vector<Document>{
- D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 123}}}}},
+ D{{"op", "i"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", V{D{{"_id", 123}}}},
+ {"o2", V{D{}}}},
}}},
{"prepare", true},
};
@@ -1434,7 +1426,7 @@ TEST_F(ChangeStreamStageTest, CommitCommandReturnsOperationsFromPreparedTransact
{DSChangeStream::kDocumentKeyField, D{}},
};
- checkTransformation(oplogEntry, expectedResult, {}, kDefaultSpec, {}, {preparedTransaction});
+ checkTransformation(oplogEntry, expectedResult, kDefaultSpec, {}, {preparedTransaction});
}
TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) {
@@ -1450,11 +1442,13 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) {
D{{"op", "i"_sd},
{"ns", nss.ns()},
{"ui", testUuid()},
- {"o", V{Document{{"_id", 123}}}}},
+ {"o", V{Document{{"_id", 123}}}},
+ {"o2", V{Document{{"_id", 123}}}}},
D{{"op", "i"_sd},
{"ns", nss.ns()},
{"ui", testUuid()},
- {"o", V{Document{{"_id", 456}}}}},
+ {"o", V{Document{{"_id", 456}}}},
+ {"o2", V{Document{{"_id", 456}}}}},
}}},
{"partialTxn", true},
};
@@ -1473,7 +1467,11 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) {
Document applyOps2{
{"applyOps",
V{std::vector<Document>{
- D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 789}}}}},
+ D{{"op", "i"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", V{D{{"_id", 789}}}},
+ {"o2", V{D{{"_id", 789}}}}},
}}},
/* The absence of the "partialTxn" and "prepare" fields indicates that this command commits
the transaction. */
@@ -1497,7 +1495,6 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) {
// Populate the MockTransactionHistoryEditor in reverse chronological order.
getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>(
- std::vector<FieldPath>{},
std::vector<repl::OplogEntry>{transactionEntry2, transactionEntry1});
// We should get three documents from the change stream, based on the documents in the two
@@ -1515,7 +1512,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) {
ASSERT_DOCUMENT_EQ(resumeToken,
makeResumeToken(applyOpsOpTime2.getTimestamp(),
testUuid(),
- V{D{}},
+ V{D{{"_id", 123}}},
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
0));
@@ -1532,7 +1529,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) {
ASSERT_DOCUMENT_EQ(resumeToken,
makeResumeToken(applyOpsOpTime2.getTimestamp(),
testUuid(),
- V{D{}},
+ V{D{{"_id", 456}}},
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
1));
@@ -1549,7 +1546,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithMultipleOplogEntries) {
ASSERT_DOCUMENT_EQ(resumeToken,
makeResumeToken(applyOpsOpTime2.getTimestamp(),
testUuid(),
- V{D{}},
+ V{D{{"_id", 789}}},
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
2));
}
@@ -1587,7 +1584,8 @@ TEST_F(ChangeStreamStageTest, TransactionWithEmptyOplogEntries) {
D{{"op", "i"_sd},
{"ns", nss.ns()},
{"ui", testUuid()},
- {"o", V{Document{{"_id", 123}}}}},
+ {"o", V{Document{{"_id", 123}}}},
+ {"o2", V{Document{{"_id", 123}}}}},
}}},
{"partialTxn", true},
};
@@ -1624,7 +1622,8 @@ TEST_F(ChangeStreamStageTest, TransactionWithEmptyOplogEntries) {
V{std::vector<Document>{D{{"op", "i"_sd},
{"ns", nss.ns()},
{"ui", testUuid()},
- {"o", V{Document{{"_id", 456}}}}}}}},
+ {"o", V{Document{{"_id", 456}}}},
+ {"o2", V{Document{{"_id", 456}}}}}}}},
{"partialTxn", true},
};
@@ -1663,8 +1662,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithEmptyOplogEntries) {
// Populate the MockTransactionHistoryEditor in reverse chronological order.
getExpCtx()->mongoProcessInterface =
- std::make_unique<MockMongoInterface>(std::vector<FieldPath>{},
- std::vector<repl::OplogEntry>{transactionEntry5,
+ std::make_unique<MockMongoInterface>(std::vector<repl::OplogEntry>{transactionEntry5,
transactionEntry4,
transactionEntry3,
transactionEntry2,
@@ -1685,7 +1683,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithEmptyOplogEntries) {
ASSERT_DOCUMENT_EQ(resumeToken,
makeResumeToken(applyOpsOpTime5.getTimestamp(),
testUuid(),
- V{D{}},
+ V{D{{"_id", 123}}},
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
0));
@@ -1702,7 +1700,7 @@ TEST_F(ChangeStreamStageTest, TransactionWithEmptyOplogEntries) {
ASSERT_DOCUMENT_EQ(resumeToken,
makeResumeToken(applyOpsOpTime5.getTimestamp(),
testUuid(),
- V{D{}},
+ V{D{{"_id", 456}}},
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
1));
}
@@ -1755,7 +1753,6 @@ TEST_F(ChangeStreamStageTest, TransactionWithOnlyEmptyOplogEntries) {
// Populate the MockTransactionHistoryEditor in reverse chronological order.
getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>(
- std::vector<FieldPath>{},
std::vector<repl::OplogEntry>{transactionEntry2, transactionEntry1});
// We should get three documents from the change stream, based on the documents in the two
@@ -1774,8 +1771,16 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) {
Document applyOps1{
{"applyOps",
V{std::vector<Document>{
- D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 123}}}}},
- D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 456}}}}},
+ D{{"op", "i"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", V{D{{"_id", 123}}}},
+ {"o2", V{D{{"_id", 123}}}}},
+ D{{"op", "i"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", V{D{{"_id", 456}}}},
+ {"o2", V{D{{"_id", 456}}}}},
}}},
{"partialTxn", true},
};
@@ -1794,7 +1799,11 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) {
Document applyOps2{
{"applyOps",
V{std::vector<Document>{
- D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 789}}}}},
+ D{{"op", "i"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", V{D{{"_id", 789}}}},
+ {"o2", V{D{{"_id", 789}}}}},
}}},
{"prepare", true},
};
@@ -1839,7 +1848,6 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) {
// Populate the MockTransactionHistoryEditor in reverse chronological order.
getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>(
- std::vector<FieldPath>{},
std::vector<repl::OplogEntry>{commitEntry, transactionEntry2, transactionEntry1});
// We should get three documents from the change stream, based on the documents in the two
@@ -1858,7 +1866,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) {
resumeToken,
makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand.
testUuid(),
- V{D{}},
+ V{D{{"_id", 123}}},
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
0));
@@ -1876,7 +1884,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) {
resumeToken,
makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand.
testUuid(),
- V{D{}},
+ V{D{{"_id", 456}}},
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
1));
@@ -1894,7 +1902,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) {
resumeToken,
makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand.
testUuid(),
- V{D{}},
+ V{D{{"_id", 789}}},
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
2));
@@ -1912,8 +1920,16 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionEndingWithEmptyApplyOps) {
Document applyOps1{
{"applyOps",
V{std::vector<Document>{
- D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 123}}}}},
- D{{"op", "i"_sd}, {"ns", nss.ns()}, {"ui", testUuid()}, {"o", V{D{{"_id", 456}}}}},
+ D{{"op", "i"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", V{D{{"_id", 123}}}},
+ {"o2", V{D{{"_id", 123}}}}},
+ D{{"op", "i"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", V{D{{"_id", 456}}}},
+ {"o2", V{D{{"_id", 456}}}}},
}}},
{"partialTxn", true},
};
@@ -1975,7 +1991,6 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionEndingWithEmptyApplyOps) {
// Populate the MockTransactionHistoryEditor in reverse chronological order.
getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>(
- std::vector<FieldPath>{},
std::vector<repl::OplogEntry>{commitEntry, transactionEntry2, transactionEntry1});
// We should get two documents from the change stream, based on the documents in the non-empty
@@ -1994,7 +2009,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionEndingWithEmptyApplyOps) {
resumeToken,
makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand.
testUuid(),
- V{D{}},
+ V{D{{"_id", 123}}},
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
0));
@@ -2012,7 +2027,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionEndingWithEmptyApplyOps) {
resumeToken,
makeResumeToken(kDefaultOpTime.getTimestamp(), // Timestamp of the commitCommand.
testUuid(),
- V{D{}},
+ V{D{{"_id", 456}}},
ResumeTokenData::FromInvalidate::kNotFromInvalidate,
1));
@@ -2374,7 +2389,7 @@ TEST_F(ChangeStreamStageTest, CloseCursorEvenIfInvalidateEntriesGetFilteredOut)
ASSERT_THROWS(match->getNext(), ExceptionFor<ErrorCodes::ChangeStreamInvalidated>);
}
-TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) {
+TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeTokenWhenNoO2FieldInOplog) {
const Timestamp ts(3, 45);
const long long term = 4;
const auto opTime = repl::OpTime(ts, term);
@@ -2387,8 +2402,8 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) {
});
- BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2);
- auto resumeToken = makeResumeToken(ts, uuid, o2);
+ BSONObj docKey = BSON("_id" << 1 << "shardKey" << 2);
+ auto resumeToken = makeResumeToken(ts, uuid, docKey);
BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3);
auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type
@@ -2410,17 +2425,55 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) {
// Although the chunk manager and sharding catalog are not aware of the shard key in this test,
// the expectation is for the $changeStream stage to infer the shard key from the resume token.
checkTransformation(
- insertEntry,
- expectedInsert,
- {{"_id"}}, // Mock the 'collectDocumentKeyFieldsForHostedCollection' response.
- BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+ insertEntry, expectedInsert, BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
// Verify the same behavior with resuming using 'startAfter'.
checkTransformation(
- insertEntry,
- expectedInsert,
- {{"_id"}}, // Mock the 'collectDocumentKeyFieldsForHostedCollection' response.
- BSON("$changeStream" << BSON("startAfter" << resumeToken)));
+ insertEntry, expectedInsert, BSON("$changeStream" << BSON("startAfter" << resumeToken)));
+}
+
+TEST_F(ChangeStreamStageTest, DocumentKeyShouldPrioritizeO2FieldOverDocumentKeyCache) {
+ const Timestamp ts(3, 45);
+ const long long term = 4;
+ const auto opTime = repl::OpTime(ts, term);
+ const auto uuid = testUuid();
+
+ std::shared_ptr<Collection> collection =
+ std::make_shared<CollectionMock>(TenantNamespace(boost::none, nss));
+ CollectionCatalog::write(getExpCtx()->opCtx, [&](CollectionCatalog& catalog) {
+ catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection));
+ });
+
+
+ BSONObj docKey = BSON("_id" << 1);
+ auto resumeToken = makeResumeToken(ts, uuid, docKey);
+
+ BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3);
+ BSONObj o2 = BSON("_id" << 2 << "shardKey" << 3);
+ auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ insertDoc, // o
+ uuid, // uuid
+ boost::none, // fromMigrate
+ o2, // o2
+ opTime); // opTime
+
+ Document expectedInsert{
+ {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}},
+ };
+ // When the o2 field is present in the oplog entry, we should use its value for the document
+ // key, even if the resume token doesn't contain the shard key.
+ checkTransformation(
+ insertEntry, expectedInsert, BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+
+ // Verify the same behavior with resuming using 'startAfter'.
+ checkTransformation(
+ insertEntry, expectedInsert, BSON("$changeStream" << BSON("startAfter" << resumeToken)));
}
TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPresentInOplogEntry) {
@@ -2435,8 +2488,8 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPres
catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection));
});
- BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2);
- auto resumeToken = makeResumeToken(ts, uuid, o2);
+ BSONObj docKey = BSON("_id" << 1 << "shardKey" << 2);
+ auto resumeToken = makeResumeToken(ts, uuid, docKey);
// Note that the 'o' field in the oplog entry does not contain the shard key field.
BSONObj insertDoc = BSON("_id" << 2);
@@ -2457,17 +2510,11 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPres
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
};
checkTransformation(
- insertEntry,
- expectedInsert,
- {{"_id"}}, // Mock the 'collectDocumentKeyFieldsForHostedCollection' response.
- BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+ insertEntry, expectedInsert, BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
// Verify the same behavior with resuming using 'startAfter'.
checkTransformation(
- insertEntry,
- expectedInsert,
- {{"_id"}}, // Mock the 'collectDocumentKeyFieldsForHostedCollection' response.
- BSON("$changeStream" << BSON("startAfter" << resumeToken)));
+ insertEntry, expectedInsert, BSON("$changeStream" << BSON("startAfter" << resumeToken)));
}
TEST_F(ChangeStreamStageTest, ResumeAfterFailsIfResumeTokenDoesNotContainUUID) {
@@ -2552,17 +2599,16 @@ TEST_F(ChangeStreamStageTest, ResumeAfterWithTokenFromInvalidateShouldFail) {
}
TEST_F(ChangeStreamStageTest, UsesResumeTokenAsSortKeyIfNeedsMergeIsFalse) {
- auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type
- nss, // namespace
- BSON("x" << 2 << "_id" << 1), // o
- testUuid(), // uuid
- boost::none, // fromMigrate
- boost::none); // o2
+ auto insert = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ BSON("x" << 2 << "_id" << 1), // o
+ testUuid(), // uuid
+ boost::none, // fromMigrate
+ BSON("x" << 2 << "_id" << 1)); // o2
auto stages = makeStages(insert.getEntry().toBSON(), kDefaultSpec);
- getExpCtx()->mongoProcessInterface =
- std::make_unique<MockMongoInterface>(std::vector<FieldPath>{{"x"}, {"_id"}});
+ getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>();
getExpCtx()->needsMerge = false;
@@ -2584,7 +2630,12 @@ public:
};
TEST_F(ChangeStreamStageDBTest, TransformInsert) {
- auto insert = makeOplogEntry(OpTypeEnum::kInsert, nss, BSON("_id" << 1 << "x" << 2));
+ auto insert = makeOplogEntry(OpTypeEnum::kInsert,
+ nss,
+ BSON("_id" << 1 << "x" << 2),
+ testUuid(),
+ boost::none,
+ BSON("x" << 2 << "_id" << 1));
Document expectedInsert{
{DSChangeStream::kIdField,
@@ -2595,13 +2646,17 @@ TEST_F(ChangeStreamStageDBTest, TransformInsert) {
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal.
};
- checkTransformation(insert, expectedInsert, {{"x"}, {"_id"}});
+ checkTransformation(insert, expectedInsert);
}
TEST_F(ChangeStreamStageDBTest, InsertOnOtherCollections) {
NamespaceString otherNss("unittests.other_collection.");
- auto insertOtherColl =
- makeOplogEntry(OpTypeEnum::kInsert, otherNss, BSON("_id" << 1 << "x" << 2));
+ auto insertOtherColl = makeOplogEntry(OpTypeEnum::kInsert,
+ otherNss,
+ BSON("_id" << 1 << "x" << 2),
+ testUuid(),
+ boost::none,
+ BSON("x" << 2 << "_id" << 1));
// Insert on another collection in the same database.
Document expectedInsert{
@@ -2613,7 +2668,7 @@ TEST_F(ChangeStreamStageDBTest, InsertOnOtherCollections) {
{DSChangeStream::kNamespaceField, D{{"db", otherNss.db()}, {"coll", otherNss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal.
};
- checkTransformation(insertOtherColl, expectedInsert, {{"x"}, {"_id"}});
+ checkTransformation(insertOtherColl, expectedInsert);
}
TEST_F(ChangeStreamStageDBTest, MatchFiltersChangesOnOtherDatabases) {
@@ -2661,7 +2716,8 @@ TEST_F(ChangeStreamStageDBTest, TransformsEntriesForLegalClientCollectionsWithSy
};
for (auto& ns : allowedNamespaces) {
- auto insert = makeOplogEntry(OpTypeEnum::kInsert, ns, BSON("_id" << 1));
+ auto insert = makeOplogEntry(
+ OpTypeEnum::kInsert, ns, BSON("_id" << 1), testUuid(), boost::none, BSON("_id" << 1));
Document expectedInsert{
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), BSON("_id" << 1))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
@@ -2670,7 +2726,7 @@ TEST_F(ChangeStreamStageDBTest, TransformsEntriesForLegalClientCollectionsWithSy
{DSChangeStream::kNamespaceField, D{{"db", ns.db()}, {"coll", ns.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}}},
};
- checkTransformation(insert, expectedInsert, {{"_id"}});
+ checkTransformation(insert, expectedInsert);
}
}
@@ -2798,7 +2854,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDeleteFromMigrateShowMigrations) {
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}},
};
- checkTransformation(deleteEntry, expectedDelete, {}, spec);
+ checkTransformation(deleteEntry, expectedDelete, spec);
}
TEST_F(ChangeStreamStageDBTest, TransformDrop) {
@@ -2846,7 +2902,7 @@ TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
{DSChangeStream::kClusterTimeField, kDefaultTs},
};
- checkTransformation(dropDB, expectedDropDatabase, {}, kDefaultSpec, expectedInvalidate);
+ checkTransformation(dropDB, expectedDropDatabase, kDefaultSpec, expectedInvalidate);
}
TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) {
@@ -2900,7 +2956,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) {
{DSChangeStream::kDocumentKeyField, documentKey},
};
checkTransformation(
- deleteEntry, expectedDeleteNoPreImage, {}, spec, boost::none, {}, documentsForLookup);
+ deleteEntry, expectedDeleteNoPreImage, spec, boost::none, {}, documentsForLookup);
// When run with {fullDocumentBeforeChange: "whenAvailable"}, we see the pre-image.
spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
@@ -2914,13 +2970,13 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) {
{DSChangeStream::kFullDocumentBeforeChangeField, preImageObj},
};
checkTransformation(
- deleteEntry, expectedDeleteWithPreImage, {}, spec, boost::none, {}, documentsForLookup);
+ deleteEntry, expectedDeleteWithPreImage, spec, boost::none, {}, documentsForLookup);
// When run with {fullDocumentBeforeChange: "required"}, we see the pre-image.
spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
<< "required"));
checkTransformation(
- deleteEntry, expectedDeleteWithPreImage, {}, spec, boost::none, {}, documentsForLookup);
+ deleteEntry, expectedDeleteWithPreImage, spec, boost::none, {}, documentsForLookup);
// When run with {fullDocumentBeforeChange: "whenAvailable"} but no pre-image is available, the
// output 'fullDocumentBeforeChange' field is explicitly set to 'null'.
@@ -2929,13 +2985,13 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) {
MutableDocument expectedDeleteWithNullPreImage(expectedDeleteNoPreImage);
expectedDeleteWithNullPreImage.addField(DSChangeStream::kFullDocumentBeforeChangeField,
Value(BSONNULL));
- checkTransformation(deleteEntry, expectedDeleteWithNullPreImage.freeze(), {}, spec);
+ checkTransformation(deleteEntry, expectedDeleteWithNullPreImage.freeze(), spec);
// When run with {fullDocumentBeforeChange: "required"} but we cannot find the pre-image, we
// throw NoMatchingDocument.
spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
<< "required"));
- ASSERT_THROWS_CODE(checkTransformation(deleteEntry, boost::none, {}, spec),
+ ASSERT_THROWS_CODE(checkTransformation(deleteEntry, boost::none, spec),
AssertionException,
ErrorCodes::NoMatchingDocument);
}
@@ -2996,7 +3052,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) {
},
};
checkTransformation(
- updateEntry, expectedUpdateNoPreImage, {}, spec, boost::none, {}, documentsForLookup);
+ updateEntry, expectedUpdateNoPreImage, spec, boost::none, {}, documentsForLookup);
// When run with {fullDocumentBeforeChange: "whenAvailable"}, we see the pre-image.
spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
@@ -3014,13 +3070,13 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) {
{DSChangeStream::kFullDocumentBeforeChangeField, preImageObj},
};
checkTransformation(
- updateEntry, expectedUpdateWithPreImage, {}, spec, boost::none, {}, documentsForLookup);
+ updateEntry, expectedUpdateWithPreImage, spec, boost::none, {}, documentsForLookup);
// When run with {fullDocumentBeforeChange: "required"}, we see the pre-image.
spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
<< "required"));
checkTransformation(
- updateEntry, expectedUpdateWithPreImage, {}, spec, boost::none, {}, documentsForLookup);
+ updateEntry, expectedUpdateWithPreImage, spec, boost::none, {}, documentsForLookup);
// When run with {fullDocumentBeforeChange: "whenAvailable"} but no pre-image is available, the
// output 'fullDocumentBeforeChange' field is explicitly set to 'null'.
@@ -3029,13 +3085,13 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) {
MutableDocument expectedUpdateWithNullPreImage(expectedUpdateNoPreImage);
expectedUpdateWithNullPreImage.addField(DSChangeStream::kFullDocumentBeforeChangeField,
Value(BSONNULL));
- checkTransformation(updateEntry, expectedUpdateWithNullPreImage.freeze(), {}, spec);
+ checkTransformation(updateEntry, expectedUpdateWithNullPreImage.freeze(), spec);
// When run with {fullDocumentBeforeChange: "required"} but we cannot find the pre-image, we
// throw NoMatchingDocument.
spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
<< "required"));
- ASSERT_THROWS_CODE(checkTransformation(updateEntry, boost::none, {}, spec),
+ ASSERT_THROWS_CODE(checkTransformation(updateEntry, boost::none, spec),
AssertionException,
ErrorCodes::NoMatchingDocument);
}
@@ -3093,7 +3149,7 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) {
{DSChangeStream::kDocumentKeyField, documentKey},
};
checkTransformation(
- replaceEntry, expectedReplaceNoPreImage, {}, spec, boost::none, {}, documentsForLookup);
+ replaceEntry, expectedReplaceNoPreImage, spec, boost::none, {}, documentsForLookup);
// When run with {fullDocumentBeforeChange: "whenAvailable"}, we see the pre-image.
spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
@@ -3108,13 +3164,13 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) {
{DSChangeStream::kFullDocumentBeforeChangeField, preImageObj},
};
checkTransformation(
- replaceEntry, expectedReplaceWithPreImage, {}, spec, boost::none, {}, documentsForLookup);
+ replaceEntry, expectedReplaceWithPreImage, spec, boost::none, {}, documentsForLookup);
// When run with {fullDocumentBeforeChange: "required"}, we see the pre-image.
spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
<< "required"));
checkTransformation(
- replaceEntry, expectedReplaceWithPreImage, {}, spec, boost::none, {}, documentsForLookup);
+ replaceEntry, expectedReplaceWithPreImage, spec, boost::none, {}, documentsForLookup);
// When run with {fullDocumentBeforeChange: "whenAvailable"} but no pre-image is available, the
// output 'fullDocumentBeforeChange' field is explicitly set to 'null'.
@@ -3123,13 +3179,13 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) {
MutableDocument expectedReplaceWithNullPreImage(expectedReplaceNoPreImage);
expectedReplaceWithNullPreImage.addField(DSChangeStream::kFullDocumentBeforeChangeField,
Value(BSONNULL));
- checkTransformation(replaceEntry, expectedReplaceWithNullPreImage.freeze(), {}, spec);
+ checkTransformation(replaceEntry, expectedReplaceWithNullPreImage.freeze(), spec);
// When run with {fullDocumentBeforeChange: "required"} but we cannot find the pre-image, we
// throw NoMatchingDocument.
spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
<< "required"));
- ASSERT_THROWS_CODE(checkTransformation(replaceEntry, boost::none, {}, spec),
+ ASSERT_THROWS_CODE(checkTransformation(replaceEntry, boost::none, spec),
AssertionException,
ErrorCodes::NoMatchingDocument);
}
@@ -3195,7 +3251,8 @@ TEST_F(ChangeStreamStageDBTest, MatchFiltersNoOp) {
checkTransformation(noOp, boost::none);
}
-TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldIncludeShardKeyFromResumeToken) {
+TEST_F(ChangeStreamStageDBTest,
+ DocumentKeyShouldIncludeShardKeyFromResumeTokenWhenNoO2FieldInOplog) {
const Timestamp ts(3, 45);
const long long term = 4;
const auto opTime = repl::OpTime(ts, term);
@@ -3207,8 +3264,8 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldIncludeShardKeyFromResumeToken)
catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection));
});
- BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2);
- auto resumeToken = makeResumeToken(ts, uuid, o2);
+ BSONObj docKey = BSON("_id" << 1 << "shardKey" << 2);
+ auto resumeToken = makeResumeToken(ts, uuid, docKey);
BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3);
auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type
@@ -3227,11 +3284,49 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldIncludeShardKeyFromResumeToken)
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}},
};
+ // Although the chunk manager and sharding catalog are not aware of the shard key in this test,
+ // the expectation is for the $changeStream stage to infer the shard key from the resume token.
+ checkTransformation(
+ insertEntry, expectedInsert, BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+}
+
+TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldPrioritizeO2FieldOverDocumentKeyCache) {
+ const Timestamp ts(3, 45);
+ const long long term = 4;
+ const auto opTime = repl::OpTime(ts, term);
+ const auto uuid = testUuid();
+
+ std::shared_ptr<Collection> collection =
+ std::make_shared<CollectionMock>(TenantNamespace(boost::none, nss));
+ CollectionCatalog::write(getExpCtx()->opCtx, [&](CollectionCatalog& catalog) {
+ catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection));
+ });
+
+ BSONObj docKey = BSON("_id" << 1);
+ auto resumeToken = makeResumeToken(ts, uuid, docKey);
+
+ BSONObj insertDoc = BSON("_id" << 2 << "shardKey" << 3);
+ BSONObj o2 = BSON("_id" << 2 << "shardKey" << 3);
+ auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, // op type
+ nss, // namespace
+ insertDoc, // o
+ uuid, // uuid
+ boost::none, // fromMigrate
+ o2, // o2
+ opTime); // opTime
+
+ Document expectedInsert{
+ {DSChangeStream::kIdField, makeResumeToken(ts, uuid, insertDoc)},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kFullDocumentField, D{{"_id", 2}, {"shardKey", 3}}},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, D{{"_id", 2}, {"shardKey", 3}}},
+ };
+ // When the o2 field is present in the oplog entry, we should use its value for the document
+ // key, even if the resume token doesn't contain the shard key.
checkTransformation(
- insertEntry,
- expectedInsert,
- {{"_id"}}, // Mock the 'collectDocumentKeyFieldsForHostedCollection' response.
- BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+ insertEntry, expectedInsert, BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
}
TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPresentInOplogEntry) {
@@ -3246,8 +3341,8 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPr
catalog.registerCollection(getExpCtx()->opCtx, uuid, std::move(collection));
});
- BSONObj o2 = BSON("_id" << 1 << "shardKey" << 2);
- auto resumeToken = makeResumeToken(ts, uuid, o2);
+ BSONObj docKey = BSON("_id" << 1 << "shardKey" << 2);
+ auto resumeToken = makeResumeToken(ts, uuid, docKey);
// Note that the 'o' field in the oplog entry does not contain the shard key field.
BSONObj insertDoc = BSON("_id" << 2);
@@ -3268,10 +3363,7 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyFieldsIfNotPr
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
};
checkTransformation(
- insertEntry,
- expectedInsert,
- {{"_id"}}, // Mock the 'collectDocumentKeyFieldsForHostedCollection' response.
- BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+ insertEntry, expectedInsert, BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
}
TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyIfResumeTokenDoesntContainUUID) {
@@ -3309,10 +3401,7 @@ TEST_F(ChangeStreamStageDBTest, DocumentKeyShouldNotIncludeShardKeyIfResumeToken
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
};
checkTransformation(
- insertEntry,
- expectedInsert,
- {{"_id"}}, // Mock the 'collectDocumentKeyFieldsForHostedCollection' response.
- BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+ insertEntry, expectedInsert, BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
}
TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromInvalidateShouldFail) {
@@ -3354,7 +3443,8 @@ TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromDropDatabase) {
kDefaultTs, Value(), Value(), ResumeTokenData::FromInvalidate::kNotFromInvalidate);
BSONObj insertDoc = BSON("_id" << 2);
- auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, nss, insertDoc);
+ auto insertEntry =
+ makeOplogEntry(OpTypeEnum::kInsert, nss, insertDoc, testUuid(), boost::none, insertDoc);
Document expectedInsert{
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid(), insertDoc)},
@@ -3365,10 +3455,7 @@ TEST_F(ChangeStreamStageDBTest, ResumeAfterWithTokenFromDropDatabase) {
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
};
checkTransformation(
- insertEntry,
- expectedInsert,
- {{"_id"}}, // Mock the 'collectDocumentKeyFieldsForHostedCollection' response.
- BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
+ insertEntry, expectedInsert, BSON("$changeStream" << BSON("resumeAfter" << resumeToken)));
}
@@ -3385,7 +3472,8 @@ TEST_F(ChangeStreamStageDBTest, StartAfterSucceedsEvenIfResumeTokenDoesNotContai
auto resumeToken = makeResumeToken(kDefaultTs);
BSONObj insertDoc = BSON("_id" << 2);
- auto insertEntry = makeOplogEntry(OpTypeEnum::kInsert, nss, insertDoc);
+ auto insertEntry =
+ makeOplogEntry(OpTypeEnum::kInsert, nss, insertDoc, uuid, boost::none, insertDoc);
Document expectedInsert{
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, uuid, insertDoc)},
@@ -3396,10 +3484,7 @@ TEST_F(ChangeStreamStageDBTest, StartAfterSucceedsEvenIfResumeTokenDoesNotContai
{DSChangeStream::kDocumentKeyField, D{{"_id", 2}}},
};
checkTransformation(
- insertEntry,
- expectedInsert,
- {{"_id"}}, // Mock the 'collectDocumentKeyFieldsForHostedCollection' response.
- BSON("$changeStream" << BSON("startAfter" << resumeToken)));
+ insertEntry, expectedInsert, BSON("$changeStream" << BSON("startAfter" << resumeToken)));
}
TEST_F(ChangeStreamStageTest, ChangeStreamWithSingleMatch) {
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 0e43a47ef0d..148e0c9a385 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/catalog/collection_catalog.h"
#include "mongo/db/pipeline/change_stream_document_diff_parser.h"
+#include "mongo/db/pipeline/change_stream_helpers_legacy.h"
#include "mongo/db/pipeline/change_stream_preimage_gen.h"
#include "mongo/db/pipeline/document_path_support.h"
#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h"
@@ -112,22 +113,7 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
 // If the change stream spec includes a resume token bearing a document key, populate the
 // document key cache with its field paths.
- if (!tokenData.documentKey.missing() && tokenData.uuid) {
- std::vector<FieldPath> docKeyFields;
- auto docKey = tokenData.documentKey.getDocument();
-
- auto iter = docKey.fieldIterator();
- while (iter.more()) {
- auto fieldPair = iter.next();
- docKeyFields.push_back(fieldPair.first);
- }
-
- // If the document key from the resume token has more than one field, that means it
- // includes the shard key and thus should never change.
- const bool isFinal = docKeyFields.size() > 1;
-
- _documentKeyCache[tokenData.uuid.get()] = DocumentKeyCacheEntry({docKeyFields, isFinal});
- }
+ _documentKeyCache = change_stream_legacy::buildDocumentKeyCache(tokenData);
}
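
A minimal sketch of what change_stream_legacy::buildDocumentKeyCache plausibly does,
reconstructed from the inline logic removed above and the new cache type declared in the
header below; the real helper lives in change_stream_helpers_legacy.cpp, and the exact
signature (here assumed to take a const ResumeTokenData&) may differ:

boost::optional<std::pair<UUID, std::vector<FieldPath>>> buildDocumentKeyCache(
    const ResumeTokenData& tokenData) {
    // Sketch only: nothing to cache if the token carries no documentKey or UUID.
    if (tokenData.documentKey.missing() || !tokenData.uuid) {
        return boost::none;
    }
    // Record the token's documentKey field paths, in order, keyed by the token's UUID.
    std::vector<FieldPath> docKeyFields;
    auto docKey = tokenData.documentKey.getDocument();
    auto iter = docKey.fieldIterator();
    while (iter.more()) {
        docKeyFields.push_back(iter.next().first);
    }
    return std::make_pair(tokenData.uuid.get(), std::move(docKeyFields));
}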
StageConstraints DocumentSourceChangeStreamTransform::constraints(
@@ -196,31 +182,11 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
Value ns = input[repl::OplogEntry::kNssFieldName];
checkValueType(ns, repl::OplogEntry::kNssFieldName, BSONType::String);
Value uuid = input[repl::OplogEntry::kUuidFieldName];
- std::vector<FieldPath> documentKeyFields;
// Deal with CRUD operations and commands.
auto opType = repl::OpType_parse(IDLParserErrorContext("ChangeStreamEntry.op"), op);
NamespaceString nss(ns.getString());
- // Ignore commands in the oplog when looking up the document key fields since a command implies
- // that the change stream is about to be invalidated (e.g. collection drop).
- if (!uuid.missing() && opType != repl::OpTypeEnum::kCommand) {
- checkValueType(uuid, repl::OplogEntry::kUuidFieldName, BSONType::BinData);
- // We need to retrieve the document key fields if our cache does not have an entry for this
- // UUID or if the cache entry is not definitively final, indicating that the collection was
- // unsharded when the entry was last populated.
- auto it = _documentKeyCache.find(uuid.getUuid());
- if (it == _documentKeyCache.end() || !it->second.isFinal) {
- auto docKeyFields =
- pExpCtx->mongoProcessInterface->collectDocumentKeyFieldsForHostedCollection(
- pExpCtx->opCtx, nss, uuid.getUuid());
- if (it == _documentKeyCache.end() || docKeyFields.second) {
- _documentKeyCache[uuid.getUuid()] = DocumentKeyCacheEntry(docKeyFields);
- }
- }
-
- documentKeyFields = _documentKeyCache.find(uuid.getUuid())->second.documentKeyFields;
- }
Value id = input.getNestedField("o._id");
// Non-replace updates have the _id in field "o2".
StringData operationType;
@@ -232,8 +198,23 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
case repl::OpTypeEnum::kInsert: {
operationType = DocumentSourceChangeStream::kInsertOpType;
fullDocument = input[repl::OplogEntry::kObjectFieldName];
- documentKey = Value(document_path_support::extractPathsFromDoc(
- fullDocument.getDocument(), documentKeyFields));
+ documentKey = input[repl::OplogEntry::kObject2FieldName];
+ // For oplog entries written on an older version of the server, the documentKey may be
+ // missing.
+ if (documentKey.missing()) {
+ // If we are resuming from an 'insert' oplog entry that does not have a documentKey,
+ // the resume token may have been generated by an older version of the server, which
+ // populated the documentKey fields from the sharding catalog. We populate the fields
+ // observed in the resume token in order to retain consistent event ordering around
+ // the resume point during upgrade. Otherwise, we default to _id as the only document
+ // key field.
+ if (_documentKeyCache && _documentKeyCache->first == uuid.getUuid()) {
+ documentKey = Value(document_path_support::extractPathsFromDoc(
+ fullDocument.getDocument(), _documentKeyCache->second));
+ } else {
+ documentKey = Value(Document{{"_id", id}});
+ }
+ }
break;
}
case repl::OpTypeEnum::kDelete: {
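
Putting the branches of the insert case together, the documentKey now resolves in this
order of precedence (annotation only; values illustrative):

// 1. o2 present on the oplog entry:
//      documentKey = o2                                   (authoritative)
// 2. o2 missing, cached resume-token entry matches this collection's UUID:
//      documentKey = the token's field paths, extracted from the full document
// 3. o2 missing, no matching cache entry:
//      documentKey = {_id: <o._id>}                       (default)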
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index 51d3fdc793c..d9dcede8598 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -73,22 +73,6 @@ private:
DocumentSourceChangeStreamTransform(const boost::intrusive_ptr<ExpressionContext>& expCtx,
DocumentSourceChangeStreamSpec spec);
- struct DocumentKeyCacheEntry {
- DocumentKeyCacheEntry() = default;
-
- DocumentKeyCacheEntry(std::pair<std::vector<FieldPath>, bool> documentKeyFieldsIn)
- : documentKeyFields(documentKeyFieldsIn.first), isFinal(documentKeyFieldsIn.second){};
- // Fields of the document key, in order, including "_id" and the shard key if the
- // collection is sharded. Empty until the first oplog entry with a uuid is encountered.
- // Needed for transforming 'insert' oplog entries.
- std::vector<FieldPath> documentKeyFields;
-
- // Set to true if the document key fields for this entry are definitively known and will
- // not change. This implies that either the collection has become sharded or has been
- // dropped.
- bool isFinal;
- };
-
/**
* Helper used for determining what resume token to return.
*/
@@ -96,8 +80,8 @@ private:
DocumentSourceChangeStreamSpec _changeStreamSpec;
- // Map of collection UUID to document key fields.
- std::map<UUID, DocumentKeyCacheEntry> _documentKeyCache;
+ // Records the documentKey fields from the client's resume token, if present.
+ boost::optional<std::pair<UUID, std::vector<FieldPath>>> _documentKeyCache;
// Set to true if this transformation stage can be run on the collectionless namespace.
bool _isIndependentOfAnyCollection;
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
index 66b0b6c62c1..67f8007fe2d 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -501,32 +501,6 @@ TEST_F(CheckResumeTokenTest, ShouldSucceedWithBinaryCollation) {
ASSERT_TRUE(checkResumeToken->getNext().isEOF());
}
-TEST_F(CheckResumeTokenTest, UnshardedTokenSucceedsForShardedResumeOnMongosIfIdMatchesFirstDoc) {
- // Verify that a resume token whose documentKey only contains _id can be used to resume a stream
- // on a sharded collection as long as its _id matches the first document. We set 'inMongos'
- // since this behaviour is only applicable when DSCSEnsureResumeTokenPresent is running on
- // mongoS.
- Timestamp resumeTimestamp(100, 1);
- getExpCtx()->inMongos = true;
-
- auto checkResumeToken =
- createDSEnsureResumeTokenPresent(resumeTimestamp, Document{{"_id"_sd, 1}});
-
- Timestamp doc1Timestamp(100, 1);
- addOplogEntryOnTestNS(doc1Timestamp, {{"x"_sd, 0}, {"_id"_sd, 1}});
- Timestamp doc2Timestamp(100, 2);
- Document doc2DocKey{{"x"_sd, 0}, {"_id"_sd, 2}};
- addOplogEntryOnTestNS(doc2Timestamp, doc2DocKey);
-
- // We should skip doc1 since it satisfies the resume token, and retrieve doc2.
- const auto firstDocAfterResume = checkResumeToken->getNext();
- const auto tokenFromFirstDocAfterResume =
- ResumeToken::parse(firstDocAfterResume.getDocument()["_id"].getDocument()).getData();
-
- ASSERT_EQ(tokenFromFirstDocAfterResume.clusterTime, doc2Timestamp);
- ASSERT_DOCUMENT_EQ(tokenFromFirstDocAfterResume.documentKey.getDocument(), doc2DocKey);
-}
-
TEST_F(CheckResumeTokenTest, UnshardedTokenFailsForShardedResumeOnMongosIfIdDoesNotMatchFirstDoc) {
Timestamp resumeTimestamp(100, 1);
getExpCtx()->inMongos = true;
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
index 937e3c01df7..fba8fcfea3d 100644
--- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
@@ -330,16 +330,6 @@ public:
virtual std::string getHostAndPort(OperationContext* opCtx) const = 0;
/**
- * Returns the fields of the document key (in order) for the collection corresponding to 'uuid',
- * including the shard key and _id. If _id is not in the shard key, it is added last. If the
- * collection is not sharded or no longer exists, returns only _id. Also returns a boolean that
- * indicates whether the returned fields of the document key are final and will never change for
- * the given collection, either because the collection was dropped or has become sharded.
- */
- virtual std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection(
- OperationContext* opCtx, const NamespaceString&, UUID) const = 0;
-
- /**
* Returns the fields of the document key (in order) for the collection 'nss', according to the
* CatalogCache. The document key fields are the shard key (if sharded) and the _id (if not
* already in the shard key). If _id is not in the shard key, it is added last. If the
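
The router-side variant described in the surviving doc comment is unchanged; a hypothetical
call site, for orientation (expCtx and nss stand in for whatever the caller already has):

std::vector<FieldPath> fields =
    expCtx->mongoProcessInterface->collectDocumentKeyFieldsActingAsRouter(expCtx->opCtx, nss);
// Per the comment above: the shard key fields (if sharded), with "_id" appended last
// when not already part of the shard key; just {"_id"} for an unsharded collection.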
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
index c10c9ceef80..e08a8169ed7 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
@@ -165,11 +165,6 @@ public:
MONGO_UNREACHABLE;
}
- std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection(
- OperationContext* opCtx, const NamespaceString&, UUID) const final {
- MONGO_UNREACHABLE;
- }
-
/**
* The following methods only make sense for data-bearing nodes and should never be called on
* a mongos.
diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
index b54f190ae0b..682c0075340 100644
--- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
@@ -59,12 +59,6 @@ std::list<BSONObj> NonShardServerProcessInterface::getIndexSpecs(OperationContex
opCtx, ns, includeBuildUUIDs ? ListIndexesInclude::BuildUUID : ListIndexesInclude::Nothing);
}
-std::pair<std::vector<FieldPath>, bool>
-NonShardServerProcessInterface::collectDocumentKeyFieldsForHostedCollection(
- OperationContext* opCtx, const NamespaceString& nss, UUID uuid) const {
- return {{"_id"}, false}; // Nothing is sharded.
-}
-
std::vector<FieldPath> NonShardServerProcessInterface::collectDocumentKeyFieldsActingAsRouter(
OperationContext* opCtx, const NamespaceString& nss) const {
return {"_id"}; // Nothing is sharded.
diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
index 663fa4531bf..b2b4f731f4e 100644
--- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
@@ -75,8 +75,6 @@ public:
ChunkVersion targetCollectionVersion) const override {
uasserted(51020, "unexpected request to consult sharding catalog on non-shardsvr");
}
- std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection(
- OperationContext* opCtx, const NamespaceString&, UUID) const final;
std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter(
OperationContext*, const NamespaceString&) const final;
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
index 6597e7791af..00d0ae8c2de 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
@@ -89,33 +89,6 @@ void ShardServerProcessInterface::checkRoutingInfoEpochOrThrow(
foundVersion.isSameCollection(targetCollectionVersion));
}
-std::pair<std::vector<FieldPath>, bool>
-ShardServerProcessInterface::collectDocumentKeyFieldsForHostedCollection(OperationContext* opCtx,
- const NamespaceString& nss,
- UUID uuid) const {
- invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
-
- auto* const catalogCache = Grid::get(opCtx)->catalogCache();
- auto swCM = catalogCache->getCollectionRoutingInfo(opCtx, nss);
- if (swCM.isOK()) {
- const auto& cm = swCM.getValue();
- if (cm.isSharded() && cm.uuidMatches(uuid)) {
- // Unpack the shard key. Collection is now sharded so the document key fields will never
- // change, mark as final.
- return {_shardKeyToDocumentKeyFields(cm.getShardKeyPattern().getKeyPatternFields()),
- true};
- }
- } else if (swCM != ErrorCodes::NamespaceNotFound) {
- uassertStatusOK(std::move(swCM));
- }
-
- // An unsharded collection can still become sharded so is not final. If the uuid doesn't match
- // the one stored in the ScopedCollectionDescription, this implies that the collection has been
- // dropped and recreated as sharded. We don't know what the old document key fields might have
- // been in this case so we return just _id.
- return {{"_id"}, false};
-}
-
boost::optional<Document> ShardServerProcessInterface::lookupSingleDocument(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
index dc4b26a971a..d16ae72c444 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
@@ -53,9 +53,6 @@ public:
const NamespaceString& nss,
ChunkVersion targetCollectionVersion) const final;
- std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection(
- OperationContext* opCtx, const NamespaceString&, UUID) const final;
-
std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter(
OperationContext*, const NamespaceString&) const final {
// We don't expect anyone to use this method on the shard itself (yet). This is currently
diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
index 68e85245cd9..5fe7d78a26b 100644
--- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
@@ -184,11 +184,6 @@ public:
MONGO_UNREACHABLE;
}
- std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFieldsForHostedCollection(
- OperationContext*, const NamespaceString&, UUID) const override {
- MONGO_UNREACHABLE;
- }
-
std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter(
OperationContext*, const NamespaceString&) const override {
MONGO_UNREACHABLE;