diff options
-rw-r--r-- | jstests/change_streams/write_pit_preimage.js | 67 | ||||
-rw-r--r-- | jstests/noPassthrough/change_streams_pre_and_post_images_in_create_and_collmod.js | 2 | ||||
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection.h | 1 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection_impl.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl_test.cpp | 66 | ||||
-rw-r--r-- | src/mongo/db/pipeline/change_stream_preimage.idl | 2 |
8 files changed, 162 insertions, 13 deletions
diff --git a/jstests/change_streams/write_pit_preimage.js b/jstests/change_streams/write_pit_preimage.js new file mode 100644 index 00000000000..5cb7090e300 --- /dev/null +++ b/jstests/change_streams/write_pit_preimage.js @@ -0,0 +1,67 @@ +// Tests that pre-images are stored in the pre-images collection on updates in collections with +// 'changeStreamPreAndPostImages' set to true. +// @tags: [ +// requires_fcv_51, +// featureFlagChangeStreamPreAndPostImages, +// assumes_against_mongod_not_mongos, +// change_stream_does_not_expect_txns, +// multiversion_incompatible, +// ] +(function() { +"use strict"; + +load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. +load("jstests/libs/change_stream_util.js"); // For isChangeStreamPreAndPostImagesEnabled. + +const testDB = db.getSiblingDB(jsTestName()); +const localDB = db.getSiblingDB("local"); +const collName = "test"; +const coll = + assertDropAndRecreateCollection(testDB, collName, {changeStreamPreAndPostImages: true}); +const collInfos = testDB.getCollectionInfos({name: collName}); +assert.eq(collInfos.length, 1); +const collUUID = collInfos[0].info.uuid; +const preImagesColl = assertDropAndRecreateCollection(localDB, "system.preimages"); +const originalDoc = { + _id: 1, + x: 1 +}; +const updatedDoc = { + _id: 1, + x: 3 +}; + +function assertValidPreImage(preImage) { + const MAX_TIME_DELTA_SECONDS = 300; // 5 minutes delay. + assert.eq(preImage._id.nsUUID, collUUID); + assert.lte(Math.abs(new Date().getTime() / 1000 - preImage._id.ts.getTime()), + MAX_TIME_DELTA_SECONDS); + assert.lte(Math.abs(new Date().getTime() / 1000 - preImage.operationTime.getTime() / 1000), + MAX_TIME_DELTA_SECONDS); + assert.eq(preImage._id.applyOpsIndex, 0); +} + +// Perform an insert. +assert.commandWorked(coll.insert(originalDoc)); +assert.eq(coll.find().count(), 1); + +// Pre-images collection should remain empty, as pre-images for insert operations can be found in +// the oplog. +assert.eq(preImagesColl.find().count(), 0); + +// Perform an update with 'damages'. +assert.commandWorked(coll.update(originalDoc, {$inc: {x: 2}})); + +// Pre-images collection should contain one document with the 'originalDoc' pre-image. +let preimages = preImagesColl.find({"preImage": originalDoc}).toArray(); +assert.eq(preimages.length, 1); +assertValidPreImage(preimages[0]); + +// Perform an update (replace). +assert.commandWorked(coll.update(updatedDoc, {z: 1})); + +// Pre-images collection should contain a new document with the 'updatedDoc' pre-image. +preimages = preImagesColl.find({"preImage": updatedDoc}).toArray(); +assert.eq(preimages.length, 1); +assertValidPreImage(preimages[0]); +}()); diff --git a/jstests/noPassthrough/change_streams_pre_and_post_images_in_create_and_collmod.js b/jstests/noPassthrough/change_streams_pre_and_post_images_in_create_and_collmod.js index 03ee3b144f1..beb31ecd39c 100644 --- a/jstests/noPassthrough/change_streams_pre_and_post_images_in_create_and_collmod.js +++ b/jstests/noPassthrough/change_streams_pre_and_post_images_in_create_and_collmod.js @@ -44,7 +44,7 @@ function assertPreimagesCollectionExists() { assert.eq(result.cursor.firstBatch[0].name, preimagesCollName); } -// Check that we cannot set 'changeStreamPreAndPostImages' on the local or admin databases. +// Check that we cannot set 'changeStreamPreAndPostImages' on the local, admin and config databases. for (const db of [localDB, adminDB, configDB]) { assert.commandFailedWithCode( db.runCommand({create: collName, changeStreamPreAndPostImages: true}), diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 0471048fd3b..f83373e8d84 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -851,6 +851,7 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/catalog/collection_catalog', + '$BUILD_DIR/mongo/db/pipeline/change_stream_preimage', '$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker', '$BUILD_DIR/mongo/db/timeseries/bucket_catalog', '$BUILD_DIR/mongo/s/coreshard', diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index 4cba32ed567..cd59dff9d10 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -95,6 +95,7 @@ struct CollectionUpdateArgs { StoreDocOption storeDocOption = StoreDocOption::None; bool preImageRecordingEnabledForCollection = false; + bool changeStreamPreAndPostImagesEnabledForCollection = false; // Set if an OpTime was reserved for the update ahead of time. boost::optional<OplogSlot> oplogSlot = boost::none; diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index 2edd03b9308..ffdd6c5e893 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -1252,6 +1252,9 @@ RecordId CollectionImpl::updateDocument(OperationContext* opCtx, args->preImageDoc = oldDoc.value().getOwned(); } args->preImageRecordingEnabledForCollection = getRecordPreImages(); + args->changeStreamPreAndPostImagesEnabledForCollection = + isChangeStreamPreAndPostImagesEnabled(); + const bool storePrePostImage = args->storeDocOption != CollectionUpdateArgs::StoreDocOption::None; if (!args->oplogSlot && storePrePostImage) { @@ -1315,7 +1318,7 @@ StatusWith<RecordData> CollectionImpl::updateDocumentWithDamages( // For in-place updates we need to grab an owned copy of the pre-image doc if pre-image // recording is enabled and we haven't already set the pre-image due to this update being // a retryable findAndModify or a possible update to the shard key. - if (!args->preImageDoc && getRecordPreImages()) { + if (!args->preImageDoc && (getRecordPreImages() || isChangeStreamPreAndPostImagesEnabled())) { args->preImageDoc = oldRec.value().toBson().getOwned(); } const bool storePrePostImage = @@ -1337,6 +1340,9 @@ StatusWith<RecordData> CollectionImpl::updateDocumentWithDamages( if (newRecStatus.isOK()) { args->updatedDoc = newRecStatus.getValue().toBson(); args->preImageRecordingEnabledForCollection = getRecordPreImages(); + args->changeStreamPreAndPostImagesEnabledForCollection = + isChangeStreamPreAndPostImagesEnabled(); + OplogUpdateEntryArgs entryArgs(*args, ns(), _uuid); getGlobalServiceContext()->getOpObserver()->onUpdate(opCtx, entryArgs); } diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index b4b93f6baef..d8d52d1bc4c 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -53,6 +53,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/op_observer_util.h" #include "mongo/db/operation_context.h" +#include "mongo/db/pipeline/change_stream_preimage_gen.h" #include "mongo/db/read_write_concern_defaults.h" #include "mongo/db/repl/image_collection_entry_gen.h" #include "mongo/db/repl/oplog.h" @@ -286,6 +287,23 @@ void writeToImageCollection(OperationContext* opCtx, invariant(res.numDocsModified == 1 || !res.upsertedId.isEmpty()); } +// Inserts document pre-image 'preImage' into the change stream pre-images collection. +void writeToChangeStreamPreImagesCollection(OperationContext* opCtx, + const ChangeStreamPreImage& preImage) { + const auto collectionNamespace = NamespaceString::kChangeStreamPreImagesNamespace; + + // This lock acquisition can block on a stronger lock held by another operation modifying the + // pre-images collection. There are no known cases where an operation holding an exclusive lock + // on the pre-images collection also waits for oplog visibility. + repl::UnreplicatedWritesBlock unreplicated(opCtx); + AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState()); + AutoGetCollection preimagesCollectionRaii(opCtx, collectionNamespace, LockMode::MODE_IX); + UpdateResult res = Helpers::upsert(opCtx, collectionNamespace.toString(), preImage.toBSON()); + tassert(5868601, + "Failed to insert a new document into pre-images collection", + !res.existing && !res.upsertedId.isEmpty()); +} + } // namespace BSONObj OpObserverImpl::DocumentKey::getId() const { @@ -655,6 +673,16 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg dataImage); } + if (opCtx->isEnforcingConstraints() && + args.updateArgs.changeStreamPreAndPostImagesEnabledForCollection) { + const auto& preImageDoc = args.updateArgs.preImageDoc; + tassert(5868600, "PreImage must be set", preImageDoc && !preImageDoc.get().isEmpty()); + + ChangeStreamPreImageId _id(args.uuid, opTime.writeOpTime.getTimestamp(), 0); + ChangeStreamPreImage preImage(_id, opTime.wallClockTime, preImageDoc.get()); + writeToChangeStreamPreImagesCollection(opCtx, preImage); + } + SessionTxnRecord sessionTxnRecord; sessionTxnRecord.setLastWriteOpTime(opTime.writeOpTime); sessionTxnRecord.setLastWriteDate(opTime.wallClockTime); diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 1cc220d3746..150a8f4313f 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -43,6 +43,7 @@ #include "mongo/db/logical_time_validator.h" #include "mongo/db/op_observer_impl.h" #include "mongo/db/op_observer_registry.h" +#include "mongo/db/pipeline/change_stream_preimage_gen.h" #include "mongo/db/read_write_concern_defaults.h" #include "mongo/db/read_write_concern_defaults_cache_lookup_mock.h" #include "mongo/db/repl/image_collection_entry_gen.h" @@ -1637,12 +1638,15 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) { {StoreDocOption::PostImage, kRecordPreImages, RetryableOptions::WithOplog, 3}, {StoreDocOption::PostImage, kRecordPreImages, RetryableOptions::WithSideCollection, 2}}; - for (std::size_t testIdx = 0; testIdx < cases.size(); ++testIdx) { - const auto& testCase = cases[testIdx]; + const auto testFunc = [&](CollectionUpdateArgs& updateArgs, + const UpdateTestCase& testCase, + const int testIdx) { LOGV2(5739902, "UpdateTestCase", "ImageType"_attr = testCase.getImageTypeStr(), - "AlwaysRecordPreImages"_attr = testCase.alwaysRecordPreImages, + "PreImageRecording"_attr = updateArgs.preImageRecordingEnabledForCollection, + "ChangeStreamPreAndPostImagesEnabled"_attr = + updateArgs.changeStreamPreAndPostImagesEnabledForCollection, "RetryableOptions"_attr = testCase.getRetryableOptionsStr(), "ExpectedOplogEntries"_attr = testCase.numOutputOplogs); @@ -1653,7 +1657,6 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) { boost::optional<MongoDOperationContextSession> contextSession; boost::optional<TransactionParticipant::Participant> txnParticipant; - CollectionUpdateArgs updateArgs; switch (testCase.retryableOptions) { case RetryableOptions::NotRetryable: updateArgs.stmtIds = {kUninitializedStmtId}; @@ -1690,7 +1693,6 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) { BSON("$set" << BSON("postImage" << true) << "$unset" << BSON("preImage" << 1)); updateArgs.criteria = BSON("_id" << 0); updateArgs.storeDocOption = testCase.imageType; - updateArgs.preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages; OplogUpdateEntryArgs update(std::move(updateArgs), nss, uuid); // Phase 2: Call the code we're testing. @@ -1706,7 +1708,7 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) { // Entries are returned in ascending timestamp order. const OplogEntry& actualOp = assertGet(OplogEntry::parse(oplogs.back())); - const bool checkPreImageInOplog = testCase.alwaysRecordPreImages || + const bool checkPreImageInOplog = update.updateArgs.preImageRecordingEnabledForCollection || (testCase.imageType == StoreDocOption::PreImage && testCase.retryableOptions == RetryableOptions::WithOplog); if (checkPreImageInOplog) { @@ -1729,11 +1731,11 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) { bool checkSideCollection = testCase.imageType != StoreDocOption::None && testCase.retryableOptions == RetryableOptions::WithSideCollection; - if (checkSideCollection && testCase.alwaysRecordPreImages && + if (checkSideCollection && update.updateArgs.preImageRecordingEnabledForCollection && testCase.imageType == StoreDocOption::PreImage) { - // When `alwaysRecordPreImages` is enabled for a collection, we always store an image in - // the oplog. To avoid unnecessary writes, we won't also store an image in the side - // collection. + // When `alwaysRecordPreImages` is enabled for a collection, we always store an + // image in the oplog. To avoid unnecessary writes, we won't also store an image + // in the side collection. checkSideCollection = false; } @@ -1750,6 +1752,50 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) { ASSERT(imageEntry.getImageKind() == repl::RetryImageEnum::kPostImage); } } + + if (update.updateArgs.changeStreamPreAndPostImagesEnabledForCollection) { + const Timestamp preImageOpTime = actualOp.getOpTime().getTimestamp(); + ChangeStreamPreImageId preImageId(uuid, preImageOpTime, 0); + AutoGetCollection preImagesCollection( + opCtx, NamespaceString::kChangeStreamPreImagesNamespace, LockMode::MODE_IS); + const auto preImage = Helpers::findOneForTesting( + opCtx, preImagesCollection.getCollection(), BSON("_id" << preImageId.toBSON())); + const auto changeStreamPreImage = + ChangeStreamPreImage::parse(IDLParserErrorContext("pre-image"), preImage); + const BSONObj& expectedImage = update.updateArgs.preImageDoc.get(); + ASSERT_BSONOBJ_EQ(expectedImage, changeStreamPreImage.getPreImage()); + ASSERT_EQ(actualOp.getWallClockTime(), changeStreamPreImage.getOperationTime()); + } + }; + + for (std::size_t testIdx = 0; testIdx < cases.size(); ++testIdx) { + auto& testCase = cases[testIdx]; + + // In case when 'alwaysRecordPreImages' is set to true, run the test for both + // 'preImageRecordingEnabledForCollection' and + // 'changeStreamPreAndPostImagesEnabledForCollection' cases. + CollectionUpdateArgs updateArgs; + if (testCase.alwaysRecordPreImages) { + updateArgs.preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages; + updateArgs.changeStreamPreAndPostImagesEnabledForCollection = + !testCase.alwaysRecordPreImages; + testFunc(updateArgs, testCase, testIdx); + + const auto numOutputOplogs = (testCase.imageType == StoreDocOption::PreImage && + testCase.retryableOptions == RetryableOptions::WithOplog) + ? testCase.numOutputOplogs + : testCase.numOutputOplogs - 1; + updateArgs.preImageRecordingEnabledForCollection = !testCase.alwaysRecordPreImages; + updateArgs.changeStreamPreAndPostImagesEnabledForCollection = + testCase.alwaysRecordPreImages; + testCase.numOutputOplogs = numOutputOplogs; + testFunc(updateArgs, testCase, testIdx); + } else { + updateArgs.preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages; + updateArgs.changeStreamPreAndPostImagesEnabledForCollection = + testCase.alwaysRecordPreImages; + testFunc(updateArgs, testCase, testIdx); + } } } diff --git a/src/mongo/db/pipeline/change_stream_preimage.idl b/src/mongo/db/pipeline/change_stream_preimage.idl index c60a285e103..2a34bcc0512 100644 --- a/src/mongo/db/pipeline/change_stream_preimage.idl +++ b/src/mongo/db/pipeline/change_stream_preimage.idl @@ -59,6 +59,6 @@ structs: description: Operation execution wall clock time. Used to determine if the pre-image expired. type: date - preimage: + preImage: description: Pre-image of a document for an operation recorded to an oplog entry. type: object |