diff options
author | Josef Ahmad <josef.ahmad@mongodb.com> | 2022-04-22 15:52:12 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-22 16:35:04 +0000 |
commit | 9e7b19e9047e003853bc3d0822962d6a2829773b (patch) | |
tree | 90983df676c5cd01cb3f4374829bf404d38adc3b | |
parent | 7c8c27650698e807ca8c885fb2a0e0d2996ecd19 (diff) | |
download | mongo-9e7b19e9047e003853bc3d0822962d6a2829773b.tar.gz |
SERVER-64972 Generate change stream events for batched deletes
13 files changed, 252 insertions, 59 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_batched_deletes_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_batched_deletes_passthrough.yml index 61a06d66c39..bc7ae9883c0 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_batched_deletes_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_batched_deletes_passthrough.yml @@ -5,10 +5,8 @@ selector: roots: - jstests/change_streams/**/*.js exclude_files: - # TODO: (SERVER-64972): add change stream support for batched deletes. - - jstests/change_streams/change_stream.js + # TODO: (SERVER-64506): add PIT pre/post-image support for batched deletes. - jstests/change_streams/lookup_pit_pre_and_post_image.js - - jstests/change_streams/show_raw_update_description.js exclude_with_any_tags: - assumes_standalone_mongod diff --git a/buildscripts/resmokeconfig/suites/sharding_batched_deletes_passthrough.yml b/buildscripts/resmokeconfig/suites/sharding_batched_deletes_passthrough.yml index 0d7391ad637..ed05731bd2f 100644 --- a/buildscripts/resmokeconfig/suites/sharding_batched_deletes_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/sharding_batched_deletes_passthrough.yml @@ -11,7 +11,7 @@ selector: - jstests/sharding/api_params_nontransaction_unsharded.js # Expects DELETE stage - jstests/sharding/query/explain_cmd.js - # TODO: (SERVER-64972): add change stream support for batched deletes. + # TODO: (SERVER-64107): batched deleter should avoid generating events for orphaned documents. - jstests/sharding/change_stream_no_orphans.js exclude_with_any_tags: diff --git a/jstests/change_streams/apply_ops.js b/jstests/change_streams/apply_ops.js index c83dc7f7698..f123902b65a 100644 --- a/jstests/change_streams/apply_ops.js +++ b/jstests/change_streams/apply_ops.js @@ -72,13 +72,13 @@ withTxnAndAutoRetryOnMongos(session, () => { }, txnOptions); // Do applyOps on the collection that we care about. This is an "external" applyOps, though (not run -// as part of a transaction) so its entries should be skipped in the change stream. This checks that -// applyOps that don't have an 'lsid' and 'txnNumber' field do not get unwound. Skip if running in a +// as part of a transaction). This checks that although this applyOps doesn't have an 'lsid' and +// 'txnNumber', the field gets unwound and a change stream event is emitted. Skip if running in a // sharded passthrough, since the applyOps command does not exist on mongoS. if (!FixtureHelpers.isMongos(db)) { assert.commandWorked(db.runCommand({ applyOps: [ - {op: "i", ns: coll.getFullName(), o: {_id: 3, a: "SHOULD NOT READ THIS"}}, + {op: "i", ns: coll.getFullName(), o: {_id: 3, a: "insert from atomic applyOps"}}, ] })); } @@ -87,7 +87,7 @@ if (!FixtureHelpers.isMongos(db)) { assert.commandWorked(db.runCommand({drop: coll.getName()})); // Define the set of changes expected for the single-collection case per the operations above. -const expectedChanges = [ +let expectedChanges = [ { documentKey: {_id: 1}, fullDocument: {_id: 1, a: 0}, @@ -118,13 +118,22 @@ const expectedChanges = [ operationType: "delete", lsid: session.getSessionId(), txnNumber: session.getTxnNumber_forTesting(), - }, - { - operationType: "drop", - ns: {db: db.getName(), coll: coll.getName()}, - }, + } ]; +if (!FixtureHelpers.isMongos(db)) { + expectedChanges.push({ + documentKey: {_id: 3}, + fullDocument: {_id: 3, a: "insert from atomic applyOps"}, + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "insert", + }); +} +expectedChanges.push({ + operationType: "drop", + ns: {db: db.getName(), coll: coll.getName()}, +}); + // If we are running in a sharded passthrough, then this may have been a multi-shard transaction. // Change streams will interleave the txn events from across the shards in (clusterTime, txnOpIndex) // order, and so may not reflect the ordering of writes in the test. We thus verify that exactly the diff --git a/jstests/noPassthrough/change_stream_unwind_batched_writes.js b/jstests/noPassthrough/change_stream_unwind_batched_writes.js new file mode 100644 index 00000000000..f84406c4a78 --- /dev/null +++ b/jstests/noPassthrough/change_stream_unwind_batched_writes.js @@ -0,0 +1,118 @@ +/** + * Verifies change streams operation for batched writes. + * + * @tags: [ + * # Running as a replica set requires journaling. + * requires_journaling, + * requires_majority_read_concern, + * uses_change_streams, + * ] + */ +(function() { +"use strict"; + +// '__internalBatchedDeletesTesting.Collection0' is a special, hardcoded namespace that batches +// multi-doc deletes if the 'internalBatchUserMultiDeletesForTest' server parameter is set. +// TODO (SERVER-63044): remove this special handling. +const dbName = "__internalBatchedDeletesTesting"; +const collName = "Collection0"; + +/** + * Asserts that the expected operation type and documentKey are found on the change stream + * cursor. Returns the change stream document. + */ +function assertWriteVisible(cursor, operationType, documentKey) { + assert.soon(() => cursor.hasNext()); + const changeDoc = cursor.next(); + assert.eq(operationType, changeDoc.operationType, changeDoc); + assert.eq(documentKey, changeDoc.documentKey, changeDoc); + // Change stream events for batched writes do not include lsid and txnNumber. + assert(!changeDoc.hasOwnProperty('lsid')); + assert(!changeDoc.hasOwnProperty('txnNumber')); + return changeDoc; +} + +/** + * Asserts that the expected operation type and documentKey are found on the change stream + * cursor. Pushes the corresponding resume token and change stream document to an array. + */ +function assertWriteVisibleWithCapture(cursor, operationType, documentKey, changeList) { + const changeDoc = assertWriteVisible(cursor, operationType, documentKey); + changeList.push(changeDoc); +} + +/** + * Asserts that there are no changes waiting on the change stream cursor. + */ +function assertNoChanges(cursor) { + assert(!cursor.hasNext(), () => { + return "Unexpected change set: " + tojson(cursor.toArray()); + }); +} + +function runTest(conn) { + const db = conn.getDB(dbName); + const coll = db.getCollection(collName); + + const docsPerBatch = 3; + const totalNumDocs = 8; + let changeList = []; + + // Enable batched deletes. For consistent results, disable any batch targeting except for + // 'batchedDeletesTargetBatchDocs'. + assert.commandWorked( + db.adminCommand({setParameter: 1, internalBatchUserMultiDeletesForTest: 1})); + assert.commandWorked(db.adminCommand({setParameter: 1, batchedDeletesTargetBatchTimeMS: 0})); + assert.commandWorked(db.adminCommand({setParameter: 1, batchedDeletesTargetStagedDocBytes: 0})); + assert.commandWorked( + db.adminCommand({setParameter: 1, batchedDeletesTargetBatchDocs: docsPerBatch})); + + // Populate the collection, then open a change stream, then mass-delete the collection. + assert.commandWorked( + coll.insertMany([...Array(totalNumDocs).keys()].map(x => ({_id: x, txt: "a" + x})))); + const changeStreamCursor = coll.watch(); + const serverStatusBatchesBefore = db.serverStatus()['batchedDeletes']['batches']; + const serverStatusDocsBefore = db.serverStatus()['batchedDeletes']['docs']; + assert.commandWorked(coll.deleteMany({_id: {$gte: 0}})); + assert.eq(0, coll.find().itcount()); + const serverStatusBatchesAfter = db.serverStatus()['batchedDeletes']['batches']; + const serverStatusDocsAfter = db.serverStatus()['batchedDeletes']['docs']; + assert.eq(serverStatusBatchesAfter, + serverStatusBatchesBefore + Math.ceil(totalNumDocs / docsPerBatch)); + assert.eq(serverStatusDocsAfter, serverStatusDocsBefore + totalNumDocs); + + // Verify the change stream emits events for the batched deletion, and capture the events so we + // can test resumability later. + for (let docKey = 0; docKey < totalNumDocs; docKey++) { + assertWriteVisibleWithCapture(changeStreamCursor, "delete", {_id: docKey}, changeList); + } + + assertNoChanges(changeStreamCursor); + changeStreamCursor.close(); + + // Test that change stream resume returns the expected set of documents at each point + // captured by this test. + for (let i = 0; i < changeList.length; ++i) { + const resumeCursor = coll.watch([], {startAfter: changeList[i]._id}); + + for (let x = (i + 1); x < changeList.length; ++x) { + const expectedChangeDoc = changeList[x]; + assertWriteVisible( + resumeCursor, expectedChangeDoc.operationType, expectedChangeDoc.documentKey); + } + + assertNoChanges(resumeCursor); + resumeCursor.close(); + } + + assert.commandWorked(db.dropDatabase()); +} + +const rst = new ReplSetTest({nodes: 1}); +rst.startSet(); +rst.initiate(); + +runTest(rst.getPrimary()); + +rst.stopSet(); +})(); diff --git a/jstests/replsets/apply_ops_inserts_do_not_include_fromMigrate_field.js b/jstests/replsets/apply_ops_inserts_do_not_include_fromMigrate_field.js index ee1a4211cce..60ea0fbaeb0 100644 --- a/jstests/replsets/apply_ops_inserts_do_not_include_fromMigrate_field.js +++ b/jstests/replsets/apply_ops_inserts_do_not_include_fromMigrate_field.js @@ -1,8 +1,13 @@ /** * Tests that insert oplog entries created by applyOps commands do not contain the 'fromMigrate' - * field. Additionally tests that non-atomic applyOps inserts should be returned by changeStreams. + * field. Additionally tests inserts originating from applyOps commands are returned by + * changeStreams. * - * @tags: [uses_change_streams] + * @tags: [ + * uses_change_streams, + * # Change streams emit events for applyOps without lsid and txnNumber as of SERVER-64972. + * multiversion_incompatible, + * ] */ (function() { 'use strict'; @@ -53,6 +58,7 @@ assert.commandWorked(primaryDB.runCommand({ })); // Test atomic applyOps inserts. +// TODO (SERVER-33182): Remove the atomic applyOps testing once atomic applyOps are removed. assert.commandWorked( primaryDB.runCommand({applyOps: [{op: "i", ns: nss(dbName, collName), o: {_id: 4}}]})); assert.commandWorked(primaryDB.runCommand({ @@ -80,20 +86,25 @@ nonAtomicResults.forEach(function(op) { assert(!op.hasOwnProperty("fromMigrate"), nonAtomicResults); }); -// TODO (SERVER-33182): Remove the atomic applyOps testing once atomic applyOps are removed. -// Atomic applyOps inserts are not expected to be picked up by changeStreams. -primaryCST.assertNoChange(primaryChangeStream); -secondaryCST.assertNoChange(secondaryChangeStream); +// Atomic applyOps inserts are expected to be picked up by changeStreams. // We expect the operations from an atomic applyOps command to be nested in an applyOps oplog entry. const atomicResults = oplog.find({"o.applyOps": {$exists: true}}).toArray(); assert.eq(atomicResults.length, 2, atomicResults); for (let i = 0; i < atomicResults.length; i++) { let ops = atomicResults[i].o.applyOps; ops.forEach(function(op) { + const primaryChange = primaryCST.getOneChange(primaryChangeStream); + assert.eq(primaryChange.documentKey._id, expectedCount, primaryChange); + const secondaryChange = secondaryCST.getOneChange(secondaryChangeStream); + assert.eq(secondaryChange.documentKey._id, expectedCount, secondaryChange); assert.eq(op.o._id, expectedCount++, atomicResults); assert(!op.hasOwnProperty("fromMigrate"), atomicResults); }); } + +primaryCST.assertNoChange(primaryChangeStream); +secondaryCST.assertNoChange(secondaryChangeStream); + assert.eq(7, expectedCount); rst.stopSet(); diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 14139e30d9d..51215543df6 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -1666,7 +1666,9 @@ OpTimeBundle logApplyOps(OperationContext* opCtx, oplogEntry->setOpType(repl::OpTypeEnum::kCommand); oplogEntry->setNss({"admin", "$cmd"}); - oplogEntry->setSessionId(opCtx->getLogicalSessionId()); + // Batched writes (that is, WUOWs with 'groupOplogEntries') are not associated with a txnNumber, + // so do not emit an lsid either. + oplogEntry->setSessionId(opCtx->getTxnNumber() ? opCtx->getLogicalSessionId() : boost::none); oplogEntry->setTxnNumber(opCtx->getTxnNumber()); if (txnRetryCounter && !isDefaultTxnRetryCounter(*txnRetryCounter)) { oplogEntry->getOperationSessionInfo().setTxnRetryCounter(*txnRetryCounter); diff --git a/src/mongo/db/pipeline/change_stream_event_transform.cpp b/src/mongo/db/pipeline/change_stream_event_transform.cpp index 644ced702c7..7737821eb41 100644 --- a/src/mongo/db/pipeline/change_stream_event_transform.cpp +++ b/src/mongo/db/pipeline/change_stream_event_transform.cpp @@ -43,6 +43,7 @@ namespace mongo { namespace { constexpr auto checkValueType = &DocumentSourceChangeStream::checkValueType; +constexpr auto checkValueTypeOrMissing = &DocumentSourceChangeStream::checkValueTypeOrMissing; Document copyDocExceptFields(const Document& source, const std::set<StringData>& fieldNames) { MutableDocument doc(source); @@ -385,13 +386,12 @@ Document ChangeStreamDefaultEventTransformation::applyTransformation(const Docum // Add some additional fields only relevant to transactions. if (!txnOpIndex.missing()) { + // The lsid and txnNumber may be missing if this is a batched write. auto lsid = input[DocumentSourceChangeStream::kLsidField]; - checkValueType(lsid, DocumentSourceChangeStream::kLsidField, BSONType::Object); - + checkValueTypeOrMissing(lsid, DocumentSourceChangeStream::kLsidField, BSONType::Object); auto txnNumber = input[DocumentSourceChangeStream::kTxnNumberField]; - checkValueType( + checkValueTypeOrMissing( txnNumber, DocumentSourceChangeStream::kTxnNumberField, BSONType::NumberLong); - doc.addField(DocumentSourceChangeStream::kTxnNumberField, txnNumber); doc.addField(DocumentSourceChangeStream::kLsidField, lsid); } diff --git a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp index 106c9ea9eb6..f7d744d5c9d 100644 --- a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp +++ b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp @@ -236,8 +236,6 @@ std::unique_ptr<MatchExpression> buildTransactionFilter( applyOpsBuilder.append("o.applyOps", BSON("$type" << "array")); - applyOpsBuilder.append("lsid", BSON("$exists" << true)); - applyOpsBuilder.append("txnNumber", BSON("$exists" << true)); applyOpsBuilder.append("o.prepare", BSON("$not" << BSON("$eq" << true))); applyOpsBuilder.append("o.partialTxn", BSON("$not" << BSON("$eq" << true))); { diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 4fc0041cc0e..bb4ee0c9b2c 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -111,14 +111,22 @@ constexpr StringData DocumentSourceChangeStream::kRegexAllDBs; constexpr StringData DocumentSourceChangeStream::kRegexCmdColl; void DocumentSourceChangeStream::checkValueType(const Value v, - const StringData filedName, + const StringData fieldName, BSONType expectedType) { uassert(40532, - str::stream() << "Entry field \"" << filedName << "\" should be " + str::stream() << "Entry field \"" << fieldName << "\" should be " << typeName(expectedType) << ", found: " << typeName(v.getType()), (v.getType() == expectedType)); } +void DocumentSourceChangeStream::checkValueTypeOrMissing(const Value v, + const StringData fieldName, + BSONType expectedType) { + if (!v.missing()) { + checkValueType(v, fieldName, expectedType); + } +} + DocumentSourceChangeStream::ChangeStreamType DocumentSourceChangeStream::getChangeStreamType( const NamespaceString& nss) { diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 60013e64444..ca6ee747c36 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -283,6 +283,11 @@ public: static void checkValueType(Value v, StringData fieldName, BSONType expectedType); /** + * Same as 'checkValueType', except it tolerates the field being missing. + */ + static void checkValueTypeOrMissing(Value v, StringData fieldName, BSONType expectedType); + + /** * Extracts the resume token from the given spec. If a 'startAtOperationTime' is specified, * returns the equivalent high-watermark token. This method should only ever be called on a spec * where one of 'resumeAfter', 'startAfter', or 'startAtOperationTime' is populated. diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 5d11196a08b..1a84eb5f2fd 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -313,10 +313,11 @@ public: */ std::vector<Document> getApplyOpsResults(const Document& applyOpsDoc, const LogicalSessionFromClient& lsid, - BSONObj spec = kDefaultSpec) { + BSONObj spec = kDefaultSpec, + bool hasTxnNumber = true) { BSONObj applyOpsObj = applyOpsDoc.toBson(); - // Create an oplog entry and then glue on an lsid and txnNumber + // Create an oplog entry and then glue on an lsid and optionally a txnNumber auto baseOplogEntry = makeOplogEntry(OpTypeEnum::kCommand, nss.getCommandNS(), applyOpsObj, @@ -325,7 +326,9 @@ public: BSONObj()); BSONObjBuilder builder(baseOplogEntry.getEntry().toBSON()); builder.append("lsid", lsid.toBSON()); - builder.append("txnNumber", 0LL); + if (hasTxnNumber) { + builder.append("txnNumber", 0LL); + } BSONObj oplogEntry = builder.done(); // Create the stages and check that the documents produced matched those in the applyOps. @@ -1425,26 +1428,63 @@ DEATH_TEST_F(ChangeStreamStageTest, getApplyOpsResults(applyOpsDoc, lsid); // Should crash. } -TEST_F(ChangeStreamStageTest, TransformNonTransactionApplyOps) { - BSONObj applyOpsObj = Document{{"applyOps", - Value{std::vector<Document>{Document{ - {"op", "i"_sd}, - {"ns", nss.ns()}, - {"ui", testUuid()}, - {"o", Value{Document{{"_id", 123}, {"x", "hallo"_sd}}}}}}}}} - .toBson(); +TEST_F(ChangeStreamStageTest, TransformNonTxnNumberApplyOps) { + Document applyOpsDoc = + Document{{"applyOps", + Value{std::vector<Document>{ + Document{{"op", "i"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", Value{Document{{"_id", 123}, {"x", "hallo"_sd}}}}}}}}}; - // Don't append lsid or txnNumber + LogicalSessionFromClient lsid = testLsid(); + vector<Document> results = + getApplyOpsResults(applyOpsDoc, lsid, kDefaultSpec, false /* hasTxnNumber */); - auto oplogEntry = makeOplogEntry(OpTypeEnum::kCommand, - nss.getCommandNS(), - applyOpsObj, - testUuid(), - boost::none, // fromMigrate - BSONObj()); + ASSERT_EQ(results.size(), 1u); + const auto nextDoc = results[0]; + ASSERT(nextDoc.getField("txnNumber").missing()); + ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(), + DSChangeStream::kInsertOpType); + ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["_id"].getInt(), 123); + ASSERT_EQ(nextDoc[DSChangeStream::kFullDocumentField]["x"].getString(), "hallo"); + ASSERT_EQ(nextDoc["lsid"].getDocument().toBson().woCompare(lsid.toBSON()), 0); +} - checkTransformation(oplogEntry, boost::none); +TEST_F(ChangeStreamStageTest, TransformNonTxnNumberBatchedDeleteApplyOps) { + + Document applyOpsDoc{ + {"applyOps", + Value{std::vector<Document>{ + Document{{"op", "d"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", Value{Document{{"_id", 10}}}}}, + Document{{"op", "d"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", Value{Document{{"_id", 11}}}}}, + Document{{"op", "d"_sd}, + {"ns", nss.ns()}, + {"ui", testUuid()}, + {"o", Value{Document{{"_id", 12}}}}}, + }}}, + }; + LogicalSessionFromClient lsid = testLsid(); + vector<Document> results = + getApplyOpsResults(applyOpsDoc, lsid, kDefaultSpec, false /* hasTxnNumber */); + + ASSERT_EQ(results.size(), 3u); + + int i = 0; + for (const auto& nextDoc : results) { + ASSERT(nextDoc.getField("txnNumber").missing()); + ASSERT_EQ(nextDoc[DSChangeStream::kOperationTypeField].getString(), + DSChangeStream::kDeleteOpType); + ASSERT_EQ(nextDoc[DSChangeStream::kDocumentKeyField]["_id"].getInt(), 10 + i++); + ASSERT_EQ(nextDoc["lsid"].getDocument().toBson().woCompare(lsid.toBSON()), 0); + } } TEST_F(ChangeStreamStageTest, TransformApplyOpsWithEntriesOnDifferentNs) { diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp index 40603a0b1d7..443769f234c 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp @@ -222,13 +222,16 @@ DocumentSourceChangeStreamUnwindTransaction::TransactionOpIterator::TransactionO const Document& input, const MatchExpression* expression) : _mongoProcessInterface(mongoProcessInterface), _expression(expression) { - Value lsidValue = input["lsid"]; - DocumentSourceChangeStream::checkValueType(lsidValue, "lsid", BSONType::Object); - _lsid = lsidValue.getDocument(); + // The lsid and txnNumber can be missing in case of batched writes. + Value lsidValue = input["lsid"]; + DocumentSourceChangeStream::checkValueTypeOrMissing(lsidValue, "lsid", BSONType::Object); + _lsid = lsidValue.missing() ? boost::none : boost::optional<Document>(lsidValue.getDocument()); Value txnNumberValue = input["txnNumber"]; - DocumentSourceChangeStream::checkValueType(txnNumberValue, "txnNumber", BSONType::NumberLong); - _txnNumber = txnNumberValue.getLong(); + DocumentSourceChangeStream::checkValueTypeOrMissing( + txnNumberValue, "txnNumber", BSONType::NumberLong); + _txnNumber = txnNumberValue.missing() ? boost::none + : boost::optional<TxnNumber>(txnNumberValue.getLong()); // We want to parse the OpTime out of this document using the BSON OpTime parser. Instead of // converting the entire Document back to BSON, we convert only the fields we need. @@ -380,9 +383,9 @@ DocumentSourceChangeStreamUnwindTransaction::TransactionOpIterator::_addRequired newDoc.addField(DocumentSourceChangeStream::kApplyOpsTsField, Value(applyOpsTs())); newDoc.addField(repl::OplogEntry::kTimestampFieldName, Value(_clusterTime)); - newDoc.addField(repl::OplogEntry::kSessionIdFieldName, Value(_lsid)); + newDoc.addField(repl::OplogEntry::kSessionIdFieldName, _lsid ? Value(*_lsid) : Value()); newDoc.addField(repl::OplogEntry::kTxnNumberFieldName, - Value(static_cast<long long>(_txnNumber))); + _txnNumber ? Value(static_cast<long long>(*_txnNumber)) : Value()); newDoc.addField(repl::OplogEntry::kWallClockTimeFieldName, Value(_wallTime)); return newDoc.freeze(); diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h index ce7278b3b3e..a79be42c778 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h +++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h @@ -146,11 +146,11 @@ private: return _clusterTime; } - Document lsid() const { + boost::optional<Document> lsid() const { return _lsid; } - TxnNumber txnNumber() const { + boost::optional<TxnNumber> txnNumber() const { return _txnNumber; } @@ -214,9 +214,10 @@ private: Timestamp _clusterTime; Date_t _wallTime; - // Fields that were taken from the '_applyOps' oplog entry. - Document _lsid; - TxnNumber _txnNumber; + // Fields that were taken from the '_applyOps' oplog entry. They are optional because + // they may not be present on applyOps generated by the BatchedWriteContext. + boost::optional<Document> _lsid; + boost::optional<TxnNumber> _txnNumber; // Used for traversing the oplog with TransactionHistoryInterface. std::shared_ptr<MongoProcessInterface> _mongoProcessInterface; |