diff options
author | Denis Grebennicov <denis.grebennicov@mongodb.com> | 2021-10-04 16:54:18 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-04 17:58:49 +0000 |
commit | c05c56ed7f798ec5725b3883185c4ddebab1b19f (patch) | |
tree | db0b4f825b3d673596240e8bb74122c630101cac | |
parent | 0b1b02acbc21bae6d8aa6468f707f80e70b0ddd6 (diff) | |
download | mongo-c05c56ed7f798ec5725b3883185c4ddebab1b19f.tar.gz |
SERVER-58690 Implement loading of post-images in a change stream
12 files changed, 400 insertions, 159 deletions
diff --git a/jstests/change_streams/lookup_pit_pre_and_post_image.js b/jstests/change_streams/lookup_pit_pre_and_post_image.js index 3e218854a1b..49fa4bafa49 100644 --- a/jstests/change_streams/lookup_pit_pre_and_post_image.js +++ b/jstests/change_streams/lookup_pit_pre_and_post_image.js @@ -1,5 +1,5 @@ -// Tests that pre-images are stored in the pre-images collection on updates in collections with -// 'changeStreamPreAndPostImages' set to true. +// Tests that the point-in-time pre- and post-images are loaded correctly in $changeStream running +// with different arguments for collections with 'changeStreamPreAndPostImages' set to true. // @tags: [ // assumes_against_mongod_not_mongos, // change_stream_does_not_expect_txns, @@ -12,11 +12,11 @@ load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateC load("jstests/libs/change_stream_util.js"); // For isChangeStreamPreAndPostImagesEnabled. const testDB = db.getSiblingDB(jsTestName()); -const localDB = db.getSiblingDB("local"); const collName = "test"; -const coll = assertDropAndRecreateCollection(testDB, collName); if (!isChangeStreamPreAndPostImagesEnabled(db)) { + const coll = assertDropAndRecreateCollection(testDB, collName); + // If feature flag is off, creating changeStream with new fullDocument arguments should throw. assert.throwsWithCode(() => coll.watch([], {fullDocument: 'whenAvailable'}), ErrorCodes.BadValue); @@ -27,8 +27,6 @@ if (!isChangeStreamPreAndPostImagesEnabled(db)) { return; } -const collUUID = testDB.getCollectionInfos({name: collName})[0].info.uuid; -const preImagesColl = assertDropAndRecreateCollection(localDB, "system.preimages"); const originalDoc = { _id: 1, x: 1 @@ -41,96 +39,202 @@ const updatedDoc2 = { _id: 1, x: 5 }; +const replaceDoc = { + _id: 1, + z: 1 +}; -function assertValidPreImage(preImage) { - const MAX_TIME_DELTA_SECONDS = 300; // 5 minutes delay. - assert.eq(preImage._id.nsUUID, collUUID); - assert.lte(Math.abs(new Date().getTime() / 1000 - preImage._id.ts.getTime()), - MAX_TIME_DELTA_SECONDS); - assert.lte(Math.abs(new Date().getTime() / 1000 - preImage.operationTime.getTime() / 1000), - MAX_TIME_DELTA_SECONDS); - assert.eq(preImage._id.applyOpsIndex, 0); -} - -// Open the change streams with new fullDocument parameter set. -assert.doesNotThrow(() => coll.watch([], {fullDocument: 'whenAvailable'})); -assert.doesNotThrow(() => coll.watch([], {fullDocument: 'required'})); - -// Open the change streams with fullDocumentBeforeChange parameter set. -const changeStreamCursorWhenAvailable = coll.watch([], {fullDocumentBeforeChange: 'whenAvailable'}); -let changeStreamCursorRequired = coll.watch([], {fullDocumentBeforeChange: 'required'}); - -// Perform an insert. -assert.commandWorked(coll.insert(originalDoc)); -assert.soon(() => changeStreamCursorWhenAvailable.hasNext()); -assert.soon(() => changeStreamCursorRequired.hasNext()); -assert.eq(changeStreamCursorWhenAvailable.next().operationType, 'insert'); -assert.eq(changeStreamCursorRequired.next().operationType, 'insert'); - -// Pre-images collection should remain empty, as pre-images for insert operations can be found in -// the oplog. -assert.eq(preImagesColl.find().count(), 0); - -// Perform an update with 'damages'. -assert.commandWorked(coll.update(originalDoc, {$inc: {x: 2}})); - -// Since changeStreamPreAndPostImages is not enabled, pre-images collection must be empty. -assert.eq(preImagesColl.find().count(), 0); - -// Change stream with { fullDocumentBeforeChange: 'whenAvailable' } should return null as pre-image. -assert.soon(() => changeStreamCursorWhenAvailable.hasNext()); -const doc = changeStreamCursorWhenAvailable.next(); -assert(doc.hasOwnProperty("fullDocumentBeforeChange")); -assert.isnull(doc.fullDocumentBeforeChange); - -// Change stream with { fullDocumentBeforeChange: 'required' } should fail as pre-image is not -// available. -try { - assert.soon(() => changeStreamCursorRequired.hasNext()); - assert(false, `Unexpected result from cursor: ${tojson(changeStreamCursorRequired.next())}`); -} catch (error) { - assert.eq(error.code, - ErrorCodes.ChangeStreamHistoryLost, - `Caught unexpected error: ${tojson(error)}`); +// Tests the change stream point-in-time pre-/post-images behaviour with different change stream +// options. +function preAndPostImageTest({ + changeStreamOptions = {}, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled = {}, + expectedOnUpdateImages = {}, + expectedOnReplaceImages = {} +} = {}) { + // Confirms that the change event document does not contain any internal-only fields. + function assertChangeStreamInternalFieldsNotPresent(changeStreamDoc) { + assert(!changeStreamDoc.hasOwnProperty("preImageId"), changeStreamDoc); + assert(!changeStreamDoc.hasOwnProperty("updateModification"), changeStreamDoc); + + if (!changeStreamOptions.hasOwnProperty("fullDocumentBeforeChange")) { + assert(!changeStreamDoc.hasOwnProperty("fullDocumentBeforeChange"), changeStreamDoc); + } + + if (!changeStreamOptions.hasOwnProperty("fullDocument") && + changeStreamDoc.operationType == "update") { + assert(!changeStreamDoc.hasOwnProperty("fullDocument"), changeStreamDoc); + } + } + + const coll = assertDropAndRecreateCollection(testDB, collName); + + // Open a change stream with the specified test options. + let changeStreamCursor = coll.watch([], Object.assign({}, changeStreamOptions)); + let changeStreamDoc = null; + + // Perform an insert. + assert.commandWorked(coll.insert(originalDoc)); + assert.soon(() => changeStreamCursor.hasNext()); + changeStreamDoc = changeStreamCursor.next(); + assert.eq(changeStreamDoc.operationType, 'insert'); + assertChangeStreamInternalFieldsNotPresent(changeStreamDoc); + + // Perform an update modification. + assert.commandWorked(coll.update(originalDoc, {$inc: {x: 2}})); + + // Change stream should throw an exception while trying to fetch the next document if + // pre-/post-image is required. + const shouldThrow = changeStreamOptions.fullDocument === 'required' || + changeStreamOptions.fullDocumentBeforeChange === 'required'; + if (shouldThrow) { + try { + assert.soon(() => changeStreamCursor.hasNext()); + assert(false, `Unexpected result from cursor: ${tojson(changeStreamCursor.next())}`); + } catch (error) { + assert.eq(error.code, + ErrorCodes.NoMatchingDocument, + `Caught unexpected error: ${tojson(error)}`); + } + + // Reopen the failed change stream. + changeStreamCursor = coll.watch([], Object.assign({}, changeStreamOptions)); + } else { + assert.soon(() => changeStreamCursor.hasNext()); + changeStreamDoc = changeStreamCursor.next(); + assert.eq(changeStreamDoc.fullDocumentBeforeChange, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled.preImage); + assert.eq(changeStreamDoc.fullDocument, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled.postImage); + assertChangeStreamInternalFieldsNotPresent(changeStreamDoc); + } + + // Enable changeStreamPreAndPostImages for pre-images recording. + assert.commandWorked( + testDB.runCommand({collMod: collName, changeStreamPreAndPostImages: true})); + + // Perform an update modification. + assert.commandWorked(coll.update(updatedDoc, {$inc: {x: 2}})); + + // The next change stream event should contain the expected pre- and post-images. + assert.soon(() => changeStreamCursor.hasNext()); + changeStreamDoc = changeStreamCursor.next(); + assert.eq(changeStreamDoc.fullDocumentBeforeChange, expectedOnUpdateImages.preImage); + assert.eq(changeStreamDoc.fullDocument, expectedOnUpdateImages.postImage); + assertChangeStreamInternalFieldsNotPresent(changeStreamDoc); + + // Perform a full-document replacement. + assert.commandWorked(coll.update(updatedDoc2, replaceDoc)); + + // The next change stream event should contain the expected pre- and post-images. + assert.soon(() => changeStreamCursor.hasNext()); + changeStreamDoc = changeStreamCursor.next(); + assert.eq(changeStreamDoc.fullDocumentBeforeChange, expectedOnReplaceImages.preImage); + assert.eq(changeStreamDoc.fullDocument, expectedOnReplaceImages.postImage); + assert.eq(changeStreamDoc.operationType, "replace"); + assertChangeStreamInternalFieldsNotPresent(changeStreamDoc); } -// Enable changeStreamPreAndPostImages for pre-images recording. -assert.commandWorked(testDB.runCommand({collMod: collName, changeStreamPreAndPostImages: true})); - -// Reopen the failed change stream. -changeStreamCursorRequired = coll.watch([], {fullDocumentBeforeChange: 'required'}); - -// Perform an update with 'damages'. -assert.commandWorked(coll.update(updatedDoc, {$inc: {x: 2}})); - -// Pre-images collection should contain one document with the 'updatedDoc' pre-image. -assert.eq(preImagesColl.find().count({"preImage": updatedDoc}), 1); -let preImageDoc = preImagesColl.find({"preImage": updatedDoc}).next(); -assertValidPreImage(preImageDoc); - -// Change stream should contain the pre-image. -assert.soon(() => changeStreamCursorWhenAvailable.hasNext()); -assert.soon(() => changeStreamCursorRequired.hasNext()); -assert.eq(changeStreamCursorWhenAvailable.next().fullDocumentBeforeChange, updatedDoc); -assert.eq(changeStreamCursorRequired.next().fullDocumentBeforeChange, updatedDoc); - -// Perform an update (replace). -assert.commandWorked(coll.update(updatedDoc2, {z: 1})); - -// Pre-Images collection should contain a new document with the 'updatedDoc2' pre-image. -assert.eq(preImagesColl.find({"preImage": updatedDoc2}).count(), 1); -preImageDoc = preImagesColl.find({"preImage": updatedDoc2}).next(); -assertValidPreImage(preImageDoc); - -// Change stream should contain the pre-image. -assert.soon(() => changeStreamCursorWhenAvailable.hasNext()); -assert.soon(() => changeStreamCursorRequired.hasNext()); - -let changeStreamDoc = changeStreamCursorWhenAvailable.next(); -assert.eq(changeStreamDoc.fullDocumentBeforeChange, updatedDoc2); -assert.eq(changeStreamDoc.operationType, "replace"); - -changeStreamDoc = changeStreamCursorRequired.next(); -assert.eq(changeStreamDoc.fullDocumentBeforeChange, updatedDoc2); -assert.eq(changeStreamDoc.operationType, "replace"); +preAndPostImageTest({ + expectedOnReplaceImages: { + postImage: replaceDoc, + } +}); +preAndPostImageTest({ + changeStreamOptions: {fullDocumentBeforeChange: 'whenAvailable'}, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: { + preImage: null, + }, + expectedOnUpdateImages: { + preImage: updatedDoc, + }, + expectedOnReplaceImages: { + preImage: updatedDoc2, + postImage: replaceDoc, + } +}); +preAndPostImageTest({ + changeStreamOptions: {fullDocument: 'whenAvailable'}, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: { + postImage: null, + }, + expectedOnUpdateImages: { + postImage: updatedDoc2, + }, + expectedOnReplaceImages: { + postImage: replaceDoc, + } +}); +preAndPostImageTest({ + changeStreamOptions: {fullDocumentBeforeChange: 'whenAvailable', fullDocument: 'whenAvailable'}, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: { + preImage: null, + postImage: null, + }, + expectedOnUpdateImages: { + preImage: updatedDoc, + postImage: updatedDoc2, + }, + expectedOnReplaceImages: { + preImage: updatedDoc2, + postImage: replaceDoc, + } +}); +preAndPostImageTest({ + changeStreamOptions: {fullDocumentBeforeChange: 'required'}, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {} /* will throw on hasNext() */, + expectedOnUpdateImages: { + preImage: updatedDoc, + }, + expectedOnReplaceImages: { + preImage: updatedDoc2, + postImage: replaceDoc, + } +}); +preAndPostImageTest({ + changeStreamOptions: {fullDocument: 'required'}, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {} /* will throw on hasNext() */, + expectedOnUpdateImages: { + postImage: updatedDoc2, + }, + expectedOnReplaceImages: { + postImage: replaceDoc, + } +}); +preAndPostImageTest({ + changeStreamOptions: {fullDocumentBeforeChange: 'required', fullDocument: 'required'}, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {} /* will throw on hasNext() */, + expectedOnUpdateImages: { + preImage: updatedDoc, + postImage: updatedDoc2, + }, + expectedOnReplaceImages: { + preImage: updatedDoc2, + postImage: replaceDoc, + } +}); +preAndPostImageTest({ + changeStreamOptions: {fullDocumentBeforeChange: 'whenAvailable', fullDocument: 'required'}, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {} /* will throw on hasNext() */, + expectedOnUpdateImages: { + preImage: updatedDoc, + postImage: updatedDoc2, + }, + expectedOnReplaceImages: { + preImage: updatedDoc2, + postImage: replaceDoc, + } +}); +preAndPostImageTest({ + changeStreamOptions: {fullDocumentBeforeChange: 'required', fullDocument: 'whenAvailable'}, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {} /* will throw on hasNext() */, + expectedOnUpdateImages: { + preImage: updatedDoc, + postImage: updatedDoc2, + }, + expectedOnReplaceImages: { + preImage: updatedDoc2, + postImage: replaceDoc, + } +}); }()); diff --git a/jstests/change_streams/lookup_pre_image.js b/jstests/change_streams/lookup_pre_image.js index 72cac8cd0a2..120c6de5671 100644 --- a/jstests/change_streams/lookup_pre_image.js +++ b/jstests/change_streams/lookup_pre_image.js @@ -122,7 +122,7 @@ latestChange.fullDocumentBeforeChange = null; assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor)); // ... but the "required" cursor throws an exception. assert.throwsWithCode(() => cst.getOneChange(csPreImageRequiredCursor), - [ErrorCodes.ChangeStreamHistoryLost, 51770]); + [ErrorCodes.NoMatchingDocument, 51770]); // Test pre-image lookup for an op-style update operation. assert.commandWorked(coll.update({_id: "y"}, {$set: {foo: "baz"}})); diff --git a/jstests/change_streams/write_pit_preimage.js b/jstests/change_streams/write_pit_preimage.js index 5cb7090e300..f81cb18c1e0 100644 --- a/jstests/change_streams/write_pit_preimage.js +++ b/jstests/change_streams/write_pit_preimage.js @@ -16,11 +16,8 @@ load("jstests/libs/change_stream_util.js"); // For isChangeStreamPreAndPo const testDB = db.getSiblingDB(jsTestName()); const localDB = db.getSiblingDB("local"); const collName = "test"; -const coll = - assertDropAndRecreateCollection(testDB, collName, {changeStreamPreAndPostImages: true}); -const collInfos = testDB.getCollectionInfos({name: collName}); -assert.eq(collInfos.length, 1); -const collUUID = collInfos[0].info.uuid; +const coll = assertDropAndRecreateCollection(testDB, collName); +const collUUID = testDB.getCollectionInfos({name: collName})[0].info.uuid; const preImagesColl = assertDropAndRecreateCollection(localDB, "system.preimages"); const originalDoc = { _id: 1, @@ -31,37 +28,50 @@ const updatedDoc = { x: 3 }; -function assertValidPreImage(preImage) { - const MAX_TIME_DELTA_SECONDS = 300; // 5 minutes delay. +// Validates the contents of the pre-image collection entry. +function assertValidChangeStreamPreImageDocument(preImage) { + const oplogEntryCursor = localDB.oplog.rs.find({ts: preImage._id.ts}); + assert(oplogEntryCursor.hasNext()); + const oplogEntry = oplogEntryCursor.next(); + assert.eq(oplogEntry.op, "u", oplogEntry); + assert.eq(preImage._id.nsUUID, oplogEntry.ui); assert.eq(preImage._id.nsUUID, collUUID); - assert.lte(Math.abs(new Date().getTime() / 1000 - preImage._id.ts.getTime()), - MAX_TIME_DELTA_SECONDS); - assert.lte(Math.abs(new Date().getTime() / 1000 - preImage.operationTime.getTime() / 1000), - MAX_TIME_DELTA_SECONDS); assert.eq(preImage._id.applyOpsIndex, 0); + assert.eq(preImage.operationTime, oplogEntry.wall, oplogEntry); + assert.eq(preImage.preImage._id, oplogEntry.o2._id, oplogEntry); } +// Perform an insert, an update modification and a delete. +assert.commandWorked(coll.insert(originalDoc)); +assert.commandWorked(coll.update(originalDoc, {$inc: {x: 2}})); +assert.commandWorked(coll.remove(updatedDoc)); + +// Since changeStreamPreAndPostImages is not enabled, pre-images collection must be empty. +assert.eq(preImagesColl.count(), 0); + +// Enable changeStreamPreAndPostImages for pre-images recording. +assert.commandWorked(testDB.runCommand({collMod: collName, changeStreamPreAndPostImages: true})); + // Perform an insert. assert.commandWorked(coll.insert(originalDoc)); assert.eq(coll.find().count(), 1); -// Pre-images collection should remain empty, as pre-images for insert operations can be found in -// the oplog. +// Pre-images collection should remain empty, as insert operations do not have pre-images. assert.eq(preImagesColl.find().count(), 0); -// Perform an update with 'damages'. +// Perform an update modification. assert.commandWorked(coll.update(originalDoc, {$inc: {x: 2}})); // Pre-images collection should contain one document with the 'originalDoc' pre-image. -let preimages = preImagesColl.find({"preImage": originalDoc}).toArray(); -assert.eq(preimages.length, 1); -assertValidPreImage(preimages[0]); +let preImages = preImagesColl.find({"preImage": originalDoc}).toArray(); +assert.eq(preImages.length, 1); +assertValidChangeStreamPreImageDocument(preImages[0]); -// Perform an update (replace). +// Perform a full-document replacement. assert.commandWorked(coll.update(updatedDoc, {z: 1})); // Pre-images collection should contain a new document with the 'updatedDoc' pre-image. -preimages = preImagesColl.find({"preImage": updatedDoc}).toArray(); -assert.eq(preimages.length, 1); -assertValidPreImage(preimages[0]); +preImages = preImagesColl.find({"preImage": updatedDoc}).toArray(); +assert.eq(preImages.length, 1); +assertValidChangeStreamPreImageDocument(preImages[0]); }()); diff --git a/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js b/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js index 010e69365cb..847642da319 100644 --- a/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js +++ b/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js @@ -60,7 +60,7 @@ assert.throwsWithCode(function() { return assert.soon(() => wholeDBStream.hasNext() && wholeDBStream.next().documentKey._id === "last_change_sentinel"); -}, [ErrorCodes.ChangeStreamHistoryLost, 51770]); +}, [ErrorCodes.NoMatchingDocument, 51770]); // Confirm that attempting to open a whole-cluster stream on with mode "required" fails. assert.throwsWithCode(function() { @@ -72,7 +72,7 @@ assert.throwsWithCode(function() { return assert.soon(() => wholeClusterStream.hasNext() && wholeClusterStream.next().documentKey._id == "last_change_sentinel"); -}, [ErrorCodes.ChangeStreamHistoryLost, 51770]); +}, [ErrorCodes.NoMatchingDocument, 51770]); // However, if we open a whole-db or whole-cluster stream that filters for only the namespace with // pre-images, then the cursor can proceed. This is because the $match gets moved ahead of the diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 5e47640079a..4435a99896c 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -380,8 +380,10 @@ env.Library( 'document_source_change_stream_unwind_transaction.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/ops/write_ops_parsers', '$BUILD_DIR/mongo/db/pipeline/pipeline', '$BUILD_DIR/mongo/db/pipeline/sharded_agg_helpers', + '$BUILD_DIR/mongo/db/update/update_driver', '$BUILD_DIR/mongo/s/query/router_exec_stage', 'change_stream_preimage', ], diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index b8413a761e2..b58c31108d6 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -157,6 +157,7 @@ public: static constexpr StringData kLsidField = "lsid"_sd; static constexpr StringData kTxnOpIndexField = "txnOpIndex"_sd; static constexpr StringData kApplyOpsIndexField = "applyOpsIndex"_sd; + static constexpr StringData kRawOplogUpdateSpecField = "rawOplogUpdateSpec"_sd; // The target namespace of a rename operation. static constexpr StringData kRenameTargetNssField = "to"_sd; diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp index c20ba0d03f9..fa0cf402095 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp @@ -32,6 +32,10 @@ #include "mongo/db/pipeline/document_source_change_stream_add_post_image.h" #include "mongo/bson/simple_bsonelement_comparator.h" +#include "mongo/db/ops/write_ops_parsers.h" +#include "mongo/db/pipeline/change_stream_helpers_legacy.h" +#include "mongo/db/pipeline/document_source_change_stream_add_pre_image.h" +#include "mongo/db/update/update_driver.h" namespace mongo { @@ -80,8 +84,33 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamAddPostImage::doGetNext( return input; } + // TODO SERVER-58584: remove the feature flag. + if (_fullDocumentMode != FullDocumentModeEnum::kUpdateLookup) { + tassert(5869000, + str::stream() << "Feature flag must be enabled for fullDocument: " + << FullDocumentMode_serializer(_fullDocumentMode), + feature_flags::gFeatureFlagChangeStreamPreAndPostImages.isEnabled( + serverGlobalParams.featureCompatibility)); + } + + // Create a mutable output document from the input document. MutableDocument output(input.releaseDocument()); - output[kFullDocumentFieldName] = lookupPostImage(output.peek()); + const auto postImageDoc = (_fullDocumentMode == FullDocumentModeEnum::kUpdateLookup + ? lookupLatestPostImage(output.peek()) + : generatePostImage(output.peek())); + uassert( + ErrorCodes::NoMatchingDocument, + str::stream() << "Change stream was configured to require a post-image for all update, " + "delete and replace events, but the post-image was not found for event: " + << output.peek().toString(), + postImageDoc || _fullDocumentMode != FullDocumentModeEnum::kRequired); + + // Even if no post-image was found, we have to populate the 'fullDocument' field. + output[kFullDocumentFieldName] = (postImageDoc ? Value(*postImageDoc) : Value(BSONNULL)); + + // Do not propagate the update modification and pre-image id information further. + output.remove(kRawOplogUpdateSpecFieldName); + output.remove(kPreImageIdFieldName); return output.freeze(); } @@ -106,7 +135,64 @@ NamespaceString DocumentSourceChangeStreamAddPostImage::assertValidNamespace( return nss; } -Value DocumentSourceChangeStreamAddPostImage::lookupPostImage(const Document& updateOp) const { +boost::optional<Document> DocumentSourceChangeStreamAddPostImage::generatePostImage( + const Document& updateOp) const { + // If the 'fullDocumentBeforeChange' is present and null, then we already tried and failed to + // look up a pre-image. We can't compute the post-image without it, so return early. + if (updateOp[kFullDocumentBeforeChangeFieldName].getType() == BSONType::jstNULL) { + return boost::none; + } + + // Otherwise, obtain the pre-image from the information in the input document. + auto preImage = [&]() -> boost::optional<Document> { + // Check whether we have already looked up the pre-image document. + if (!updateOp[kFullDocumentBeforeChangeFieldName].missing()) { + return updateOp[kFullDocumentBeforeChangeFieldName].getDocument(); + } + + // Otherwise, we need to look it up ourselves. Extract the preImageId field. + auto preImageId = updateOp[kPreImageIdFieldName]; + tassert(5869001, + "Missing both 'fullDocumentBeforeChange' and 'preImageId' fields", + !preImageId.missing()); + + // Use DSCSAddPreImage::lookupPreImage to retrieve the actual pre-image. + return DocumentSourceChangeStreamAddPreImage::lookupPreImage(pExpCtx, + preImageId.getDocument()); + }(); + + // Return boost::none if pre-image is missing. + if (!preImage) { + return boost::none; + } + + // Raw oplog update spec field must be provided for the update commands. + tassert(5869002, + "Raw oplog update spec was missing or invalid in change stream", + updateOp[kRawOplogUpdateSpecFieldName].isObject()); + + // Setup the UpdateDriver for performing the post-image computation. + UpdateDriver updateDriver(pExpCtx); + const auto rawOplogUpdateSpec = updateOp[kRawOplogUpdateSpecFieldName].getDocument().toBson(); + const auto updateMod = write_ops::UpdateModification::parseFromOplogEntry( + rawOplogUpdateSpec, {false /* mustCheckExistenceForInsertOperations */}); + // UpdateDriver only expects to apply a diff in the context of oplog application. + updateDriver.setFromOplogApplication(true); + updateDriver.parse(updateMod, {}); + + // Compute post-image. + mutablebson::Document postImage(preImage->toBson()); + uassertStatusOK(updateDriver.update(pExpCtx->opCtx, + StringData(), + &postImage, + false /* validateForStorage */, + FieldRefSet(), + false /* isInsert */)); + return Document(postImage.getObject()); +} + +boost::optional<Document> DocumentSourceChangeStreamAddPostImage::lookupLatestPostImage( + const Document& updateOp) const { // Make sure we have a well-formed input. auto nss = assertValidNamespace(updateOp); @@ -115,23 +201,20 @@ Value DocumentSourceChangeStreamAddPostImage::lookupPostImage(const Document& up BSONType::Object) .getDocument(); - // Extract the UUID from resume token and do change stream lookups by UUID. - auto resumeToken = - ResumeToken::parse(updateOp[DocumentSourceChangeStream::kIdField].getDocument()); + // Extract the resume token data from the input event. + auto resumeTokenData = + ResumeToken::parse(updateOp[DocumentSourceChangeStream::kIdField].getDocument()).getData(); auto readConcern = BSON("level" << "majority" - << "afterClusterTime" << resumeToken.getData().clusterTime); + << "afterClusterTime" << resumeTokenData.clusterTime); // Update lookup queries sent from mongoS to shards are allowed to use speculative majority - // reads. - invariant(resumeToken.getData().uuid); - auto lookedUpDoc = pExpCtx->mongoProcessInterface->lookupSingleDocument( - pExpCtx, nss, *resumeToken.getData().uuid, documentKey, std::move(readConcern)); - - // Check whether the lookup returned any documents. Even if the lookup itself succeeded, it may - // not have returned any results if the document was deleted in the time since the update op. - return (lookedUpDoc ? Value(*lookedUpDoc) : Value(BSONNULL)); + // reads. Even if the lookup itself succeeded, it may not have returned any results if the + // document was deleted in the time since the update op. + invariant(resumeTokenData.uuid); + return pExpCtx->mongoProcessInterface->lookupSingleDocument( + pExpCtx, nss, *resumeTokenData.uuid, documentKey, std::move(readConcern)); } Value DocumentSourceChangeStreamAddPostImage::serializeLatest( diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h index 980672ca95f..2732639eb9d 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h +++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h @@ -45,6 +45,11 @@ public: static constexpr StringData kStageName = "$_internalChangeStreamAddPostImage"_sd; static constexpr StringData kFullDocumentFieldName = DocumentSourceChangeStream::kFullDocumentField; + static constexpr StringData kRawOplogUpdateSpecFieldName = + DocumentSourceChangeStream::kRawOplogUpdateSpecField; + static constexpr StringData kPreImageIdFieldName = DocumentSourceChangeStream::kPreImageIdField; + static constexpr StringData kFullDocumentBeforeChangeFieldName = + DocumentSourceChangeStream::kFullDocumentBeforeChangeField; /** * Creates a DocumentSourceChangeStreamAddPostImage stage. @@ -59,10 +64,14 @@ public: BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); /** - * Only modifies a single path: "fullDocument". + * Only modifies: "fullDocument", "updateModification", "preImageId". */ GetModPathsReturn getModifiedPaths() const final { - return {GetModPathsReturn::Type::kFiniteSet, {kFullDocumentFieldName.toString()}, {}}; + return {GetModPathsReturn::Type::kFiniteSet, + {kFullDocumentFieldName.toString(), + kRawOplogUpdateSpecFieldName.toString(), + kPreImageIdFieldName.toString()}, + {}}; } StageConstraints constraints(Pipeline::SplitState pipeState) const final { @@ -99,6 +108,15 @@ public: deps->fields.insert(DocumentSourceChangeStream::kDocumentKeyField.toString()); deps->fields.insert(DocumentSourceChangeStream::kOperationTypeField.toString()); deps->fields.insert(DocumentSourceChangeStream::kIdField.toString()); + + // Fields needed for post-image computation. + if (_fullDocumentMode != FullDocumentModeEnum::kUpdateLookup) { + deps->fields.insert( + DocumentSourceChangeStream::kFullDocumentBeforeChangeField.toString()); + deps->fields.insert(DocumentSourceChangeStream::kRawOplogUpdateSpecField.toString()); + deps->fields.insert(DocumentSourceChangeStream::kPreImageIdField.toString()); + } + // This stage does not restrict the output fields to a finite set, and has no impact on // whether metadata is available or needed. return DepsTracker::State::SEE_NEXT; @@ -126,11 +144,12 @@ private: */ GetNextResult doGetNext() final; - /** - * Uses the "documentKey" field from 'updateOp' to look up the current version of the document. - * Returns Value(BSONNULL) if the document couldn't be found. - */ - Value lookupPostImage(const Document& updateOp) const; + // Computes a post-image by taking a pre-image and applying an update modification that is + // stored in the oplog entry. Returns boost::none if no pre-image information is available. + boost::optional<Document> generatePostImage(const Document& updateOp) const; + + // Retrieves the current version of the document for the update event. + boost::optional<Document> lookupLatestPostImage(const Document& updateOp) const; /** * Throws a AssertionException if the namespace found in 'inputDoc' doesn't match the one on the diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp index c56dbda1b34..40ac5ef1dab 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp @@ -109,7 +109,7 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamAddPreImage::doGetNext() // Obtain the pre-image document, if available, given the specified preImageId. auto preImageDoc = lookupPreImage(pExpCtx, preImageId.getDocument()); uassert( - ErrorCodes::ChangeStreamHistoryLost, + ErrorCodes::NoMatchingDocument, str::stream() << "Change stream was configured to require a pre-image for all update, " "delete and replace events, but the pre-image was not found for event: " << input.getDocument().toString(), diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 2251e15e28e..d1e76617542 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -2993,13 +2993,13 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) { Value(BSONNULL)); checkTransformation(deleteEntry, expectedDeleteWithNullPreImage.freeze(), {}, spec); - // When run with {fullDocumentBeforeChange: "required"} and a 'preImageOpTime' is present in the - // event's oplog entry but we cannot find the pre-image, we throw ChangeStreamHistoryLost. + // When run with {fullDocumentBeforeChange: "required"} but we cannot find the pre-image, we + // throw NoMatchingDocument. spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" << "required")); ASSERT_THROWS_CODE(checkTransformation(deleteEntry, boost::none, {}, spec), AssertionException, - ErrorCodes::ChangeStreamHistoryLost); + ErrorCodes::NoMatchingDocument); } TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) { @@ -3093,13 +3093,13 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) { Value(BSONNULL)); checkTransformation(updateEntry, expectedUpdateWithNullPreImage.freeze(), {}, spec); - // When run with {fullDocumentBeforeChange: "required"} and a 'preImageOpTime' is present in the - // event's oplog entry but we cannot find the pre-image, we throw ChangeStreamHistoryLost. + // When run with {fullDocumentBeforeChange: "required"} but we cannot find the pre-image, we + // throw NoMatchingDocument. spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" << "required")); ASSERT_THROWS_CODE(checkTransformation(updateEntry, boost::none, {}, spec), AssertionException, - ErrorCodes::ChangeStreamHistoryLost); + ErrorCodes::NoMatchingDocument); } TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) { @@ -3187,13 +3187,13 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) { Value(BSONNULL)); checkTransformation(replaceEntry, expectedReplaceWithNullPreImage.freeze(), {}, spec); - // When run with {fullDocumentBeforeChange: "required"} and a 'preImageOpTime' is present in the - // event's oplog entry but we cannot find the pre-image, we throw ChangeStreamHistoryLost. + // When run with {fullDocumentBeforeChange: "required"} but we cannot find the pre-image, we + // throw NoMatchingDocument. spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange" << "required")); ASSERT_THROWS_CODE(checkTransformation(replaceEntry, boost::none, {}, spec), AssertionException, - ErrorCodes::ChangeStreamHistoryLost); + ErrorCodes::NoMatchingDocument); } TEST_F(ChangeStreamStageDBTest, MatchFiltersOperationsOnSystemCollections) { diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 45214bd6cfe..e03c237c56e 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -94,9 +94,16 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform( _changeStreamSpec(std::move(spec)), _isIndependentOfAnyCollection(expCtx->ns.isCollectionlessAggregateNS()) { - // If the change stream spec requested a pre-image, make sure that we supply one. - _includePreImageId = - (_changeStreamSpec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff); + // Determine whether the user requested a point-in-time pre-image, which will affect this + // stage's output. + _preImageRequested = + _changeStreamSpec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff; + + // Determine whether the user requested a point-in-time post-image, which will affect this + // stage's output. + _postImageRequested = + _changeStreamSpec.getFullDocument() == FullDocumentModeEnum::kWhenAvailable || + _changeStreamSpec.getFullDocument() == FullDocumentModeEnum::kRequired; // Extract the resume token or high-water-mark from the spec. auto tokenData = DocumentSourceChangeStream::resolveResumeTokenFromSpec(_changeStreamSpec); @@ -280,6 +287,12 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document operationType = DocumentSourceChangeStream::kReplaceOpType; fullDocument = input[repl::OplogEntry::kObjectFieldName]; } + + // Add update modification for post-image computation. + if (_postImageRequested && operationType == DocumentSourceChangeStream::kUpdateOpType) { + doc.addField(DocumentSourceChangeStream::kRawOplogUpdateSpecField, + input[repl::OplogEntry::kObjectFieldName]); + } documentKey = input[repl::OplogEntry::kObject2FieldName]; break; } @@ -395,9 +408,15 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document // Add the post-image, pre-image id, namespace, documentKey and other fields as appropriate. doc.addField(DocumentSourceChangeStream::kFullDocumentField, std::move(fullDocument)); - // Include pre-image id only for update, replace and delete operations. - static const std::set<StringData> preImageOps = {"update", "replace", "delete"}; - if (_includePreImageId && preImageOps.count(operationType)) { + // Determine whether the preImageId should be included, for eligible operations. Note that we + // will include preImageId even if the user requested a post-image but no pre-image, because the + // pre-image is required to compute the post-image. + static const std::set<StringData> preImageOps = {DocumentSourceChangeStream::kUpdateOpType, + DocumentSourceChangeStream::kReplaceOpType, + DocumentSourceChangeStream::kDeleteOpType}; + static const std::set<StringData> postImageOps = {DocumentSourceChangeStream::kUpdateOpType}; + if ((_preImageRequested && preImageOps.count(operationType)) || + (_postImageRequested && postImageOps.count(operationType))) { auto preImageOpTime = input[repl::OplogEntry::kPreImageOpTimeFieldName]; if (!preImageOpTime.missing()) { // Set 'kPreImageIdField' to the pre-image optime. The DSCSAddPreImage stage will use @@ -448,7 +467,7 @@ DepsTracker::State DocumentSourceChangeStreamTransform::getDependencies(DepsTrac deps->fields.insert(repl::OplogEntry::kTxnNumberFieldName.toString()); deps->fields.insert(DocumentSourceChangeStream::kTxnOpIndexField.toString()); - if (_includePreImageId) { + if (_preImageRequested) { deps->fields.insert(repl::OplogEntry::kPreImageOpTimeFieldName.toString()); deps->fields.insert(DocumentSourceChangeStream::kApplyOpsIndexField.toString()); } diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h index 97cf75dc7af..4ee2de16d1c 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.h +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h @@ -108,8 +108,11 @@ private: // Set to true if this transformation stage can be run on the collectionless namespace. bool _isIndependentOfAnyCollection; - // Set to true if the pre-image id should be included in output documents. - bool _includePreImageId = false; + // Set to true if the pre-image should be included in the output documents. + bool _preImageRequested = false; + + // Set to true if the post-image should be included in the output documents. + bool _postImageRequested = false; }; } // namespace mongo |