summaryrefslogtreecommitdiff
path: root/jstests/change_streams
diff options
context:
space:
mode:
authorDenis Grebennicov <denis.grebennicov@mongodb.com>2021-10-04 16:54:18 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-10-04 17:58:49 +0000
commitc05c56ed7f798ec5725b3883185c4ddebab1b19f (patch)
treedb0b4f825b3d673596240e8bb74122c630101cac /jstests/change_streams
parent0b1b02acbc21bae6d8aa6468f707f80e70b0ddd6 (diff)
downloadmongo-c05c56ed7f798ec5725b3883185c4ddebab1b19f.tar.gz
SERVER-58690 Implement loading of post-images in a change stream
Diffstat (limited to 'jstests/change_streams')
-rw-r--r--jstests/change_streams/lookup_pit_pre_and_post_image.js294
-rw-r--r--jstests/change_streams/lookup_pre_image.js2
-rw-r--r--jstests/change_streams/write_pit_preimage.js52
3 files changed, 231 insertions, 117 deletions
diff --git a/jstests/change_streams/lookup_pit_pre_and_post_image.js b/jstests/change_streams/lookup_pit_pre_and_post_image.js
index 3e218854a1b..49fa4bafa49 100644
--- a/jstests/change_streams/lookup_pit_pre_and_post_image.js
+++ b/jstests/change_streams/lookup_pit_pre_and_post_image.js
@@ -1,5 +1,5 @@
-// Tests that pre-images are stored in the pre-images collection on updates in collections with
-// 'changeStreamPreAndPostImages' set to true.
+// Tests that the point-in-time pre- and post-images are loaded correctly in $changeStream running
+// with different arguments for collections with 'changeStreamPreAndPostImages' set to true.
// @tags: [
// assumes_against_mongod_not_mongos,
// change_stream_does_not_expect_txns,
@@ -12,11 +12,11 @@ load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateC
load("jstests/libs/change_stream_util.js"); // For isChangeStreamPreAndPostImagesEnabled.
const testDB = db.getSiblingDB(jsTestName());
-const localDB = db.getSiblingDB("local");
const collName = "test";
-const coll = assertDropAndRecreateCollection(testDB, collName);
if (!isChangeStreamPreAndPostImagesEnabled(db)) {
+ const coll = assertDropAndRecreateCollection(testDB, collName);
+
// If feature flag is off, creating changeStream with new fullDocument arguments should throw.
assert.throwsWithCode(() => coll.watch([], {fullDocument: 'whenAvailable'}),
ErrorCodes.BadValue);
@@ -27,8 +27,6 @@ if (!isChangeStreamPreAndPostImagesEnabled(db)) {
return;
}
-const collUUID = testDB.getCollectionInfos({name: collName})[0].info.uuid;
-const preImagesColl = assertDropAndRecreateCollection(localDB, "system.preimages");
const originalDoc = {
_id: 1,
x: 1
@@ -41,96 +39,202 @@ const updatedDoc2 = {
_id: 1,
x: 5
};
+const replaceDoc = {
+ _id: 1,
+ z: 1
+};
-function assertValidPreImage(preImage) {
- const MAX_TIME_DELTA_SECONDS = 300; // 5 minutes delay.
- assert.eq(preImage._id.nsUUID, collUUID);
- assert.lte(Math.abs(new Date().getTime() / 1000 - preImage._id.ts.getTime()),
- MAX_TIME_DELTA_SECONDS);
- assert.lte(Math.abs(new Date().getTime() / 1000 - preImage.operationTime.getTime() / 1000),
- MAX_TIME_DELTA_SECONDS);
- assert.eq(preImage._id.applyOpsIndex, 0);
-}
-
-// Open the change streams with new fullDocument parameter set.
-assert.doesNotThrow(() => coll.watch([], {fullDocument: 'whenAvailable'}));
-assert.doesNotThrow(() => coll.watch([], {fullDocument: 'required'}));
-
-// Open the change streams with fullDocumentBeforeChange parameter set.
-const changeStreamCursorWhenAvailable = coll.watch([], {fullDocumentBeforeChange: 'whenAvailable'});
-let changeStreamCursorRequired = coll.watch([], {fullDocumentBeforeChange: 'required'});
-
-// Perform an insert.
-assert.commandWorked(coll.insert(originalDoc));
-assert.soon(() => changeStreamCursorWhenAvailable.hasNext());
-assert.soon(() => changeStreamCursorRequired.hasNext());
-assert.eq(changeStreamCursorWhenAvailable.next().operationType, 'insert');
-assert.eq(changeStreamCursorRequired.next().operationType, 'insert');
-
-// Pre-images collection should remain empty, as pre-images for insert operations can be found in
-// the oplog.
-assert.eq(preImagesColl.find().count(), 0);
-
-// Perform an update with 'damages'.
-assert.commandWorked(coll.update(originalDoc, {$inc: {x: 2}}));
-
-// Since changeStreamPreAndPostImages is not enabled, pre-images collection must be empty.
-assert.eq(preImagesColl.find().count(), 0);
-
-// Change stream with { fullDocumentBeforeChange: 'whenAvailable' } should return null as pre-image.
-assert.soon(() => changeStreamCursorWhenAvailable.hasNext());
-const doc = changeStreamCursorWhenAvailable.next();
-assert(doc.hasOwnProperty("fullDocumentBeforeChange"));
-assert.isnull(doc.fullDocumentBeforeChange);
-
-// Change stream with { fullDocumentBeforeChange: 'required' } should fail as pre-image is not
-// available.
-try {
- assert.soon(() => changeStreamCursorRequired.hasNext());
- assert(false, `Unexpected result from cursor: ${tojson(changeStreamCursorRequired.next())}`);
-} catch (error) {
- assert.eq(error.code,
- ErrorCodes.ChangeStreamHistoryLost,
- `Caught unexpected error: ${tojson(error)}`);
+// Tests the change stream point-in-time pre-/post-images behaviour with different change stream
+// options.
+function preAndPostImageTest({
+ changeStreamOptions = {},
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled = {},
+ expectedOnUpdateImages = {},
+ expectedOnReplaceImages = {}
+} = {}) {
+ // Confirms that the change event document does not contain any internal-only fields.
+ function assertChangeStreamInternalFieldsNotPresent(changeStreamDoc) {
+ assert(!changeStreamDoc.hasOwnProperty("preImageId"), changeStreamDoc);
+ assert(!changeStreamDoc.hasOwnProperty("updateModification"), changeStreamDoc);
+
+ if (!changeStreamOptions.hasOwnProperty("fullDocumentBeforeChange")) {
+ assert(!changeStreamDoc.hasOwnProperty("fullDocumentBeforeChange"), changeStreamDoc);
+ }
+
+ if (!changeStreamOptions.hasOwnProperty("fullDocument") &&
+ changeStreamDoc.operationType == "update") {
+ assert(!changeStreamDoc.hasOwnProperty("fullDocument"), changeStreamDoc);
+ }
+ }
+
+ const coll = assertDropAndRecreateCollection(testDB, collName);
+
+ // Open a change stream with the specified test options.
+ let changeStreamCursor = coll.watch([], Object.assign({}, changeStreamOptions));
+ let changeStreamDoc = null;
+
+ // Perform an insert.
+ assert.commandWorked(coll.insert(originalDoc));
+ assert.soon(() => changeStreamCursor.hasNext());
+ changeStreamDoc = changeStreamCursor.next();
+ assert.eq(changeStreamDoc.operationType, 'insert');
+ assertChangeStreamInternalFieldsNotPresent(changeStreamDoc);
+
+ // Perform an update modification.
+ assert.commandWorked(coll.update(originalDoc, {$inc: {x: 2}}));
+
+ // Change stream should throw an exception while trying to fetch the next document if
+ // pre-/post-image is required.
+ const shouldThrow = changeStreamOptions.fullDocument === 'required' ||
+ changeStreamOptions.fullDocumentBeforeChange === 'required';
+ if (shouldThrow) {
+ try {
+ assert.soon(() => changeStreamCursor.hasNext());
+ assert(false, `Unexpected result from cursor: ${tojson(changeStreamCursor.next())}`);
+ } catch (error) {
+ assert.eq(error.code,
+ ErrorCodes.NoMatchingDocument,
+ `Caught unexpected error: ${tojson(error)}`);
+ }
+
+ // Reopen the failed change stream.
+ changeStreamCursor = coll.watch([], Object.assign({}, changeStreamOptions));
+ } else {
+ assert.soon(() => changeStreamCursor.hasNext());
+ changeStreamDoc = changeStreamCursor.next();
+ assert.eq(changeStreamDoc.fullDocumentBeforeChange,
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled.preImage);
+ assert.eq(changeStreamDoc.fullDocument,
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled.postImage);
+ assertChangeStreamInternalFieldsNotPresent(changeStreamDoc);
+ }
+
+ // Enable changeStreamPreAndPostImages for pre-images recording.
+ assert.commandWorked(
+ testDB.runCommand({collMod: collName, changeStreamPreAndPostImages: true}));
+
+ // Perform an update modification.
+ assert.commandWorked(coll.update(updatedDoc, {$inc: {x: 2}}));
+
+ // The next change stream event should contain the expected pre- and post-images.
+ assert.soon(() => changeStreamCursor.hasNext());
+ changeStreamDoc = changeStreamCursor.next();
+ assert.eq(changeStreamDoc.fullDocumentBeforeChange, expectedOnUpdateImages.preImage);
+ assert.eq(changeStreamDoc.fullDocument, expectedOnUpdateImages.postImage);
+ assertChangeStreamInternalFieldsNotPresent(changeStreamDoc);
+
+ // Perform a full-document replacement.
+ assert.commandWorked(coll.update(updatedDoc2, replaceDoc));
+
+ // The next change stream event should contain the expected pre- and post-images.
+ assert.soon(() => changeStreamCursor.hasNext());
+ changeStreamDoc = changeStreamCursor.next();
+ assert.eq(changeStreamDoc.fullDocumentBeforeChange, expectedOnReplaceImages.preImage);
+ assert.eq(changeStreamDoc.fullDocument, expectedOnReplaceImages.postImage);
+ assert.eq(changeStreamDoc.operationType, "replace");
+ assertChangeStreamInternalFieldsNotPresent(changeStreamDoc);
}
-// Enable changeStreamPreAndPostImages for pre-images recording.
-assert.commandWorked(testDB.runCommand({collMod: collName, changeStreamPreAndPostImages: true}));
-
-// Reopen the failed change stream.
-changeStreamCursorRequired = coll.watch([], {fullDocumentBeforeChange: 'required'});
-
-// Perform an update with 'damages'.
-assert.commandWorked(coll.update(updatedDoc, {$inc: {x: 2}}));
-
-// Pre-images collection should contain one document with the 'updatedDoc' pre-image.
-assert.eq(preImagesColl.find().count({"preImage": updatedDoc}), 1);
-let preImageDoc = preImagesColl.find({"preImage": updatedDoc}).next();
-assertValidPreImage(preImageDoc);
-
-// Change stream should contain the pre-image.
-assert.soon(() => changeStreamCursorWhenAvailable.hasNext());
-assert.soon(() => changeStreamCursorRequired.hasNext());
-assert.eq(changeStreamCursorWhenAvailable.next().fullDocumentBeforeChange, updatedDoc);
-assert.eq(changeStreamCursorRequired.next().fullDocumentBeforeChange, updatedDoc);
-
-// Perform an update (replace).
-assert.commandWorked(coll.update(updatedDoc2, {z: 1}));
-
-// Pre-Images collection should contain a new document with the 'updatedDoc2' pre-image.
-assert.eq(preImagesColl.find({"preImage": updatedDoc2}).count(), 1);
-preImageDoc = preImagesColl.find({"preImage": updatedDoc2}).next();
-assertValidPreImage(preImageDoc);
-
-// Change stream should contain the pre-image.
-assert.soon(() => changeStreamCursorWhenAvailable.hasNext());
-assert.soon(() => changeStreamCursorRequired.hasNext());
-
-let changeStreamDoc = changeStreamCursorWhenAvailable.next();
-assert.eq(changeStreamDoc.fullDocumentBeforeChange, updatedDoc2);
-assert.eq(changeStreamDoc.operationType, "replace");
-
-changeStreamDoc = changeStreamCursorRequired.next();
-assert.eq(changeStreamDoc.fullDocumentBeforeChange, updatedDoc2);
-assert.eq(changeStreamDoc.operationType, "replace");
+preAndPostImageTest({
+ expectedOnReplaceImages: {
+ postImage: replaceDoc,
+ }
+});
+preAndPostImageTest({
+ changeStreamOptions: {fullDocumentBeforeChange: 'whenAvailable'},
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {
+ preImage: null,
+ },
+ expectedOnUpdateImages: {
+ preImage: updatedDoc,
+ },
+ expectedOnReplaceImages: {
+ preImage: updatedDoc2,
+ postImage: replaceDoc,
+ }
+});
+preAndPostImageTest({
+ changeStreamOptions: {fullDocument: 'whenAvailable'},
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {
+ postImage: null,
+ },
+ expectedOnUpdateImages: {
+ postImage: updatedDoc2,
+ },
+ expectedOnReplaceImages: {
+ postImage: replaceDoc,
+ }
+});
+preAndPostImageTest({
+ changeStreamOptions: {fullDocumentBeforeChange: 'whenAvailable', fullDocument: 'whenAvailable'},
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {
+ preImage: null,
+ postImage: null,
+ },
+ expectedOnUpdateImages: {
+ preImage: updatedDoc,
+ postImage: updatedDoc2,
+ },
+ expectedOnReplaceImages: {
+ preImage: updatedDoc2,
+ postImage: replaceDoc,
+ }
+});
+preAndPostImageTest({
+ changeStreamOptions: {fullDocumentBeforeChange: 'required'},
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {} /* will throw on hasNext() */,
+ expectedOnUpdateImages: {
+ preImage: updatedDoc,
+ },
+ expectedOnReplaceImages: {
+ preImage: updatedDoc2,
+ postImage: replaceDoc,
+ }
+});
+preAndPostImageTest({
+ changeStreamOptions: {fullDocument: 'required'},
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {} /* will throw on hasNext() */,
+ expectedOnUpdateImages: {
+ postImage: updatedDoc2,
+ },
+ expectedOnReplaceImages: {
+ postImage: replaceDoc,
+ }
+});
+preAndPostImageTest({
+ changeStreamOptions: {fullDocumentBeforeChange: 'required', fullDocument: 'required'},
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {} /* will throw on hasNext() */,
+ expectedOnUpdateImages: {
+ preImage: updatedDoc,
+ postImage: updatedDoc2,
+ },
+ expectedOnReplaceImages: {
+ preImage: updatedDoc2,
+ postImage: replaceDoc,
+ }
+});
+preAndPostImageTest({
+ changeStreamOptions: {fullDocumentBeforeChange: 'whenAvailable', fullDocument: 'required'},
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {} /* will throw on hasNext() */,
+ expectedOnUpdateImages: {
+ preImage: updatedDoc,
+ postImage: updatedDoc2,
+ },
+ expectedOnReplaceImages: {
+ preImage: updatedDoc2,
+ postImage: replaceDoc,
+ }
+});
+preAndPostImageTest({
+ changeStreamOptions: {fullDocumentBeforeChange: 'required', fullDocument: 'whenAvailable'},
+ expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {} /* will throw on hasNext() */,
+ expectedOnUpdateImages: {
+ preImage: updatedDoc,
+ postImage: updatedDoc2,
+ },
+ expectedOnReplaceImages: {
+ preImage: updatedDoc2,
+ postImage: replaceDoc,
+ }
+});
}());
diff --git a/jstests/change_streams/lookup_pre_image.js b/jstests/change_streams/lookup_pre_image.js
index 72cac8cd0a2..120c6de5671 100644
--- a/jstests/change_streams/lookup_pre_image.js
+++ b/jstests/change_streams/lookup_pre_image.js
@@ -122,7 +122,7 @@ latestChange.fullDocumentBeforeChange = null;
assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor));
// ... but the "required" cursor throws an exception.
assert.throwsWithCode(() => cst.getOneChange(csPreImageRequiredCursor),
- [ErrorCodes.ChangeStreamHistoryLost, 51770]);
+ [ErrorCodes.NoMatchingDocument, 51770]);
// Test pre-image lookup for an op-style update operation.
assert.commandWorked(coll.update({_id: "y"}, {$set: {foo: "baz"}}));
diff --git a/jstests/change_streams/write_pit_preimage.js b/jstests/change_streams/write_pit_preimage.js
index 5cb7090e300..f81cb18c1e0 100644
--- a/jstests/change_streams/write_pit_preimage.js
+++ b/jstests/change_streams/write_pit_preimage.js
@@ -16,11 +16,8 @@ load("jstests/libs/change_stream_util.js"); // For isChangeStreamPreAndPo
const testDB = db.getSiblingDB(jsTestName());
const localDB = db.getSiblingDB("local");
const collName = "test";
-const coll =
- assertDropAndRecreateCollection(testDB, collName, {changeStreamPreAndPostImages: true});
-const collInfos = testDB.getCollectionInfos({name: collName});
-assert.eq(collInfos.length, 1);
-const collUUID = collInfos[0].info.uuid;
+const coll = assertDropAndRecreateCollection(testDB, collName);
+const collUUID = testDB.getCollectionInfos({name: collName})[0].info.uuid;
const preImagesColl = assertDropAndRecreateCollection(localDB, "system.preimages");
const originalDoc = {
_id: 1,
@@ -31,37 +28,50 @@ const updatedDoc = {
x: 3
};
-function assertValidPreImage(preImage) {
- const MAX_TIME_DELTA_SECONDS = 300; // 5 minutes delay.
+// Validates the contents of the pre-image collection entry.
+function assertValidChangeStreamPreImageDocument(preImage) {
+ const oplogEntryCursor = localDB.oplog.rs.find({ts: preImage._id.ts});
+ assert(oplogEntryCursor.hasNext());
+ const oplogEntry = oplogEntryCursor.next();
+ assert.eq(oplogEntry.op, "u", oplogEntry);
+ assert.eq(preImage._id.nsUUID, oplogEntry.ui);
assert.eq(preImage._id.nsUUID, collUUID);
- assert.lte(Math.abs(new Date().getTime() / 1000 - preImage._id.ts.getTime()),
- MAX_TIME_DELTA_SECONDS);
- assert.lte(Math.abs(new Date().getTime() / 1000 - preImage.operationTime.getTime() / 1000),
- MAX_TIME_DELTA_SECONDS);
assert.eq(preImage._id.applyOpsIndex, 0);
+ assert.eq(preImage.operationTime, oplogEntry.wall, oplogEntry);
+ assert.eq(preImage.preImage._id, oplogEntry.o2._id, oplogEntry);
}
+// Perform an insert, an update modification and a delete.
+assert.commandWorked(coll.insert(originalDoc));
+assert.commandWorked(coll.update(originalDoc, {$inc: {x: 2}}));
+assert.commandWorked(coll.remove(updatedDoc));
+
+// Since changeStreamPreAndPostImages is not enabled, pre-images collection must be empty.
+assert.eq(preImagesColl.count(), 0);
+
+// Enable changeStreamPreAndPostImages for pre-images recording.
+assert.commandWorked(testDB.runCommand({collMod: collName, changeStreamPreAndPostImages: true}));
+
// Perform an insert.
assert.commandWorked(coll.insert(originalDoc));
assert.eq(coll.find().count(), 1);
-// Pre-images collection should remain empty, as pre-images for insert operations can be found in
-// the oplog.
+// Pre-images collection should remain empty, as insert operations do not have pre-images.
assert.eq(preImagesColl.find().count(), 0);
-// Perform an update with 'damages'.
+// Perform an update modification.
assert.commandWorked(coll.update(originalDoc, {$inc: {x: 2}}));
// Pre-images collection should contain one document with the 'originalDoc' pre-image.
-let preimages = preImagesColl.find({"preImage": originalDoc}).toArray();
-assert.eq(preimages.length, 1);
-assertValidPreImage(preimages[0]);
+let preImages = preImagesColl.find({"preImage": originalDoc}).toArray();
+assert.eq(preImages.length, 1);
+assertValidChangeStreamPreImageDocument(preImages[0]);
-// Perform an update (replace).
+// Perform a full-document replacement.
assert.commandWorked(coll.update(updatedDoc, {z: 1}));
// Pre-images collection should contain a new document with the 'updatedDoc' pre-image.
-preimages = preImagesColl.find({"preImage": updatedDoc}).toArray();
-assert.eq(preimages.length, 1);
-assertValidPreImage(preimages[0]);
+preImages = preImagesColl.find({"preImage": updatedDoc}).toArray();
+assert.eq(preImages.length, 1);
+assertValidChangeStreamPreImageDocument(preImages[0]);
}());