summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDenis Grebennicov <denis.grebennicov@mongodb.com>2021-10-04 16:54:18 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-10-04 17:58:49 +0000
commitc05c56ed7f798ec5725b3883185c4ddebab1b19f (patch)
treedb0b4f825b3d673596240e8bb74122c630101cac
parent0b1b02acbc21bae6d8aa6468f707f80e70b0ddd6 (diff)
downloadmongo-c05c56ed7f798ec5725b3883185c4ddebab1b19f.tar.gz
SERVER-58690 Implement loading of post-images in a change stream
-rw-r--r--jstests/change_streams/lookup_pit_pre_and_post_image.js294
-rw-r--r--jstests/change_streams/lookup_pre_image.js2
-rw-r--r--jstests/change_streams/write_pit_preimage.js52
-rw-r--r--jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js4
-rw-r--r--src/mongo/db/pipeline/SConscript2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h1
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp111
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_add_post_image.h33
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp18
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp33
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.h7
12 files changed, 400 insertions, 159 deletions
diff --git a/jstests/change_streams/lookup_pit_pre_and_post_image.js b/jstests/change_streams/lookup_pit_pre_and_post_image.js
index 3e218854a1b..49fa4bafa49 100644
--- a/jstests/change_streams/lookup_pit_pre_and_post_image.js
+++ b/jstests/change_streams/lookup_pit_pre_and_post_image.js
@@ -1,5 +1,5 @@
-// Tests that pre-images are stored in the pre-images collection on updates in collections with
-// 'changeStreamPreAndPostImages' set to true.
+// Tests that the point-in-time pre- and post-images are loaded correctly in $changeStream running
+// with different arguments for collections with 'changeStreamPreAndPostImages' set to true.
// @tags: [
// assumes_against_mongod_not_mongos,
// change_stream_does_not_expect_txns,
@@ -12,11 +12,11 @@ load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateC
load("jstests/libs/change_stream_util.js"); // For isChangeStreamPreAndPostImagesEnabled.
const testDB = db.getSiblingDB(jsTestName());
-const localDB = db.getSiblingDB("local");
const collName = "test";
-const coll = assertDropAndRecreateCollection(testDB, collName);
if (!isChangeStreamPreAndPostImagesEnabled(db)) {
+ const coll = assertDropAndRecreateCollection(testDB, collName);
+
// If feature flag is off, creating changeStream with new fullDocument arguments should throw.
assert.throwsWithCode(() => coll.watch([], {fullDocument: 'whenAvailable'}),
ErrorCodes.BadValue);
@@ -27,8 +27,6 @@ if (!isChangeStreamPreAndPostImagesEnabled(db)) {
return;
}
-const collUUID = testDB.getCollectionInfos({name: collName})[0].info.uuid;
-const preImagesColl = assertDropAndRecreateCollection(localDB, "system.preimages");
const originalDoc = {
_id: 1,
x: 1
@@ -41,96 +39,202 @@ const updatedDoc2 = {
_id: 1,
x: 5
};
+const replaceDoc = {
+ _id: 1,
+ z: 1
+};
-function assertValidPreImage(preImage) {
- const MAX_TIME_DELTA_SECONDS = 300; // 5 minutes delay.
- assert.eq(preImage._id.nsUUID, collUUID);
- assert.lte(Math.abs(new Date().getTime() / 1000 - preImage._id.ts.getTime()),
- MAX_TIME_DELTA_SECONDS);
- assert.lte(Math.abs(new Date().getTime() / 1000 - preImage.operationTime.getTime() / 1000),
- MAX_TIME_DELTA_SECONDS);
- assert.eq(preImage._id.applyOpsIndex, 0);
-}
-
-// Open the change streams with new fullDocument parameter set.
-assert.doesNotThrow(() => coll.watch([], {fullDocument: 'whenAvailable'}));
-assert.doesNotThrow(() => coll.watch([], {fullDocument: 'required'}));
-
-// Open the change streams with fullDocumentBeforeChange parameter set.
-const changeStreamCursorWhenAvailable = coll.watch([], {fullDocumentBeforeChange: 'whenAvailable'});
-let changeStreamCursorRequired = coll.watch([], {fullDocumentBeforeChange: 'required'});
-
-// Perform an insert.
-assert.commandWorked(coll.insert(originalDoc));
-assert.soon(() => changeStreamCursorWhenAvailable.hasNext());
-assert.soon(() => changeStreamCursorRequired.hasNext());
-assert.eq(changeStreamCursorWhenAvailable.next().operationType, 'insert');
-assert.eq(changeStreamCursorRequired.next().operationType, 'insert');
-
-// Pre-images collection should remain empty, as pre-images for insert operations can be found in
-// the oplog.
-assert.eq(preImagesColl.find().count(), 0);
-
-// Perform an update with 'damages'.
-assert.commandWorked(coll.update(originalDoc, {$inc: {x: 2}}));
-
-// Since changeStreamPreAndPostImages is not enabled, pre-images collection must be empty.
-assert.eq(preImagesColl.find().count(), 0);
-
-// Change stream with { fullDocumentBeforeChange: 'whenAvailable' } should return null as pre-image.
-assert.soon(() => changeStreamCursorWhenAvailable.hasNext());
-const doc = changeStreamCursorWhenAvailable.next();
-assert(doc.hasOwnProperty("fullDocumentBeforeChange"));
-assert.isnull(doc.fullDocumentBeforeChange);
-
-// Change stream with { fullDocumentBeforeChange: 'required' } should fail as pre-image is not
-// available.
-try {
- assert.soon(() => changeStreamCursorRequired.hasNext());
- assert(false, `Unexpected result from cursor: ${tojson(changeStreamCursorRequired.next())}`);
-} catch (error) {
- assert.eq(error.code,
- ErrorCodes.ChangeStreamHistoryLost,
- `Caught unexpected error: ${tojson(error)}`);
+// Tests the change stream point-in-time pre-/post-images behaviour with different change stream
+// options.
+function preAndPostImageTest({
+ changeStreamOptions = {},
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled = {},
+ expectedOnUpdateImages = {},
+ expectedOnReplaceImages = {}
+} = {}) {
+ // Confirms that the change event document does not contain any internal-only fields.
+ function assertChangeStreamInternalFieldsNotPresent(changeStreamDoc) {
+ assert(!changeStreamDoc.hasOwnProperty("preImageId"), changeStreamDoc);
+ assert(!changeStreamDoc.hasOwnProperty("updateModification"), changeStreamDoc);
+
+ if (!changeStreamOptions.hasOwnProperty("fullDocumentBeforeChange")) {
+ assert(!changeStreamDoc.hasOwnProperty("fullDocumentBeforeChange"), changeStreamDoc);
+ }
+
+ if (!changeStreamOptions.hasOwnProperty("fullDocument") &&
+ changeStreamDoc.operationType == "update") {
+ assert(!changeStreamDoc.hasOwnProperty("fullDocument"), changeStreamDoc);
+ }
+ }
+
+ const coll = assertDropAndRecreateCollection(testDB, collName);
+
+ // Open a change stream with the specified test options.
+ let changeStreamCursor = coll.watch([], Object.assign({}, changeStreamOptions));
+ let changeStreamDoc = null;
+
+ // Perform an insert.
+ assert.commandWorked(coll.insert(originalDoc));
+ assert.soon(() => changeStreamCursor.hasNext());
+ changeStreamDoc = changeStreamCursor.next();
+ assert.eq(changeStreamDoc.operationType, 'insert');
+ assertChangeStreamInternalFieldsNotPresent(changeStreamDoc);
+
+ // Perform an update modification.
+ assert.commandWorked(coll.update(originalDoc, {$inc: {x: 2}}));
+
+ // Change stream should throw an exception while trying to fetch the next document if
+ // pre-/post-image is required.
+ const shouldThrow = changeStreamOptions.fullDocument === 'required' ||
+ changeStreamOptions.fullDocumentBeforeChange === 'required';
+ if (shouldThrow) {
+ try {
+ assert.soon(() => changeStreamCursor.hasNext());
+ assert(false, `Unexpected result from cursor: ${tojson(changeStreamCursor.next())}`);
+ } catch (error) {
+ assert.eq(error.code,
+ ErrorCodes.NoMatchingDocument,
+ `Caught unexpected error: ${tojson(error)}`);
+ }
+
+ // Reopen the failed change stream.
+ changeStreamCursor = coll.watch([], Object.assign({}, changeStreamOptions));
+ } else {
+ assert.soon(() => changeStreamCursor.hasNext());
+ changeStreamDoc = changeStreamCursor.next();
+ assert.eq(changeStreamDoc.fullDocumentBeforeChange,
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled.preImage);
+ assert.eq(changeStreamDoc.fullDocument,
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled.postImage);
+ assertChangeStreamInternalFieldsNotPresent(changeStreamDoc);
+ }
+
+ // Enable changeStreamPreAndPostImages for pre-images recording.
+ assert.commandWorked(
+ testDB.runCommand({collMod: collName, changeStreamPreAndPostImages: true}));
+
+ // Perform an update modification.
+ assert.commandWorked(coll.update(updatedDoc, {$inc: {x: 2}}));
+
+ // The next change stream event should contain the expected pre- and post-images.
+ assert.soon(() => changeStreamCursor.hasNext());
+ changeStreamDoc = changeStreamCursor.next();
+ assert.eq(changeStreamDoc.fullDocumentBeforeChange, expectedOnUpdateImages.preImage);
+ assert.eq(changeStreamDoc.fullDocument, expectedOnUpdateImages.postImage);
+ assertChangeStreamInternalFieldsNotPresent(changeStreamDoc);
+
+ // Perform a full-document replacement.
+ assert.commandWorked(coll.update(updatedDoc2, replaceDoc));
+
+ // The next change stream event should contain the expected pre- and post-images.
+ assert.soon(() => changeStreamCursor.hasNext());
+ changeStreamDoc = changeStreamCursor.next();
+ assert.eq(changeStreamDoc.fullDocumentBeforeChange, expectedOnReplaceImages.preImage);
+ assert.eq(changeStreamDoc.fullDocument, expectedOnReplaceImages.postImage);
+ assert.eq(changeStreamDoc.operationType, "replace");
+ assertChangeStreamInternalFieldsNotPresent(changeStreamDoc);
}
-// Enable changeStreamPreAndPostImages for pre-images recording.
-assert.commandWorked(testDB.runCommand({collMod: collName, changeStreamPreAndPostImages: true}));
-
-// Reopen the failed change stream.
-changeStreamCursorRequired = coll.watch([], {fullDocumentBeforeChange: 'required'});
-
-// Perform an update with 'damages'.
-assert.commandWorked(coll.update(updatedDoc, {$inc: {x: 2}}));
-
-// Pre-images collection should contain one document with the 'updatedDoc' pre-image.
-assert.eq(preImagesColl.find().count({"preImage": updatedDoc}), 1);
-let preImageDoc = preImagesColl.find({"preImage": updatedDoc}).next();
-assertValidPreImage(preImageDoc);
-
-// Change stream should contain the pre-image.
-assert.soon(() => changeStreamCursorWhenAvailable.hasNext());
-assert.soon(() => changeStreamCursorRequired.hasNext());
-assert.eq(changeStreamCursorWhenAvailable.next().fullDocumentBeforeChange, updatedDoc);
-assert.eq(changeStreamCursorRequired.next().fullDocumentBeforeChange, updatedDoc);
-
-// Perform an update (replace).
-assert.commandWorked(coll.update(updatedDoc2, {z: 1}));
-
-// Pre-Images collection should contain a new document with the 'updatedDoc2' pre-image.
-assert.eq(preImagesColl.find({"preImage": updatedDoc2}).count(), 1);
-preImageDoc = preImagesColl.find({"preImage": updatedDoc2}).next();
-assertValidPreImage(preImageDoc);
-
-// Change stream should contain the pre-image.
-assert.soon(() => changeStreamCursorWhenAvailable.hasNext());
-assert.soon(() => changeStreamCursorRequired.hasNext());
-
-let changeStreamDoc = changeStreamCursorWhenAvailable.next();
-assert.eq(changeStreamDoc.fullDocumentBeforeChange, updatedDoc2);
-assert.eq(changeStreamDoc.operationType, "replace");
-
-changeStreamDoc = changeStreamCursorRequired.next();
-assert.eq(changeStreamDoc.fullDocumentBeforeChange, updatedDoc2);
-assert.eq(changeStreamDoc.operationType, "replace");
+preAndPostImageTest({
+ expectedOnReplaceImages: {
+ postImage: replaceDoc,
+ }
+});
+preAndPostImageTest({
+ changeStreamOptions: {fullDocumentBeforeChange: 'whenAvailable'},
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {
+ preImage: null,
+ },
+ expectedOnUpdateImages: {
+ preImage: updatedDoc,
+ },
+ expectedOnReplaceImages: {
+ preImage: updatedDoc2,
+ postImage: replaceDoc,
+ }
+});
+preAndPostImageTest({
+ changeStreamOptions: {fullDocument: 'whenAvailable'},
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {
+ postImage: null,
+ },
+ expectedOnUpdateImages: {
+ postImage: updatedDoc2,
+ },
+ expectedOnReplaceImages: {
+ postImage: replaceDoc,
+ }
+});
+preAndPostImageTest({
+ changeStreamOptions: {fullDocumentBeforeChange: 'whenAvailable', fullDocument: 'whenAvailable'},
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {
+ preImage: null,
+ postImage: null,
+ },
+ expectedOnUpdateImages: {
+ preImage: updatedDoc,
+ postImage: updatedDoc2,
+ },
+ expectedOnReplaceImages: {
+ preImage: updatedDoc2,
+ postImage: replaceDoc,
+ }
+});
+preAndPostImageTest({
+ changeStreamOptions: {fullDocumentBeforeChange: 'required'},
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {} /* will throw on hasNext() */,
+ expectedOnUpdateImages: {
+ preImage: updatedDoc,
+ },
+ expectedOnReplaceImages: {
+ preImage: updatedDoc2,
+ postImage: replaceDoc,
+ }
+});
+preAndPostImageTest({
+ changeStreamOptions: {fullDocument: 'required'},
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {} /* will throw on hasNext() */,
+ expectedOnUpdateImages: {
+ postImage: updatedDoc2,
+ },
+ expectedOnReplaceImages: {
+ postImage: replaceDoc,
+ }
+});
+preAndPostImageTest({
+ changeStreamOptions: {fullDocumentBeforeChange: 'required', fullDocument: 'required'},
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {} /* will throw on hasNext() */,
+ expectedOnUpdateImages: {
+ preImage: updatedDoc,
+ postImage: updatedDoc2,
+ },
+ expectedOnReplaceImages: {
+ preImage: updatedDoc2,
+ postImage: replaceDoc,
+ }
+});
+preAndPostImageTest({
+ changeStreamOptions: {fullDocumentBeforeChange: 'whenAvailable', fullDocument: 'required'},
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {} /* will throw on hasNext() */,
+ expectedOnUpdateImages: {
+ preImage: updatedDoc,
+ postImage: updatedDoc2,
+ },
+ expectedOnReplaceImages: {
+ preImage: updatedDoc2,
+ postImage: replaceDoc,
+ }
+});
+preAndPostImageTest({
+ changeStreamOptions: {fullDocumentBeforeChange: 'required', fullDocument: 'whenAvailable'},
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {} /* will throw on hasNext() */,
+ expectedOnUpdateImages: {
+ preImage: updatedDoc,
+ postImage: updatedDoc2,
+ },
+ expectedOnReplaceImages: {
+ preImage: updatedDoc2,
+ postImage: replaceDoc,
+ }
+});
}());
diff --git a/jstests/change_streams/lookup_pre_image.js b/jstests/change_streams/lookup_pre_image.js
index 72cac8cd0a2..120c6de5671 100644
--- a/jstests/change_streams/lookup_pre_image.js
+++ b/jstests/change_streams/lookup_pre_image.js
@@ -122,7 +122,7 @@ latestChange.fullDocumentBeforeChange = null;
assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor));
// ... but the "required" cursor throws an exception.
assert.throwsWithCode(() => cst.getOneChange(csPreImageRequiredCursor),
- [ErrorCodes.ChangeStreamHistoryLost, 51770]);
+ [ErrorCodes.NoMatchingDocument, 51770]);
// Test pre-image lookup for an op-style update operation.
assert.commandWorked(coll.update({_id: "y"}, {$set: {foo: "baz"}}));
diff --git a/jstests/change_streams/write_pit_preimage.js b/jstests/change_streams/write_pit_preimage.js
index 5cb7090e300..f81cb18c1e0 100644
--- a/jstests/change_streams/write_pit_preimage.js
+++ b/jstests/change_streams/write_pit_preimage.js
@@ -16,11 +16,8 @@ load("jstests/libs/change_stream_util.js"); // For isChangeStreamPreAndPo
const testDB = db.getSiblingDB(jsTestName());
const localDB = db.getSiblingDB("local");
const collName = "test";
-const coll =
- assertDropAndRecreateCollection(testDB, collName, {changeStreamPreAndPostImages: true});
-const collInfos = testDB.getCollectionInfos({name: collName});
-assert.eq(collInfos.length, 1);
-const collUUID = collInfos[0].info.uuid;
+const coll = assertDropAndRecreateCollection(testDB, collName);
+const collUUID = testDB.getCollectionInfos({name: collName})[0].info.uuid;
const preImagesColl = assertDropAndRecreateCollection(localDB, "system.preimages");
const originalDoc = {
_id: 1,
@@ -31,37 +28,50 @@ const updatedDoc = {
x: 3
};
-function assertValidPreImage(preImage) {
- const MAX_TIME_DELTA_SECONDS = 300; // 5 minutes delay.
+// Validates the contents of the pre-image collection entry.
+function assertValidChangeStreamPreImageDocument(preImage) {
+ const oplogEntryCursor = localDB.oplog.rs.find({ts: preImage._id.ts});
+ assert(oplogEntryCursor.hasNext());
+ const oplogEntry = oplogEntryCursor.next();
+ assert.eq(oplogEntry.op, "u", oplogEntry);
+ assert.eq(preImage._id.nsUUID, oplogEntry.ui);
assert.eq(preImage._id.nsUUID, collUUID);
- assert.lte(Math.abs(new Date().getTime() / 1000 - preImage._id.ts.getTime()),
- MAX_TIME_DELTA_SECONDS);
- assert.lte(Math.abs(new Date().getTime() / 1000 - preImage.operationTime.getTime() / 1000),
- MAX_TIME_DELTA_SECONDS);
assert.eq(preImage._id.applyOpsIndex, 0);
+ assert.eq(preImage.operationTime, oplogEntry.wall, oplogEntry);
+ assert.eq(preImage.preImage._id, oplogEntry.o2._id, oplogEntry);
}
+// Perform an insert, an update modification and a delete.
+assert.commandWorked(coll.insert(originalDoc));
+assert.commandWorked(coll.update(originalDoc, {$inc: {x: 2}}));
+assert.commandWorked(coll.remove(updatedDoc));
+
+// Since changeStreamPreAndPostImages is not enabled, pre-images collection must be empty.
+assert.eq(preImagesColl.count(), 0);
+
+// Enable changeStreamPreAndPostImages for pre-images recording.
+assert.commandWorked(testDB.runCommand({collMod: collName, changeStreamPreAndPostImages: true}));
+
// Perform an insert.
assert.commandWorked(coll.insert(originalDoc));
assert.eq(coll.find().count(), 1);
-// Pre-images collection should remain empty, as pre-images for insert operations can be found in
-// the oplog.
+// Pre-images collection should remain empty, as insert operations do not have pre-images.
assert.eq(preImagesColl.find().count(), 0);
-// Perform an update with 'damages'.
+// Perform an update modification.
assert.commandWorked(coll.update(originalDoc, {$inc: {x: 2}}));
// Pre-images collection should contain one document with the 'originalDoc' pre-image.
-let preimages = preImagesColl.find({"preImage": originalDoc}).toArray();
-assert.eq(preimages.length, 1);
-assertValidPreImage(preimages[0]);
+let preImages = preImagesColl.find({"preImage": originalDoc}).toArray();
+assert.eq(preImages.length, 1);
+assertValidChangeStreamPreImageDocument(preImages[0]);
-// Perform an update (replace).
+// Perform a full-document replacement.
assert.commandWorked(coll.update(updatedDoc, {z: 1}));
// Pre-images collection should contain a new document with the 'updatedDoc' pre-image.
-preimages = preImagesColl.find({"preImage": updatedDoc}).toArray();
-assert.eq(preimages.length, 1);
-assertValidPreImage(preimages[0]);
+preImages = preImagesColl.find({"preImage": updatedDoc}).toArray();
+assert.eq(preImages.length, 1);
+assertValidChangeStreamPreImageDocument(preImages[0]);
}());
diff --git a/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js b/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js
index 010e69365cb..847642da319 100644
--- a/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js
+++ b/jstests/noPassthrough/change_stream_pre_image_lookup_whole_db_whole_cluster.js
@@ -60,7 +60,7 @@ assert.throwsWithCode(function() {
return assert.soon(() => wholeDBStream.hasNext() &&
wholeDBStream.next().documentKey._id === "last_change_sentinel");
-}, [ErrorCodes.ChangeStreamHistoryLost, 51770]);
+}, [ErrorCodes.NoMatchingDocument, 51770]);
// Confirm that attempting to open a whole-cluster stream on with mode "required" fails.
assert.throwsWithCode(function() {
@@ -72,7 +72,7 @@ assert.throwsWithCode(function() {
return assert.soon(() => wholeClusterStream.hasNext() &&
wholeClusterStream.next().documentKey._id == "last_change_sentinel");
-}, [ErrorCodes.ChangeStreamHistoryLost, 51770]);
+}, [ErrorCodes.NoMatchingDocument, 51770]);
// However, if we open a whole-db or whole-cluster stream that filters for only the namespace with
// pre-images, then the cursor can proceed. This is because the $match gets moved ahead of the
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 5e47640079a..4435a99896c 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -380,8 +380,10 @@ env.Library(
'document_source_change_stream_unwind_transaction.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/db/ops/write_ops_parsers',
'$BUILD_DIR/mongo/db/pipeline/pipeline',
'$BUILD_DIR/mongo/db/pipeline/sharded_agg_helpers',
+ '$BUILD_DIR/mongo/db/update/update_driver',
'$BUILD_DIR/mongo/s/query/router_exec_stage',
'change_stream_preimage',
],
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index b8413a761e2..b58c31108d6 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -157,6 +157,7 @@ public:
static constexpr StringData kLsidField = "lsid"_sd;
static constexpr StringData kTxnOpIndexField = "txnOpIndex"_sd;
static constexpr StringData kApplyOpsIndexField = "applyOpsIndex"_sd;
+ static constexpr StringData kRawOplogUpdateSpecField = "rawOplogUpdateSpec"_sd;
// The target namespace of a rename operation.
static constexpr StringData kRenameTargetNssField = "to"_sd;
diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp
index c20ba0d03f9..fa0cf402095 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.cpp
@@ -32,6 +32,10 @@
#include "mongo/db/pipeline/document_source_change_stream_add_post_image.h"
#include "mongo/bson/simple_bsonelement_comparator.h"
+#include "mongo/db/ops/write_ops_parsers.h"
+#include "mongo/db/pipeline/change_stream_helpers_legacy.h"
+#include "mongo/db/pipeline/document_source_change_stream_add_pre_image.h"
+#include "mongo/db/update/update_driver.h"
namespace mongo {
@@ -80,8 +84,33 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamAddPostImage::doGetNext(
return input;
}
+ // TODO SERVER-58584: remove the feature flag.
+ if (_fullDocumentMode != FullDocumentModeEnum::kUpdateLookup) {
+ tassert(5869000,
+ str::stream() << "Feature flag must be enabled for fullDocument: "
+ << FullDocumentMode_serializer(_fullDocumentMode),
+ feature_flags::gFeatureFlagChangeStreamPreAndPostImages.isEnabled(
+ serverGlobalParams.featureCompatibility));
+ }
+
+ // Create a mutable output document from the input document.
MutableDocument output(input.releaseDocument());
- output[kFullDocumentFieldName] = lookupPostImage(output.peek());
+ const auto postImageDoc = (_fullDocumentMode == FullDocumentModeEnum::kUpdateLookup
+ ? lookupLatestPostImage(output.peek())
+ : generatePostImage(output.peek()));
+ uassert(
+ ErrorCodes::NoMatchingDocument,
+ str::stream() << "Change stream was configured to require a post-image for all update, "
+ "delete and replace events, but the post-image was not found for event: "
+ << output.peek().toString(),
+ postImageDoc || _fullDocumentMode != FullDocumentModeEnum::kRequired);
+
+ // Even if no post-image was found, we have to populate the 'fullDocument' field.
+ output[kFullDocumentFieldName] = (postImageDoc ? Value(*postImageDoc) : Value(BSONNULL));
+
+ // Do not propagate the update modification and pre-image id information further.
+ output.remove(kRawOplogUpdateSpecFieldName);
+ output.remove(kPreImageIdFieldName);
return output.freeze();
}
@@ -106,7 +135,64 @@ NamespaceString DocumentSourceChangeStreamAddPostImage::assertValidNamespace(
return nss;
}
-Value DocumentSourceChangeStreamAddPostImage::lookupPostImage(const Document& updateOp) const {
+boost::optional<Document> DocumentSourceChangeStreamAddPostImage::generatePostImage(
+ const Document& updateOp) const {
+ // If the 'fullDocumentBeforeChange' is present and null, then we already tried and failed to
+ // look up a pre-image. We can't compute the post-image without it, so return early.
+ if (updateOp[kFullDocumentBeforeChangeFieldName].getType() == BSONType::jstNULL) {
+ return boost::none;
+ }
+
+ // Otherwise, obtain the pre-image from the information in the input document.
+ auto preImage = [&]() -> boost::optional<Document> {
+ // Check whether we have already looked up the pre-image document.
+ if (!updateOp[kFullDocumentBeforeChangeFieldName].missing()) {
+ return updateOp[kFullDocumentBeforeChangeFieldName].getDocument();
+ }
+
+ // Otherwise, we need to look it up ourselves. Extract the preImageId field.
+ auto preImageId = updateOp[kPreImageIdFieldName];
+ tassert(5869001,
+ "Missing both 'fullDocumentBeforeChange' and 'preImageId' fields",
+ !preImageId.missing());
+
+ // Use DSCSAddPreImage::lookupPreImage to retrieve the actual pre-image.
+ return DocumentSourceChangeStreamAddPreImage::lookupPreImage(pExpCtx,
+ preImageId.getDocument());
+ }();
+
+ // Return boost::none if pre-image is missing.
+ if (!preImage) {
+ return boost::none;
+ }
+
+ // Raw oplog update spec field must be provided for the update commands.
+ tassert(5869002,
+ "Raw oplog update spec was missing or invalid in change stream",
+ updateOp[kRawOplogUpdateSpecFieldName].isObject());
+
+ // Setup the UpdateDriver for performing the post-image computation.
+ UpdateDriver updateDriver(pExpCtx);
+ const auto rawOplogUpdateSpec = updateOp[kRawOplogUpdateSpecFieldName].getDocument().toBson();
+ const auto updateMod = write_ops::UpdateModification::parseFromOplogEntry(
+ rawOplogUpdateSpec, {false /* mustCheckExistenceForInsertOperations */});
+ // UpdateDriver only expects to apply a diff in the context of oplog application.
+ updateDriver.setFromOplogApplication(true);
+ updateDriver.parse(updateMod, {});
+
+ // Compute post-image.
+ mutablebson::Document postImage(preImage->toBson());
+ uassertStatusOK(updateDriver.update(pExpCtx->opCtx,
+ StringData(),
+ &postImage,
+ false /* validateForStorage */,
+ FieldRefSet(),
+ false /* isInsert */));
+ return Document(postImage.getObject());
+}
+
+boost::optional<Document> DocumentSourceChangeStreamAddPostImage::lookupLatestPostImage(
+ const Document& updateOp) const {
// Make sure we have a well-formed input.
auto nss = assertValidNamespace(updateOp);
@@ -115,23 +201,20 @@ Value DocumentSourceChangeStreamAddPostImage::lookupPostImage(const Document& up
BSONType::Object)
.getDocument();
- // Extract the UUID from resume token and do change stream lookups by UUID.
- auto resumeToken =
- ResumeToken::parse(updateOp[DocumentSourceChangeStream::kIdField].getDocument());
+ // Extract the resume token data from the input event.
+ auto resumeTokenData =
+ ResumeToken::parse(updateOp[DocumentSourceChangeStream::kIdField].getDocument()).getData();
auto readConcern = BSON("level"
<< "majority"
- << "afterClusterTime" << resumeToken.getData().clusterTime);
+ << "afterClusterTime" << resumeTokenData.clusterTime);
// Update lookup queries sent from mongoS to shards are allowed to use speculative majority
- // reads.
- invariant(resumeToken.getData().uuid);
- auto lookedUpDoc = pExpCtx->mongoProcessInterface->lookupSingleDocument(
- pExpCtx, nss, *resumeToken.getData().uuid, documentKey, std::move(readConcern));
-
- // Check whether the lookup returned any documents. Even if the lookup itself succeeded, it may
- // not have returned any results if the document was deleted in the time since the update op.
- return (lookedUpDoc ? Value(*lookedUpDoc) : Value(BSONNULL));
+ // reads. Even if the lookup itself succeeded, it may not have returned any results if the
+ // document was deleted in the time since the update op.
+ invariant(resumeTokenData.uuid);
+ return pExpCtx->mongoProcessInterface->lookupSingleDocument(
+ pExpCtx, nss, *resumeTokenData.uuid, documentKey, std::move(readConcern));
}
Value DocumentSourceChangeStreamAddPostImage::serializeLatest(
diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h
index 980672ca95f..2732639eb9d 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_add_post_image.h
@@ -45,6 +45,11 @@ public:
static constexpr StringData kStageName = "$_internalChangeStreamAddPostImage"_sd;
static constexpr StringData kFullDocumentFieldName =
DocumentSourceChangeStream::kFullDocumentField;
+ static constexpr StringData kRawOplogUpdateSpecFieldName =
+ DocumentSourceChangeStream::kRawOplogUpdateSpecField;
+ static constexpr StringData kPreImageIdFieldName = DocumentSourceChangeStream::kPreImageIdField;
+ static constexpr StringData kFullDocumentBeforeChangeFieldName =
+ DocumentSourceChangeStream::kFullDocumentBeforeChangeField;
/**
* Creates a DocumentSourceChangeStreamAddPostImage stage.
@@ -59,10 +64,14 @@ public:
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
/**
- * Only modifies a single path: "fullDocument".
+ * Only modifies: "fullDocument", "updateModification", "preImageId".
*/
GetModPathsReturn getModifiedPaths() const final {
- return {GetModPathsReturn::Type::kFiniteSet, {kFullDocumentFieldName.toString()}, {}};
+ return {GetModPathsReturn::Type::kFiniteSet,
+ {kFullDocumentFieldName.toString(),
+ kRawOplogUpdateSpecFieldName.toString(),
+ kPreImageIdFieldName.toString()},
+ {}};
}
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
@@ -99,6 +108,15 @@ public:
deps->fields.insert(DocumentSourceChangeStream::kDocumentKeyField.toString());
deps->fields.insert(DocumentSourceChangeStream::kOperationTypeField.toString());
deps->fields.insert(DocumentSourceChangeStream::kIdField.toString());
+
+ // Fields needed for post-image computation.
+ if (_fullDocumentMode != FullDocumentModeEnum::kUpdateLookup) {
+ deps->fields.insert(
+ DocumentSourceChangeStream::kFullDocumentBeforeChangeField.toString());
+ deps->fields.insert(DocumentSourceChangeStream::kRawOplogUpdateSpecField.toString());
+ deps->fields.insert(DocumentSourceChangeStream::kPreImageIdField.toString());
+ }
+
// This stage does not restrict the output fields to a finite set, and has no impact on
// whether metadata is available or needed.
return DepsTracker::State::SEE_NEXT;
@@ -126,11 +144,12 @@ private:
*/
GetNextResult doGetNext() final;
- /**
- * Uses the "documentKey" field from 'updateOp' to look up the current version of the document.
- * Returns Value(BSONNULL) if the document couldn't be found.
- */
- Value lookupPostImage(const Document& updateOp) const;
+ // Computes a post-image by taking a pre-image and applying an update modification that is
+ // stored in the oplog entry. Returns boost::none if no pre-image information is available.
+ boost::optional<Document> generatePostImage(const Document& updateOp) const;
+
+ // Retrieves the current version of the document for the update event.
+ boost::optional<Document> lookupLatestPostImage(const Document& updateOp) const;
/**
* Throws a AssertionException if the namespace found in 'inputDoc' doesn't match the one on the
diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp
index c56dbda1b34..40ac5ef1dab 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp
@@ -109,7 +109,7 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamAddPreImage::doGetNext()
// Obtain the pre-image document, if available, given the specified preImageId.
auto preImageDoc = lookupPreImage(pExpCtx, preImageId.getDocument());
uassert(
- ErrorCodes::ChangeStreamHistoryLost,
+ ErrorCodes::NoMatchingDocument,
str::stream() << "Change stream was configured to require a pre-image for all update, "
"delete and replace events, but the pre-image was not found for event: "
<< input.getDocument().toString(),
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 2251e15e28e..d1e76617542 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -2993,13 +2993,13 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForDelete) {
Value(BSONNULL));
checkTransformation(deleteEntry, expectedDeleteWithNullPreImage.freeze(), {}, spec);
- // When run with {fullDocumentBeforeChange: "required"} and a 'preImageOpTime' is present in the
- // event's oplog entry but we cannot find the pre-image, we throw ChangeStreamHistoryLost.
+ // When run with {fullDocumentBeforeChange: "required"} but we cannot find the pre-image, we
+ // throw NoMatchingDocument.
spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
<< "required"));
ASSERT_THROWS_CODE(checkTransformation(deleteEntry, boost::none, {}, spec),
AssertionException,
- ErrorCodes::ChangeStreamHistoryLost);
+ ErrorCodes::NoMatchingDocument);
}
TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) {
@@ -3093,13 +3093,13 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForUpdate) {
Value(BSONNULL));
checkTransformation(updateEntry, expectedUpdateWithNullPreImage.freeze(), {}, spec);
- // When run with {fullDocumentBeforeChange: "required"} and a 'preImageOpTime' is present in the
- // event's oplog entry but we cannot find the pre-image, we throw ChangeStreamHistoryLost.
+ // When run with {fullDocumentBeforeChange: "required"} but we cannot find the pre-image, we
+ // throw NoMatchingDocument.
spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
<< "required"));
ASSERT_THROWS_CODE(checkTransformation(updateEntry, boost::none, {}, spec),
AssertionException,
- ErrorCodes::ChangeStreamHistoryLost);
+ ErrorCodes::NoMatchingDocument);
}
TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) {
@@ -3187,13 +3187,13 @@ TEST_F(ChangeStreamStageTest, TransformPreImageForReplace) {
Value(BSONNULL));
checkTransformation(replaceEntry, expectedReplaceWithNullPreImage.freeze(), {}, spec);
- // When run with {fullDocumentBeforeChange: "required"} and a 'preImageOpTime' is present in the
- // event's oplog entry but we cannot find the pre-image, we throw ChangeStreamHistoryLost.
+ // When run with {fullDocumentBeforeChange: "required"} but we cannot find the pre-image, we
+ // throw NoMatchingDocument.
spec = BSON("$changeStream" << BSON("fullDocumentBeforeChange"
<< "required"));
ASSERT_THROWS_CODE(checkTransformation(replaceEntry, boost::none, {}, spec),
AssertionException,
- ErrorCodes::ChangeStreamHistoryLost);
+ ErrorCodes::NoMatchingDocument);
}
TEST_F(ChangeStreamStageDBTest, MatchFiltersOperationsOnSystemCollections) {
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 45214bd6cfe..e03c237c56e 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -94,9 +94,16 @@ DocumentSourceChangeStreamTransform::DocumentSourceChangeStreamTransform(
_changeStreamSpec(std::move(spec)),
_isIndependentOfAnyCollection(expCtx->ns.isCollectionlessAggregateNS()) {
- // If the change stream spec requested a pre-image, make sure that we supply one.
- _includePreImageId =
- (_changeStreamSpec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff);
+ // Determine whether the user requested a point-in-time pre-image, which will affect this
+ // stage's output.
+ _preImageRequested =
+ _changeStreamSpec.getFullDocumentBeforeChange() != FullDocumentBeforeChangeModeEnum::kOff;
+
+ // Determine whether the user requested a point-in-time post-image, which will affect this
+ // stage's output.
+ _postImageRequested =
+ _changeStreamSpec.getFullDocument() == FullDocumentModeEnum::kWhenAvailable ||
+ _changeStreamSpec.getFullDocument() == FullDocumentModeEnum::kRequired;
// Extract the resume token or high-water-mark from the spec.
auto tokenData = DocumentSourceChangeStream::resolveResumeTokenFromSpec(_changeStreamSpec);
@@ -280,6 +287,12 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
operationType = DocumentSourceChangeStream::kReplaceOpType;
fullDocument = input[repl::OplogEntry::kObjectFieldName];
}
+
+ // Add update modification for post-image computation.
+ if (_postImageRequested && operationType == DocumentSourceChangeStream::kUpdateOpType) {
+ doc.addField(DocumentSourceChangeStream::kRawOplogUpdateSpecField,
+ input[repl::OplogEntry::kObjectFieldName]);
+ }
documentKey = input[repl::OplogEntry::kObject2FieldName];
break;
}
@@ -395,9 +408,15 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
// Add the post-image, pre-image id, namespace, documentKey and other fields as appropriate.
doc.addField(DocumentSourceChangeStream::kFullDocumentField, std::move(fullDocument));
- // Include pre-image id only for update, replace and delete operations.
- static const std::set<StringData> preImageOps = {"update", "replace", "delete"};
- if (_includePreImageId && preImageOps.count(operationType)) {
+ // Determine whether the preImageId should be included, for eligible operations. Note that we
+ // will include preImageId even if the user requested a post-image but no pre-image, because the
+ // pre-image is required to compute the post-image.
+ static const std::set<StringData> preImageOps = {DocumentSourceChangeStream::kUpdateOpType,
+ DocumentSourceChangeStream::kReplaceOpType,
+ DocumentSourceChangeStream::kDeleteOpType};
+ static const std::set<StringData> postImageOps = {DocumentSourceChangeStream::kUpdateOpType};
+ if ((_preImageRequested && preImageOps.count(operationType)) ||
+ (_postImageRequested && postImageOps.count(operationType))) {
auto preImageOpTime = input[repl::OplogEntry::kPreImageOpTimeFieldName];
if (!preImageOpTime.missing()) {
// Set 'kPreImageIdField' to the pre-image optime. The DSCSAddPreImage stage will use
@@ -448,7 +467,7 @@ DepsTracker::State DocumentSourceChangeStreamTransform::getDependencies(DepsTrac
deps->fields.insert(repl::OplogEntry::kTxnNumberFieldName.toString());
deps->fields.insert(DocumentSourceChangeStream::kTxnOpIndexField.toString());
- if (_includePreImageId) {
+ if (_preImageRequested) {
deps->fields.insert(repl::OplogEntry::kPreImageOpTimeFieldName.toString());
deps->fields.insert(DocumentSourceChangeStream::kApplyOpsIndexField.toString());
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.h b/src/mongo/db/pipeline/document_source_change_stream_transform.h
index 97cf75dc7af..4ee2de16d1c 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.h
@@ -108,8 +108,11 @@ private:
// Set to true if this transformation stage can be run on the collectionless namespace.
bool _isIndependentOfAnyCollection;
- // Set to true if the pre-image id should be included in output documents.
- bool _includePreImageId = false;
+ // Set to true if the pre-image should be included in the output documents.
+ bool _preImageRequested = false;
+
+ // Set to true if the post-image should be included in the output documents.
+ bool _postImageRequested = false;
};
} // namespace mongo