diff options
author | Denis Grebennicov <denis.grebennicov@mongodb.com> | 2021-10-04 16:54:18 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-04 17:58:49 +0000 |
commit | c05c56ed7f798ec5725b3883185c4ddebab1b19f (patch) | |
tree | db0b4f825b3d673596240e8bb74122c630101cac /jstests/change_streams | |
parent | 0b1b02acbc21bae6d8aa6468f707f80e70b0ddd6 (diff) | |
download | mongo-c05c56ed7f798ec5725b3883185c4ddebab1b19f.tar.gz |
SERVER-58690 Implement loading of post-images in a change stream
Diffstat (limited to 'jstests/change_streams')
-rw-r--r-- | jstests/change_streams/lookup_pit_pre_and_post_image.js | 294 | ||||
-rw-r--r-- | jstests/change_streams/lookup_pre_image.js | 2 | ||||
-rw-r--r-- | jstests/change_streams/write_pit_preimage.js | 52 |
3 files changed, 231 insertions, 117 deletions
diff --git a/jstests/change_streams/lookup_pit_pre_and_post_image.js b/jstests/change_streams/lookup_pit_pre_and_post_image.js index 3e218854a1b..49fa4bafa49 100644 --- a/jstests/change_streams/lookup_pit_pre_and_post_image.js +++ b/jstests/change_streams/lookup_pit_pre_and_post_image.js @@ -1,5 +1,5 @@ -// Tests that pre-images are stored in the pre-images collection on updates in collections with -// 'changeStreamPreAndPostImages' set to true. +// Tests that the point-in-time pre- and post-images are loaded correctly in $changeStream running +// with different arguments for collections with 'changeStreamPreAndPostImages' set to true. // @tags: [ // assumes_against_mongod_not_mongos, // change_stream_does_not_expect_txns, @@ -12,11 +12,11 @@ load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateC load("jstests/libs/change_stream_util.js"); // For isChangeStreamPreAndPostImagesEnabled. const testDB = db.getSiblingDB(jsTestName()); -const localDB = db.getSiblingDB("local"); const collName = "test"; -const coll = assertDropAndRecreateCollection(testDB, collName); if (!isChangeStreamPreAndPostImagesEnabled(db)) { + const coll = assertDropAndRecreateCollection(testDB, collName); + // If feature flag is off, creating changeStream with new fullDocument arguments should throw. assert.throwsWithCode(() => coll.watch([], {fullDocument: 'whenAvailable'}), ErrorCodes.BadValue); @@ -27,8 +27,6 @@ if (!isChangeStreamPreAndPostImagesEnabled(db)) { return; } -const collUUID = testDB.getCollectionInfos({name: collName})[0].info.uuid; -const preImagesColl = assertDropAndRecreateCollection(localDB, "system.preimages"); const originalDoc = { _id: 1, x: 1 @@ -41,96 +39,202 @@ const updatedDoc2 = { _id: 1, x: 5 }; +const replaceDoc = { + _id: 1, + z: 1 +}; -function assertValidPreImage(preImage) { - const MAX_TIME_DELTA_SECONDS = 300; // 5 minutes delay. - assert.eq(preImage._id.nsUUID, collUUID); - assert.lte(Math.abs(new Date().getTime() / 1000 - preImage._id.ts.getTime()), - MAX_TIME_DELTA_SECONDS); - assert.lte(Math.abs(new Date().getTime() / 1000 - preImage.operationTime.getTime() / 1000), - MAX_TIME_DELTA_SECONDS); - assert.eq(preImage._id.applyOpsIndex, 0); -} - -// Open the change streams with new fullDocument parameter set. -assert.doesNotThrow(() => coll.watch([], {fullDocument: 'whenAvailable'})); -assert.doesNotThrow(() => coll.watch([], {fullDocument: 'required'})); - -// Open the change streams with fullDocumentBeforeChange parameter set. -const changeStreamCursorWhenAvailable = coll.watch([], {fullDocumentBeforeChange: 'whenAvailable'}); -let changeStreamCursorRequired = coll.watch([], {fullDocumentBeforeChange: 'required'}); - -// Perform an insert. -assert.commandWorked(coll.insert(originalDoc)); -assert.soon(() => changeStreamCursorWhenAvailable.hasNext()); -assert.soon(() => changeStreamCursorRequired.hasNext()); -assert.eq(changeStreamCursorWhenAvailable.next().operationType, 'insert'); -assert.eq(changeStreamCursorRequired.next().operationType, 'insert'); - -// Pre-images collection should remain empty, as pre-images for insert operations can be found in -// the oplog. -assert.eq(preImagesColl.find().count(), 0); - -// Perform an update with 'damages'. -assert.commandWorked(coll.update(originalDoc, {$inc: {x: 2}})); - -// Since changeStreamPreAndPostImages is not enabled, pre-images collection must be empty. -assert.eq(preImagesColl.find().count(), 0); - -// Change stream with { fullDocumentBeforeChange: 'whenAvailable' } should return null as pre-image. -assert.soon(() => changeStreamCursorWhenAvailable.hasNext()); -const doc = changeStreamCursorWhenAvailable.next(); -assert(doc.hasOwnProperty("fullDocumentBeforeChange")); -assert.isnull(doc.fullDocumentBeforeChange); - -// Change stream with { fullDocumentBeforeChange: 'required' } should fail as pre-image is not -// available. -try { - assert.soon(() => changeStreamCursorRequired.hasNext()); - assert(false, `Unexpected result from cursor: ${tojson(changeStreamCursorRequired.next())}`); -} catch (error) { - assert.eq(error.code, - ErrorCodes.ChangeStreamHistoryLost, - `Caught unexpected error: ${tojson(error)}`); +// Tests the change stream point-in-time pre-/post-images behaviour with different change stream +// options. +function preAndPostImageTest({ + changeStreamOptions = {}, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled = {}, + expectedOnUpdateImages = {}, + expectedOnReplaceImages = {} +} = {}) { + // Confirms that the change event document does not contain any internal-only fields. + function assertChangeStreamInternalFieldsNotPresent(changeStreamDoc) { + assert(!changeStreamDoc.hasOwnProperty("preImageId"), changeStreamDoc); + assert(!changeStreamDoc.hasOwnProperty("updateModification"), changeStreamDoc); + + if (!changeStreamOptions.hasOwnProperty("fullDocumentBeforeChange")) { + assert(!changeStreamDoc.hasOwnProperty("fullDocumentBeforeChange"), changeStreamDoc); + } + + if (!changeStreamOptions.hasOwnProperty("fullDocument") && + changeStreamDoc.operationType == "update") { + assert(!changeStreamDoc.hasOwnProperty("fullDocument"), changeStreamDoc); + } + } + + const coll = assertDropAndRecreateCollection(testDB, collName); + + // Open a change stream with the specified test options. + let changeStreamCursor = coll.watch([], Object.assign({}, changeStreamOptions)); + let changeStreamDoc = null; + + // Perform an insert. + assert.commandWorked(coll.insert(originalDoc)); + assert.soon(() => changeStreamCursor.hasNext()); + changeStreamDoc = changeStreamCursor.next(); + assert.eq(changeStreamDoc.operationType, 'insert'); + assertChangeStreamInternalFieldsNotPresent(changeStreamDoc); + + // Perform an update modification. + assert.commandWorked(coll.update(originalDoc, {$inc: {x: 2}})); + + // Change stream should throw an exception while trying to fetch the next document if + // pre-/post-image is required. + const shouldThrow = changeStreamOptions.fullDocument === 'required' || + changeStreamOptions.fullDocumentBeforeChange === 'required'; + if (shouldThrow) { + try { + assert.soon(() => changeStreamCursor.hasNext()); + assert(false, `Unexpected result from cursor: ${tojson(changeStreamCursor.next())}`); + } catch (error) { + assert.eq(error.code, + ErrorCodes.NoMatchingDocument, + `Caught unexpected error: ${tojson(error)}`); + } + + // Reopen the failed change stream. + changeStreamCursor = coll.watch([], Object.assign({}, changeStreamOptions)); + } else { + assert.soon(() => changeStreamCursor.hasNext()); + changeStreamDoc = changeStreamCursor.next(); + assert.eq(changeStreamDoc.fullDocumentBeforeChange, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled.preImage); + assert.eq(changeStreamDoc.fullDocument, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled.postImage); + assertChangeStreamInternalFieldsNotPresent(changeStreamDoc); + } + + // Enable changeStreamPreAndPostImages for pre-images recording. + assert.commandWorked( + testDB.runCommand({collMod: collName, changeStreamPreAndPostImages: true})); + + // Perform an update modification. + assert.commandWorked(coll.update(updatedDoc, {$inc: {x: 2}})); + + // The next change stream event should contain the expected pre- and post-images. + assert.soon(() => changeStreamCursor.hasNext()); + changeStreamDoc = changeStreamCursor.next(); + assert.eq(changeStreamDoc.fullDocumentBeforeChange, expectedOnUpdateImages.preImage); + assert.eq(changeStreamDoc.fullDocument, expectedOnUpdateImages.postImage); + assertChangeStreamInternalFieldsNotPresent(changeStreamDoc); + + // Perform a full-document replacement. + assert.commandWorked(coll.update(updatedDoc2, replaceDoc)); + + // The next change stream event should contain the expected pre- and post-images. + assert.soon(() => changeStreamCursor.hasNext()); + changeStreamDoc = changeStreamCursor.next(); + assert.eq(changeStreamDoc.fullDocumentBeforeChange, expectedOnReplaceImages.preImage); + assert.eq(changeStreamDoc.fullDocument, expectedOnReplaceImages.postImage); + assert.eq(changeStreamDoc.operationType, "replace"); + assertChangeStreamInternalFieldsNotPresent(changeStreamDoc); } -// Enable changeStreamPreAndPostImages for pre-images recording. -assert.commandWorked(testDB.runCommand({collMod: collName, changeStreamPreAndPostImages: true})); - -// Reopen the failed change stream. -changeStreamCursorRequired = coll.watch([], {fullDocumentBeforeChange: 'required'}); - -// Perform an update with 'damages'. -assert.commandWorked(coll.update(updatedDoc, {$inc: {x: 2}})); - -// Pre-images collection should contain one document with the 'updatedDoc' pre-image. -assert.eq(preImagesColl.find().count({"preImage": updatedDoc}), 1); -let preImageDoc = preImagesColl.find({"preImage": updatedDoc}).next(); -assertValidPreImage(preImageDoc); - -// Change stream should contain the pre-image. -assert.soon(() => changeStreamCursorWhenAvailable.hasNext()); -assert.soon(() => changeStreamCursorRequired.hasNext()); -assert.eq(changeStreamCursorWhenAvailable.next().fullDocumentBeforeChange, updatedDoc); -assert.eq(changeStreamCursorRequired.next().fullDocumentBeforeChange, updatedDoc); - -// Perform an update (replace). -assert.commandWorked(coll.update(updatedDoc2, {z: 1})); - -// Pre-Images collection should contain a new document with the 'updatedDoc2' pre-image. -assert.eq(preImagesColl.find({"preImage": updatedDoc2}).count(), 1); -preImageDoc = preImagesColl.find({"preImage": updatedDoc2}).next(); -assertValidPreImage(preImageDoc); - -// Change stream should contain the pre-image. -assert.soon(() => changeStreamCursorWhenAvailable.hasNext()); -assert.soon(() => changeStreamCursorRequired.hasNext()); - -let changeStreamDoc = changeStreamCursorWhenAvailable.next(); -assert.eq(changeStreamDoc.fullDocumentBeforeChange, updatedDoc2); -assert.eq(changeStreamDoc.operationType, "replace"); - -changeStreamDoc = changeStreamCursorRequired.next(); -assert.eq(changeStreamDoc.fullDocumentBeforeChange, updatedDoc2); -assert.eq(changeStreamDoc.operationType, "replace"); +preAndPostImageTest({ + expectedOnReplaceImages: { + postImage: replaceDoc, + } +}); +preAndPostImageTest({ + changeStreamOptions: {fullDocumentBeforeChange: 'whenAvailable'}, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: { + preImage: null, + }, + expectedOnUpdateImages: { + preImage: updatedDoc, + }, + expectedOnReplaceImages: { + preImage: updatedDoc2, + postImage: replaceDoc, + } +}); +preAndPostImageTest({ + changeStreamOptions: {fullDocument: 'whenAvailable'}, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: { + postImage: null, + }, + expectedOnUpdateImages: { + postImage: updatedDoc2, + }, + expectedOnReplaceImages: { + postImage: replaceDoc, + } +}); +preAndPostImageTest({ + changeStreamOptions: {fullDocumentBeforeChange: 'whenAvailable', fullDocument: 'whenAvailable'}, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: { + preImage: null, + postImage: null, + }, + expectedOnUpdateImages: { + preImage: updatedDoc, + postImage: updatedDoc2, + }, + expectedOnReplaceImages: { + preImage: updatedDoc2, + postImage: replaceDoc, + } +}); +preAndPostImageTest({ + changeStreamOptions: {fullDocumentBeforeChange: 'required'}, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {} /* will throw on hasNext() */, + expectedOnUpdateImages: { + preImage: updatedDoc, + }, + expectedOnReplaceImages: { + preImage: updatedDoc2, + postImage: replaceDoc, + } +}); +preAndPostImageTest({ + changeStreamOptions: {fullDocument: 'required'}, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {} /* will throw on hasNext() */, + expectedOnUpdateImages: { + postImage: updatedDoc2, + }, + expectedOnReplaceImages: { + postImage: replaceDoc, + } +}); +preAndPostImageTest({ + changeStreamOptions: {fullDocumentBeforeChange: 'required', fullDocument: 'required'}, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {} /* will throw on hasNext() */, + expectedOnUpdateImages: { + preImage: updatedDoc, + postImage: updatedDoc2, + }, + expectedOnReplaceImages: { + preImage: updatedDoc2, + postImage: replaceDoc, + } +}); +preAndPostImageTest({ + changeStreamOptions: {fullDocumentBeforeChange: 'whenAvailable', fullDocument: 'required'}, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {} /* will throw on hasNext() */, + expectedOnUpdateImages: { + preImage: updatedDoc, + postImage: updatedDoc2, + }, + expectedOnReplaceImages: { + preImage: updatedDoc2, + postImage: replaceDoc, + } +}); +preAndPostImageTest({ + changeStreamOptions: {fullDocumentBeforeChange: 'required', fullDocument: 'whenAvailable'}, + expectedOnUpdateImagesWithChangeStreamPreImagesDisabled: {} /* will throw on hasNext() */, + expectedOnUpdateImages: { + preImage: updatedDoc, + postImage: updatedDoc2, + }, + expectedOnReplaceImages: { + preImage: updatedDoc2, + postImage: replaceDoc, + } +}); }()); diff --git a/jstests/change_streams/lookup_pre_image.js b/jstests/change_streams/lookup_pre_image.js index 72cac8cd0a2..120c6de5671 100644 --- a/jstests/change_streams/lookup_pre_image.js +++ b/jstests/change_streams/lookup_pre_image.js @@ -122,7 +122,7 @@ latestChange.fullDocumentBeforeChange = null; assert.docEq(latestChange, cst.getOneChange(csPreImageWhenAvailableCursor)); // ... but the "required" cursor throws an exception. assert.throwsWithCode(() => cst.getOneChange(csPreImageRequiredCursor), - [ErrorCodes.ChangeStreamHistoryLost, 51770]); + [ErrorCodes.NoMatchingDocument, 51770]); // Test pre-image lookup for an op-style update operation. assert.commandWorked(coll.update({_id: "y"}, {$set: {foo: "baz"}})); diff --git a/jstests/change_streams/write_pit_preimage.js b/jstests/change_streams/write_pit_preimage.js index 5cb7090e300..f81cb18c1e0 100644 --- a/jstests/change_streams/write_pit_preimage.js +++ b/jstests/change_streams/write_pit_preimage.js @@ -16,11 +16,8 @@ load("jstests/libs/change_stream_util.js"); // For isChangeStreamPreAndPo const testDB = db.getSiblingDB(jsTestName()); const localDB = db.getSiblingDB("local"); const collName = "test"; -const coll = - assertDropAndRecreateCollection(testDB, collName, {changeStreamPreAndPostImages: true}); -const collInfos = testDB.getCollectionInfos({name: collName}); -assert.eq(collInfos.length, 1); -const collUUID = collInfos[0].info.uuid; +const coll = assertDropAndRecreateCollection(testDB, collName); +const collUUID = testDB.getCollectionInfos({name: collName})[0].info.uuid; const preImagesColl = assertDropAndRecreateCollection(localDB, "system.preimages"); const originalDoc = { _id: 1, @@ -31,37 +28,50 @@ const updatedDoc = { x: 3 }; -function assertValidPreImage(preImage) { - const MAX_TIME_DELTA_SECONDS = 300; // 5 minutes delay. +// Validates the contents of the pre-image collection entry. +function assertValidChangeStreamPreImageDocument(preImage) { + const oplogEntryCursor = localDB.oplog.rs.find({ts: preImage._id.ts}); + assert(oplogEntryCursor.hasNext()); + const oplogEntry = oplogEntryCursor.next(); + assert.eq(oplogEntry.op, "u", oplogEntry); + assert.eq(preImage._id.nsUUID, oplogEntry.ui); assert.eq(preImage._id.nsUUID, collUUID); - assert.lte(Math.abs(new Date().getTime() / 1000 - preImage._id.ts.getTime()), - MAX_TIME_DELTA_SECONDS); - assert.lte(Math.abs(new Date().getTime() / 1000 - preImage.operationTime.getTime() / 1000), - MAX_TIME_DELTA_SECONDS); assert.eq(preImage._id.applyOpsIndex, 0); + assert.eq(preImage.operationTime, oplogEntry.wall, oplogEntry); + assert.eq(preImage.preImage._id, oplogEntry.o2._id, oplogEntry); } +// Perform an insert, an update modification and a delete. +assert.commandWorked(coll.insert(originalDoc)); +assert.commandWorked(coll.update(originalDoc, {$inc: {x: 2}})); +assert.commandWorked(coll.remove(updatedDoc)); + +// Since changeStreamPreAndPostImages is not enabled, pre-images collection must be empty. +assert.eq(preImagesColl.count(), 0); + +// Enable changeStreamPreAndPostImages for pre-images recording. +assert.commandWorked(testDB.runCommand({collMod: collName, changeStreamPreAndPostImages: true})); + // Perform an insert. assert.commandWorked(coll.insert(originalDoc)); assert.eq(coll.find().count(), 1); -// Pre-images collection should remain empty, as pre-images for insert operations can be found in -// the oplog. +// Pre-images collection should remain empty, as insert operations do not have pre-images. assert.eq(preImagesColl.find().count(), 0); -// Perform an update with 'damages'. +// Perform an update modification. assert.commandWorked(coll.update(originalDoc, {$inc: {x: 2}})); // Pre-images collection should contain one document with the 'originalDoc' pre-image. -let preimages = preImagesColl.find({"preImage": originalDoc}).toArray(); -assert.eq(preimages.length, 1); -assertValidPreImage(preimages[0]); +let preImages = preImagesColl.find({"preImage": originalDoc}).toArray(); +assert.eq(preImages.length, 1); +assertValidChangeStreamPreImageDocument(preImages[0]); -// Perform an update (replace). +// Perform a full-document replacement. assert.commandWorked(coll.update(updatedDoc, {z: 1})); // Pre-images collection should contain a new document with the 'updatedDoc' pre-image. -preimages = preImagesColl.find({"preImage": updatedDoc}).toArray(); -assert.eq(preimages.length, 1); -assertValidPreImage(preimages[0]); +preImages = preImagesColl.find({"preImage": updatedDoc}).toArray(); +assert.eq(preImages.length, 1); +assertValidChangeStreamPreImageDocument(preImages[0]); }()); |