author    Josef Ahmad <josef.ahmad@mongodb.com>             2022-04-22 15:52:12 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-04-22 16:35:04 +0000
commit    9e7b19e9047e003853bc3d0822962d6a2829773b (patch)
tree      90983df676c5cd01cb3f4374829bf404d38adc3b
parent    7c8c27650698e807ca8c885fb2a0e0d2996ecd19 (diff)
download  mongo-9e7b19e9047e003853bc3d0822962d6a2829773b.tar.gz
SERVER-64972 Generate change stream events for batched deletes
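
Batched multi-document deletes are replicated as a single applyOps oplog entry
with no associated session. With this change, such an entry is unwound into one
change stream event per deleted document, and the resulting events carry
neither 'lsid' nor 'txnNumber'. A minimal shell sketch of the observable
behavior, assuming a connection 'conn' to a replica set (the namespace and the
'internalBatchUserMultiDeletesForTest' parameter are the temporary test hooks
used by the new test below):

    const testDB = conn.getDB("__internalBatchedDeletesTesting");
    const coll = testDB.getCollection("Collection0");
    assert.commandWorked(
        testDB.adminCommand({setParameter: 1, internalBatchUserMultiDeletesForTest: 1}));
    assert.commandWorked(coll.insertMany([{_id: 0}, {_id: 1}, {_id: 2}]));
    const cs = coll.watch();
    assert.commandWorked(coll.deleteMany({}));  // replicated as one batched applyOps
    for (let i = 0; i < 3; i++) {
        assert.soon(() => cs.hasNext());
        const event = cs.next();
        assert.eq("delete", event.operationType);  // one event per deleted document
        assert(!event.hasOwnProperty("lsid"));     // batched writes carry no session info
        assert(!event.hasOwnProperty("txnNumber"));
    }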
Diffstat:
 buildscripts/resmokeconfig/suites/change_streams_batched_deletes_passthrough.yml |   4
 buildscripts/resmokeconfig/suites/sharding_batched_deletes_passthrough.yml       |   2
 jstests/change_streams/apply_ops.js                                              |  27
 jstests/noPassthrough/change_stream_unwind_batched_writes.js                     | 118
 jstests/replsets/apply_ops_inserts_do_not_include_fromMigrate_field.js           |  23
 src/mongo/db/op_observer_impl.cpp                                                |   4
 src/mongo/db/pipeline/change_stream_event_transform.cpp                          |   8
 src/mongo/db/pipeline/change_stream_filter_helpers.cpp                           |   2
 src/mongo/db/pipeline/document_source_change_stream.cpp                          |  12
 src/mongo/db/pipeline/document_source_change_stream.h                            |   5
 src/mongo/db/pipeline/document_source_change_stream_test.cpp                     |  78
 src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp       |  17
 src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h         |  11
13 files changed, 252 insertions(+), 59 deletions(-)
diff --git a/buildscripts/resmokeconfig/suites/change_streams_batched_deletes_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_batched_deletes_passthrough.yml
index 61a06d66c39..bc7ae9883c0 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_batched_deletes_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_batched_deletes_passthrough.yml
@@ -5,10 +5,8 @@ selector:
roots:
- jstests/change_streams/**/*.js
exclude_files:
- # TODO: (SERVER-64972): add change stream support for batched deletes.
- - jstests/change_streams/change_stream.js
+ # TODO: (SERVER-64506): add PIT pre/post-image support for batched deletes.
- jstests/change_streams/lookup_pit_pre_and_post_image.js
- - jstests/change_streams/show_raw_update_description.js
exclude_with_any_tags:
- assumes_standalone_mongod
diff --git a/buildscripts/resmokeconfig/suites/sharding_batched_deletes_passthrough.yml b/buildscripts/resmokeconfig/suites/sharding_batched_deletes_passthrough.yml
index 0d7391ad637..ed05731bd2f 100644
--- a/buildscripts/resmokeconfig/suites/sharding_batched_deletes_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_batched_deletes_passthrough.yml
@@ -11,7 +11,7 @@ selector:
- jstests/sharding/api_params_nontransaction_unsharded.js
# Expects DELETE stage
- jstests/sharding/query/explain_cmd.js
- # TODO: (SERVER-64972): add change stream support for batched deletes.
+ # TODO: (SERVER-64107): batched deleter should avoid generating events for orphaned documents.
- jstests/sharding/change_stream_no_orphans.js
exclude_with_any_tags:
diff --git a/jstests/change_streams/apply_ops.js b/jstests/change_streams/apply_ops.js
index c83dc7f7698..f123902b65a 100644
--- a/jstests/change_streams/apply_ops.js
+++ b/jstests/change_streams/apply_ops.js
@@ -72,13 +72,13 @@ withTxnAndAutoRetryOnMongos(session, () => {
}, txnOptions);
// Do applyOps on the collection that we care about. This is an "external" applyOps, though (not run
-// as part of a transaction) so its entries should be skipped in the change stream. This checks that
-// applyOps that don't have an 'lsid' and 'txnNumber' field do not get unwound. Skip if running in a
+// as part of a transaction). This checks that even though this applyOps has no 'lsid' and
+// 'txnNumber' fields, its entries still get unwound and change stream events are emitted. Skip if running in a
// sharded passthrough, since the applyOps command does not exist on mongoS.
if (!FixtureHelpers.isMongos(db)) {
assert.commandWorked(db.runCommand({
applyOps: [
- {op: "i", ns: coll.getFullName(), o: {_id: 3, a: "SHOULD NOT READ THIS"}},
+ {op: "i", ns: coll.getFullName(), o: {_id: 3, a: "insert from atomic applyOps"}},
]
}));
}
@@ -87,7 +87,7 @@ if (!FixtureHelpers.isMongos(db)) {
assert.commandWorked(db.runCommand({drop: coll.getName()}));
// Define the set of changes expected for the single-collection case per the operations above.
-const expectedChanges = [
+let expectedChanges = [
{
documentKey: {_id: 1},
fullDocument: {_id: 1, a: 0},
@@ -118,13 +118,22 @@ const expectedChanges = [
operationType: "delete",
lsid: session.getSessionId(),
txnNumber: session.getTxnNumber_forTesting(),
- },
- {
- operationType: "drop",
- ns: {db: db.getName(), coll: coll.getName()},
- },
+ }
];
+if (!FixtureHelpers.isMongos(db)) {
+ expectedChanges.push({
+ documentKey: {_id: 3},
+ fullDocument: {_id: 3, a: "insert from atomic applyOps"},
+ ns: {db: db.getName(), coll: coll.getName()},
+ operationType: "insert",
+ });
+}
+expectedChanges.push({
+ operationType: "drop",
+ ns: {db: db.getName(), coll: coll.getName()},
+});
+
// If we are running in a sharded passthrough, then this may have been a multi-shard transaction.
// Change streams will interleave the txn events from across the shards in (clusterTime, txnOpIndex)
// order, and so may not reflect the ordering of writes in the test. We thus verify that exactly the
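
Note on the edits above: an applyOps command issued outside a session (an
"external", non-transactional applyOps) previously left no trace in the change
stream; after SERVER-64972 it produces ordinary events. A rough standalone
sketch, assuming a replica-set shell with database handle 'db' and collection
'coll':

    const cs = coll.watch();
    assert.commandWorked(db.runCommand({
        applyOps: [{op: "i", ns: coll.getFullName(), o: {_id: 99, note: "via applyOps"}}]
    }));
    assert.soon(() => cs.hasNext());
    const event = cs.next();
    assert.eq("insert", event.operationType);  // previously: no event at all
    assert.eq({_id: 99}, event.documentKey);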
diff --git a/jstests/noPassthrough/change_stream_unwind_batched_writes.js b/jstests/noPassthrough/change_stream_unwind_batched_writes.js
new file mode 100644
index 00000000000..f84406c4a78
--- /dev/null
+++ b/jstests/noPassthrough/change_stream_unwind_batched_writes.js
@@ -0,0 +1,118 @@
+/**
+ * Verifies change stream operation for batched writes.
+ *
+ * @tags: [
+ * # Running as a replica set requires journaling.
+ * requires_journaling,
+ * requires_majority_read_concern,
+ * uses_change_streams,
+ * ]
+ */
+(function() {
+"use strict";
+
+// '__internalBatchedDeletesTesting.Collection0' is a special, hardcoded namespace that batches
+// multi-doc deletes if the 'internalBatchUserMultiDeletesForTest' server parameter is set.
+// TODO (SERVER-63044): remove this special handling.
+const dbName = "__internalBatchedDeletesTesting";
+const collName = "Collection0";
+
+/**
+ * Asserts that the expected operation type and documentKey are found on the change stream
+ * cursor. Returns the change stream document.
+ */
+function assertWriteVisible(cursor, operationType, documentKey) {
+ assert.soon(() => cursor.hasNext());
+ const changeDoc = cursor.next();
+ assert.eq(operationType, changeDoc.operationType, changeDoc);
+ assert.eq(documentKey, changeDoc.documentKey, changeDoc);
+ // Change stream events for batched writes do not include lsid and txnNumber.
+ assert(!changeDoc.hasOwnProperty('lsid'));
+ assert(!changeDoc.hasOwnProperty('txnNumber'));
+ return changeDoc;
+}
+
+/**
+ * Asserts that the expected operation type and documentKey are found on the change stream
+ * cursor. Pushes the change stream document, whose '_id' is its resume token, onto 'changeList'.
+ */
+function assertWriteVisibleWithCapture(cursor, operationType, documentKey, changeList) {
+ const changeDoc = assertWriteVisible(cursor, operationType, documentKey);
+ changeList.push(changeDoc);
+}
+
+/**
+ * Asserts that there are no changes waiting on the change stream cursor.
+ */
+function assertNoChanges(cursor) {
+ assert(!cursor.hasNext(), () => {
+ return "Unexpected change set: " + tojson(cursor.toArray());
+ });
+}
+
+function runTest(conn) {
+ const db = conn.getDB(dbName);
+ const coll = db.getCollection(collName);
+
+ const docsPerBatch = 3;
+ const totalNumDocs = 8;
+ let changeList = [];
+
+ // Enable batched deletes. For consistent results, disable any batch targeting except for
+ // 'batchedDeletesTargetBatchDocs'.
+ assert.commandWorked(
+ db.adminCommand({setParameter: 1, internalBatchUserMultiDeletesForTest: 1}));
+ assert.commandWorked(db.adminCommand({setParameter: 1, batchedDeletesTargetBatchTimeMS: 0}));
+ assert.commandWorked(db.adminCommand({setParameter: 1, batchedDeletesTargetStagedDocBytes: 0}));
+ assert.commandWorked(
+ db.adminCommand({setParameter: 1, batchedDeletesTargetBatchDocs: docsPerBatch}));
+
+ // Populate the collection, then open a change stream, then mass-delete the collection.
+ assert.commandWorked(
+ coll.insertMany([...Array(totalNumDocs).keys()].map(x => ({_id: x, txt: "a" + x}))));
+ const changeStreamCursor = coll.watch();
+ const serverStatusBatchesBefore = db.serverStatus()['batchedDeletes']['batches'];
+ const serverStatusDocsBefore = db.serverStatus()['batchedDeletes']['docs'];
+ assert.commandWorked(coll.deleteMany({_id: {$gte: 0}}));
+ assert.eq(0, coll.find().itcount());
+ const serverStatusBatchesAfter = db.serverStatus()['batchedDeletes']['batches'];
+ const serverStatusDocsAfter = db.serverStatus()['batchedDeletes']['docs'];
+ assert.eq(serverStatusBatchesAfter,
+ serverStatusBatchesBefore + Math.ceil(totalNumDocs / docsPerBatch));
+ assert.eq(serverStatusDocsAfter, serverStatusDocsBefore + totalNumDocs);
+
+ // Verify the change stream emits events for the batched deletion, and capture the events so we
+ // can test resumability later.
+ for (let docKey = 0; docKey < totalNumDocs; docKey++) {
+ assertWriteVisibleWithCapture(changeStreamCursor, "delete", {_id: docKey}, changeList);
+ }
+
+ assertNoChanges(changeStreamCursor);
+ changeStreamCursor.close();
+
+ // Test that change stream resume returns the expected set of documents at each point
+ // captured by this test.
+ for (let i = 0; i < changeList.length; ++i) {
+ const resumeCursor = coll.watch([], {startAfter: changeList[i]._id});
+
+ for (let x = (i + 1); x < changeList.length; ++x) {
+ const expectedChangeDoc = changeList[x];
+ assertWriteVisible(
+ resumeCursor, expectedChangeDoc.operationType, expectedChangeDoc.documentKey);
+ }
+
+ assertNoChanges(resumeCursor);
+ resumeCursor.close();
+ }
+
+ assert.commandWorked(db.dropDatabase());
+}
+
+const rst = new ReplSetTest({nodes: 1});
+rst.startSet();
+rst.initiate();
+
+runTest(rst.getPrimary());
+
+rst.stopSet();
+})();
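
The resume loop in the test above relies on the fact that a change event's
'_id' is its resume token. A minimal sketch of the same pattern in isolation,
assuming a collection handle 'coll' and a previously captured event
'lastEvent':

    // Reopen the stream immediately after a previously seen event; events
    // recorded after 'lastEvent' replay in their original order.
    const resumedCursor = coll.watch([], {startAfter: lastEvent._id});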
diff --git a/jstests/replsets/apply_ops_inserts_do_not_include_fromMigrate_field.js b/jstests/replsets/apply_ops_inserts_do_not_include_fromMigrate_field.js
index ee1a4211cce..60ea0fbaeb0 100644
--- a/jstests/replsets/apply_ops_inserts_do_not_include_fromMigrate_field.js
+++ b/jstests/replsets/apply_ops_inserts_do_not_include_fromMigrate_field.js
@@ -1,8 +1,13 @@
/**
* Tests that insert oplog entries created by applyOps commands do not contain the 'fromMigrate'
- * field. Additionally tests that non-atomic applyOps inserts should be returned by changeStreams.
+ * field. Additionally tests that inserts originating from applyOps commands are returned by
+ * changeStreams.
*
- * @tags: [uses_change_streams]
+ * @tags: [
+ * uses_change_streams,
+ * # Change streams emit events for applyOps without lsid and txnNumber as of SERVER-64972.
+ * multiversion_incompatible,
+ * ]
*/
(function() {
'use strict';
@@ -53,6 +58,7 @@ assert.commandWorked(primaryDB.runCommand({
}));
// Test atomic applyOps inserts.
+// TODO (SERVER-33182): Remove the atomic applyOps testing once atomic applyOps are removed.
assert.commandWorked(
primaryDB.runCommand({applyOps: [{op: "i", ns: nss(dbName, collName), o: {_id: 4}}]}));
assert.commandWorked(primaryDB.runCommand({
@@ -80,20 +86,25 @@ nonAtomicResults.forEach(function(op) {
assert(!op.hasOwnProperty("fromMigrate"), nonAtomicResults);
});
-// TODO (SERVER-33182): Remove the atomic applyOps testing once atomic applyOps are removed.
-// Atomic applyOps inserts are not expected to be picked up by changeStreams.
-primaryCST.assertNoChange(primaryChangeStream);
-secondaryCST.assertNoChange(secondaryChangeStream);
+// Atomic applyOps inserts are expected to be picked up by changeStreams.
// We expect the operations from an atomic applyOps command to be nested in an applyOps oplog entry.
const atomicResults = oplog.find({"o.applyOps": {$exists: true}}).toArray();
assert.eq(atomicResults.length, 2, atomicResults);
for (let i = 0; i < atomicResults.length; i++) {
let ops = atomicResults[i].o.applyOps;
ops.forEach(function(op) {
+ const primaryChange = primaryCST.getOneChange(primaryChangeStream);
+ assert.eq(primaryChange.documentKey._id, expectedCount, primaryChange);
+ const secondaryChange = secondaryCST.getOneChange(secondaryChangeStream);
+ assert.eq(secondaryChange.documentKey._id, expectedCount, secondaryChange);
assert.eq(op.o._id, expectedCount++, atomicResults);
assert(!op.hasOwnProperty("fromMigrate"), atomicResults);
});
}
+
+primaryCST.assertNoChange(primaryChangeStream);
+secondaryCST.assertNoChange(secondaryChangeStream);
+
assert.eq(7, expectedCount);
rst.stopSet();
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 14139e30d9d..51215543df6 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -1666,7 +1666,9 @@ OpTimeBundle logApplyOps(OperationContext* opCtx,
oplogEntry->setOpType(repl::OpTypeEnum::kCommand);
oplogEntry->setNss({"admin", "$cmd"});
- oplogEntry->setSessionId(opCtx->getLogicalSessionId());
+ // Batched writes (that is, WUOWs with 'groupOplogEntries') are not associated with a txnNumber,
+ // so do not emit an lsid either.
+ oplogEntry->setSessionId(opCtx->getTxnNumber() ? opCtx->getLogicalSessionId() : boost::none);
oplogEntry->setTxnNumber(opCtx->getTxnNumber());
if (txnRetryCounter && !isDefaultTxnRetryCounter(*txnRetryCounter)) {
oplogEntry->getOperationSessionInfo().setTxnRetryCounter(*txnRetryCounter);
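
For illustration, the applyOps entry written for a batched delete now takes
roughly this shape (values invented; the point is that 'lsid' and 'txnNumber'
are both omitted, so the guard above emits the two fields together or not at
all):

    {
        op: "c",
        ns: "admin.$cmd",
        ts: Timestamp(1650642732, 5),
        o: {
            applyOps: [
                {op: "d", ns: "test.coll", ui: UUID("..."), o: {_id: 0}},
                {op: "d", ns: "test.coll", ui: UUID("..."), o: {_id: 1}}
            ]
        }
    }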
diff --git a/src/mongo/db/pipeline/change_stream_event_transform.cpp b/src/mongo/db/pipeline/change_stream_event_transform.cpp
index 644ced702c7..7737821eb41 100644
--- a/src/mongo/db/pipeline/change_stream_event_transform.cpp
+++ b/src/mongo/db/pipeline/change_stream_event_transform.cpp
@@ -43,6 +43,7 @@
namespace mongo {
namespace {
constexpr auto checkValueType = &DocumentSourceChangeStream::checkValueType;
+constexpr auto checkValueTypeOrMissing = &DocumentSourceChangeStream::checkValueTypeOrMissing;
Document copyDocExceptFields(const Document& source, const std::set<StringData>& fieldNames) {
MutableDocument doc(source);
@@ -385,13 +386,12 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum
// Add some additional fields only relevant to transactions.
if (!txnOpIndex.missing()) {
+ // The lsid and txnNumber may be missing if this is a batched write.
auto lsid = input[DocumentSourceChangeStream::kLsidField];
- checkValueType(lsid, DocumentSourceChangeStream::kLsidField, BSONType::Object);
-
+ checkValueTypeOrMissing(lsid, DocumentSourceChangeStream::kLsidField, BSONType::Object);
auto txnNumber = input[DocumentSourceChangeStream::kTxnNumberField];
- checkValueType(
+ checkValueTypeOrMissing(
txnNumber, DocumentSourceChangeStream::kTxnNumberField, BSONType::NumberLong);
-
doc.addField(DocumentSourceChangeStream::kTxnNumberField, txnNumber);
doc.addField(DocumentSourceChangeStream::kLsidField, lsid);
}
diff --git a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
index 106c9ea9eb6..f7d744d5c9d 100644
--- a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp
@@ -236,8 +236,6 @@ std::unique_ptr<MatchExpression> buildTransactionFilter(
applyOpsBuilder.append("o.applyOps",
BSON("$type"
<< "array"));
- applyOpsBuilder.append("lsid", BSON("$exists" << true));
- applyOpsBuilder.append("txnNumber", BSON("$exists" << true));
applyOpsBuilder.append("o.prepare", BSON("$not" << BSON("$eq" << true)));
applyOpsBuilder.append("o.partialTxn", BSON("$not" << BSON("$eq" << true)));
{
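
With the two $exists predicates removed, the transaction filter no longer
requires session fields, which is what lets batched-write applyOps entries
through to the unwind stage. Sketched from the builder calls visible above
(partial; the surrounding predicates are unchanged and elided):

    {
        "o.applyOps": {$type: "array"},
        "o.prepare": {$not: {$eq: true}},
        "o.partialTxn": {$not: {$eq: true}}
        // removed: "lsid": {$exists: true}, "txnNumber": {$exists: true}
    }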
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 4fc0041cc0e..bb4ee0c9b2c 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -111,14 +111,22 @@ constexpr StringData DocumentSourceChangeStream::kRegexAllDBs;
constexpr StringData DocumentSourceChangeStream::kRegexCmdColl;
void DocumentSourceChangeStream::checkValueType(const Value v,
- const StringData filedName,
+ const StringData fieldName,
BSONType expectedType) {
uassert(40532,
- str::stream() << "Entry field \"" << filedName << "\" should be "
+ str::stream() << "Entry field \"" << fieldName << "\" should be "
<< typeName(expectedType) << ", found: " << typeName(v.getType()),
(v.getType() == expectedType));
}
+void DocumentSourceChangeStream::checkValueTypeOrMissing(const Value v,
+ const StringData fieldName,
+ BSONType expectedType) {
+ if (!v.missing()) {
+ checkValueType(v, fieldName, expectedType);
+ }
+}
+
DocumentSourceChangeStream::ChangeStreamType DocumentSourceChangeStream::getChangeStreamType(
const NamespaceString& nss) {
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 60013e64444..ca6ee747c36 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -283,6 +283,11 @@ public:
static void checkValueType(Value v, StringData fieldName, BSONType expectedType);
/**
+ * Same as 'checkValueType', except it tolerates the field being missing.
+ */
+ static void checkValueTypeOrMissing(Value v, StringData fieldName, BSONType expectedType);
+
+ /**
* Extracts the resume token from the given spec. If a 'startAtOperationTime' is specified,
* returns the equivalent high-watermark token. This method should only ever be called on a spec
* where one of 'resumeAfter', 'startAfter', or 'startAtOperationTime' is populated.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 5d11196a08b..1a84eb5f2fd 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -313,10 +313,11 @@ public:
*/
std::vector<Document> getApplyOpsResults(const Document& applyOpsDoc,
const LogicalSessionFromClient& lsid,
- BSONObj spec = kDefaultSpec) {
+ BSONObj spec = kDefaultSpec,
+ bool hasTxnNumber = true) {
BSONObj applyOpsObj = applyOpsDoc.toBson();
- // Create an oplog entry and then glue on an lsid and txnNumber
+ // Create an oplog entry and then glue on an lsid and optionally a txnNumber
auto baseOplogEntry = makeOplogEntry(OpTypeEnum::kCommand,
nss.getCommandNS(),
applyOpsObj,
@@ -325,7 +326,9 @@ public:
BSONObj());
BSONObjBuilder builder(baseOplogEntry.getEntry().toBSON());
builder.append("lsid", lsid.toBSON());
- builder.append("txnNumber", 0LL);
+ if (hasTxnNumber) {
+ builder.append("txnNumber", 0LL);
+ }
BSONObj oplogEntry = builder.done();
// Create the stages and check that the documents produced matched those in the applyOps.
@@ -1425,26 +1428,63 @@ DEATH_TEST_F(ChangeStreamStageTest,
getApplyOpsResults(applyOpsDoc, lsid); // Should crash.
}
-TEST_F(ChangeStreamStageTest, TransformNonTransactionApplyOps) {
- BSONObj applyOpsObj = Document{{"applyOps",
- Value{std::vector<Document>{Document{
- {"op", "i"_sd},
- {"ns", nss.ns()},
- {"ui", testUuid()},
- {"o", Value{Document{{"_id", 123}, {"x", "hallo"_sd}}}}}}}}}
- .toBson();
+TEST_F(ChangeStreamStageTest, TransformNonTxnNumberApplyOps) {
+ Document applyOpsDoc =
+ Document{{"applyOps",
+ Value{std::vector<Document>{
+ Document{{"op", "i"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", Value{Document{{"_id", 123}, {"x", "hallo"_sd}}}}}}}}};
- // Don't append lsid or txnNumber
+ LogicalSessionFromClient lsid = testLsid();
+ vector<Document> results =
+ getApplyOpsResults(applyOpsDoc, lsid, kDefaultSpec, false /* hasTxnNumber */);
- auto oplogEntry = makeOplogEntry(OpTypeEnum::kCommand,
- nss.getCommandNS(),
- applyOpsObj,
- testUuid(),
- boost::none, // fromMigrate
- BSONObj());
+ ASSERT_EQ(results.size(), 1u);
+ const auto nextDoc = results[0];
+ ASSERT(nextDoc.getField("txnNumber").missing());
+ ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+ DSChangeStream::kInsertOpType);
+ ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 123);
+ ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["x"].getString(), "hallo");
+ ASSERT_EQ(nextDoc["lsid"].getDocument().toBson().woCompare(lsid.toBSON()), 0);
+}
- checkTransformation(oplogEntry, boost::none);
+TEST_F(ChangeStreamStageTest, TransformNonTxnNumberBatchedDeleteApplyOps) {
+
+ Document applyOpsDoc{
+ {"applyOps",
+ Value{std::vector<Document>{
+ Document{{"op", "d"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", Value{Document{{"_id", 10}}}}},
+ Document{{"op", "d"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", Value{Document{{"_id", 11}}}}},
+ Document{{"op", "d"_sd},
+ {"ns", nss.ns()},
+ {"ui", testUuid()},
+ {"o", Value{Document{{"_id", 12}}}}},
+ }}},
+ };
+ LogicalSessionFromClient lsid = testLsid();
+ vector<Document> results =
+ getApplyOpsResults(applyOpsDoc, lsid, kDefaultSpec, false /* hasTxnNumber */);
+
+ ASSERT_EQ(results.size(), 3u);
+
+ int i = 0;
+ for (const auto& nextDoc : results) {
+ ASSERT(nextDoc.getField("txnNumber").missing());
+ ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(),
+ DSChangeStream::kDeleteOpType);
+ ASSERT_EQ(nextDoc[DSChangeStream::kDocumentKeyField]["_id"].getInt(), 10 + i++);
+ ASSERT_EQ(nextDoc["lsid"].getDocument().toBson().woCompare(lsid.toBSON()), 0);
+ }
}
TEST_F(ChangeStreamStageTest, TransformApplyOpsWithEntriesOnDifferentNs) {
diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
index 40603a0b1d7..443769f234c 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
@@ -222,13 +222,16 @@ DocumentSourceChangeStreamUnwindTransaction::TransactionOpIterator::TransactionO
const Document& input,
const MatchExpression* expression)
: _mongoProcessInterface(mongoProcessInterface), _expression(expression) {
- Value lsidValue = input["lsid"];
- DocumentSourceChangeStream::checkValueType(lsidValue, "lsid", BSONType::Object);
- _lsid = lsidValue.getDocument();
+ // The lsid and txnNumber can be missing in case of batched writes.
+ Value lsidValue = input["lsid"];
+ DocumentSourceChangeStream::checkValueTypeOrMissing(lsidValue, "lsid", BSONType::Object);
+ _lsid = lsidValue.missing() ? boost::none : boost::optional<Document>(lsidValue.getDocument());
Value txnNumberValue = input["txnNumber"];
- DocumentSourceChangeStream::checkValueType(txnNumberValue, "txnNumber", BSONType::NumberLong);
- _txnNumber = txnNumberValue.getLong();
+ DocumentSourceChangeStream::checkValueTypeOrMissing(
+ txnNumberValue, "txnNumber", BSONType::NumberLong);
+ _txnNumber = txnNumberValue.missing() ? boost::none
+ : boost::optional<TxnNumber>(txnNumberValue.getLong());
// We want to parse the OpTime out of this document using the BSON OpTime parser. Instead of
// converting the entire Document back to BSON, we convert only the fields we need.
@@ -380,9 +383,9 @@ DocumentSourceChangeStreamUnwindTransaction::TransactionOpIterator::_addRequired
newDoc.addField(DocumentSourceChangeStream::kApplyOpsTsField, Value(applyOpsTs()));
newDoc.addField(repl::OplogEntry::kTimestampFieldName, Value(_clusterTime));
- newDoc.addField(repl::OplogEntry::kSessionIdFieldName, Value(_lsid));
+ newDoc.addField(repl::OplogEntry::kSessionIdFieldName, _lsid ? Value(*_lsid) : Value());
newDoc.addField(repl::OplogEntry::kTxnNumberFieldName,
- Value(static_cast<long long>(_txnNumber)));
+ _txnNumber ? Value(static_cast<long long>(*_txnNumber)) : Value());
newDoc.addField(repl::OplogEntry::kWallClockTimeFieldName, Value(_wallTime));
return newDoc.freeze();
diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h
index ce7278b3b3e..a79be42c778 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h
@@ -146,11 +146,11 @@ private:
return _clusterTime;
}
- Document lsid() const {
+ boost::optional<Document> lsid() const {
return _lsid;
}
- TxnNumber txnNumber() const {
+ boost::optional<TxnNumber> txnNumber() const {
return _txnNumber;
}
@@ -214,9 +214,10 @@ private:
Timestamp _clusterTime;
Date_t _wallTime;
- // Fields that were taken from the '_applyOps' oplog entry.
- Document _lsid;
- TxnNumber _txnNumber;
+ // Fields that were taken from the '_applyOps' oplog entry. They are optional because
+ // they may not be present on applyOps generated by the BatchedWriteContext.
+ boost::optional<Document> _lsid;
+ boost::optional<TxnNumber> _txnNumber;
// Used for traversing the oplog with TransactionHistoryInterface.
std::shared_ptr<MongoProcessInterface> _mongoProcessInterface;