summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/change_streams/write_pit_preimage.js67
-rw-r--r--jstests/noPassthrough/change_streams_pre_and_post_images_in_create_and_collmod.js2
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/catalog/collection.h1
-rw-r--r--src/mongo/db/catalog/collection_impl.cpp8
-rw-r--r--src/mongo/db/op_observer_impl.cpp28
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp66
-rw-r--r--src/mongo/db/pipeline/change_stream_preimage.idl2
8 files changed, 162 insertions, 13 deletions
diff --git a/jstests/change_streams/write_pit_preimage.js b/jstests/change_streams/write_pit_preimage.js
new file mode 100644
index 00000000000..5cb7090e300
--- /dev/null
+++ b/jstests/change_streams/write_pit_preimage.js
@@ -0,0 +1,67 @@
+// Tests that pre-images are stored in the pre-images collection on updates in collections with
+// 'changeStreamPreAndPostImages' set to true.
+// @tags: [
+// requires_fcv_51,
+// featureFlagChangeStreamPreAndPostImages,
+// assumes_against_mongod_not_mongos,
+// change_stream_does_not_expect_txns,
+// multiversion_incompatible,
+// ]
+(function() {
+"use strict";
+
+load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
+load("jstests/libs/change_stream_util.js"); // For isChangeStreamPreAndPostImagesEnabled.
+
+const testDB = db.getSiblingDB(jsTestName());
+const localDB = db.getSiblingDB("local");
+const collName = "test";
+const coll =
+ assertDropAndRecreateCollection(testDB, collName, {changeStreamPreAndPostImages: true});
+const collInfos = testDB.getCollectionInfos({name: collName});
+assert.eq(collInfos.length, 1);
+const collUUID = collInfos[0].info.uuid;
+const preImagesColl = assertDropAndRecreateCollection(localDB, "system.preimages");
+const originalDoc = {
+ _id: 1,
+ x: 1
+};
+const updatedDoc = {
+ _id: 1,
+ x: 3
+};
+
+function assertValidPreImage(preImage) {
+ const MAX_TIME_DELTA_SECONDS = 300; // 5 minutes delay.
+ assert.eq(preImage._id.nsUUID, collUUID);
+ assert.lte(Math.abs(new Date().getTime() / 1000 - preImage._id.ts.getTime()),
+ MAX_TIME_DELTA_SECONDS);
+ assert.lte(Math.abs(new Date().getTime() / 1000 - preImage.operationTime.getTime() / 1000),
+ MAX_TIME_DELTA_SECONDS);
+ assert.eq(preImage._id.applyOpsIndex, 0);
+}
+
+// Perform an insert.
+assert.commandWorked(coll.insert(originalDoc));
+assert.eq(coll.find().count(), 1);
+
+// Pre-images collection should remain empty, as pre-images for insert operations can be found in
+// the oplog.
+assert.eq(preImagesColl.find().count(), 0);
+
+// Perform an update with 'damages'.
+assert.commandWorked(coll.update(originalDoc, {$inc: {x: 2}}));
+
+// Pre-images collection should contain one document with the 'originalDoc' pre-image.
+let preimages = preImagesColl.find({"preImage": originalDoc}).toArray();
+assert.eq(preimages.length, 1);
+assertValidPreImage(preimages[0]);
+
+// Perform an update (replace).
+assert.commandWorked(coll.update(updatedDoc, {z: 1}));
+
+// Pre-images collection should contain a new document with the 'updatedDoc' pre-image.
+preimages = preImagesColl.find({"preImage": updatedDoc}).toArray();
+assert.eq(preimages.length, 1);
+assertValidPreImage(preimages[0]);
+}());
diff --git a/jstests/noPassthrough/change_streams_pre_and_post_images_in_create_and_collmod.js b/jstests/noPassthrough/change_streams_pre_and_post_images_in_create_and_collmod.js
index 03ee3b144f1..beb31ecd39c 100644
--- a/jstests/noPassthrough/change_streams_pre_and_post_images_in_create_and_collmod.js
+++ b/jstests/noPassthrough/change_streams_pre_and_post_images_in_create_and_collmod.js
@@ -44,7 +44,7 @@ function assertPreimagesCollectionExists() {
assert.eq(result.cursor.firstBatch[0].name, preimagesCollName);
}
-// Check that we cannot set 'changeStreamPreAndPostImages' on the local or admin databases.
+// Check that we cannot set 'changeStreamPreAndPostImages' on the local, admin and config databases.
for (const db of [localDB, adminDB, configDB]) {
assert.commandFailedWithCode(
db.runCommand({create: collName, changeStreamPreAndPostImages: true}),
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 0471048fd3b..f83373e8d84 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -851,6 +851,7 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/catalog/collection_catalog',
+ '$BUILD_DIR/mongo/db/pipeline/change_stream_preimage',
'$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker',
'$BUILD_DIR/mongo/db/timeseries/bucket_catalog',
'$BUILD_DIR/mongo/s/coreshard',
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h
index 4cba32ed567..cd59dff9d10 100644
--- a/src/mongo/db/catalog/collection.h
+++ b/src/mongo/db/catalog/collection.h
@@ -95,6 +95,7 @@ struct CollectionUpdateArgs {
StoreDocOption storeDocOption = StoreDocOption::None;
bool preImageRecordingEnabledForCollection = false;
+ bool changeStreamPreAndPostImagesEnabledForCollection = false;
// Set if an OpTime was reserved for the update ahead of time.
boost::optional<OplogSlot> oplogSlot = boost::none;
diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp
index 2edd03b9308..ffdd6c5e893 100644
--- a/src/mongo/db/catalog/collection_impl.cpp
+++ b/src/mongo/db/catalog/collection_impl.cpp
@@ -1252,6 +1252,9 @@ RecordId CollectionImpl::updateDocument(OperationContext* opCtx,
args->preImageDoc = oldDoc.value().getOwned();
}
args->preImageRecordingEnabledForCollection = getRecordPreImages();
+ args->changeStreamPreAndPostImagesEnabledForCollection =
+ isChangeStreamPreAndPostImagesEnabled();
+
const bool storePrePostImage =
args->storeDocOption != CollectionUpdateArgs::StoreDocOption::None;
if (!args->oplogSlot && storePrePostImage) {
@@ -1315,7 +1318,7 @@ StatusWith<RecordData> CollectionImpl::updateDocumentWithDamages(
// For in-place updates we need to grab an owned copy of the pre-image doc if pre-image
// recording is enabled and we haven't already set the pre-image due to this update being
// a retryable findAndModify or a possible update to the shard key.
- if (!args->preImageDoc && getRecordPreImages()) {
+ if (!args->preImageDoc && (getRecordPreImages() || isChangeStreamPreAndPostImagesEnabled())) {
args->preImageDoc = oldRec.value().toBson().getOwned();
}
const bool storePrePostImage =
@@ -1337,6 +1340,9 @@ StatusWith<RecordData> CollectionImpl::updateDocumentWithDamages(
if (newRecStatus.isOK()) {
args->updatedDoc = newRecStatus.getValue().toBson();
args->preImageRecordingEnabledForCollection = getRecordPreImages();
+ args->changeStreamPreAndPostImagesEnabledForCollection =
+ isChangeStreamPreAndPostImagesEnabled();
+
OplogUpdateEntryArgs entryArgs(*args, ns(), _uuid);
getGlobalServiceContext()->getOpObserver()->onUpdate(opCtx, entryArgs);
}
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index b4b93f6baef..d8d52d1bc4c 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -53,6 +53,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/op_observer_util.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/pipeline/change_stream_preimage_gen.h"
#include "mongo/db/read_write_concern_defaults.h"
#include "mongo/db/repl/image_collection_entry_gen.h"
#include "mongo/db/repl/oplog.h"
@@ -286,6 +287,23 @@ void writeToImageCollection(OperationContext* opCtx,
invariant(res.numDocsModified == 1 || !res.upsertedId.isEmpty());
}
+// Inserts document pre-image 'preImage' into the change stream pre-images collection.
+void writeToChangeStreamPreImagesCollection(OperationContext* opCtx,
+ const ChangeStreamPreImage& preImage) {
+ const auto collectionNamespace = NamespaceString::kChangeStreamPreImagesNamespace;
+
+ // This lock acquisition can block on a stronger lock held by another operation modifying the
+ // pre-images collection. There are no known cases where an operation holding an exclusive lock
+ // on the pre-images collection also waits for oplog visibility.
+ repl::UnreplicatedWritesBlock unreplicated(opCtx);
+ AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState());
+ AutoGetCollection preimagesCollectionRaii(opCtx, collectionNamespace, LockMode::MODE_IX);
+ UpdateResult res = Helpers::upsert(opCtx, collectionNamespace.toString(), preImage.toBSON());
+ tassert(5868601,
+ "Failed to insert a new document into pre-images collection",
+ !res.existing && !res.upsertedId.isEmpty());
+}
+
} // namespace
BSONObj OpObserverImpl::DocumentKey::getId() const {
@@ -655,6 +673,16 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
dataImage);
}
+ if (opCtx->isEnforcingConstraints() &&
+ args.updateArgs.changeStreamPreAndPostImagesEnabledForCollection) {
+ const auto& preImageDoc = args.updateArgs.preImageDoc;
+ tassert(5868600, "PreImage must be set", preImageDoc && !preImageDoc.get().isEmpty());
+
+ ChangeStreamPreImageId _id(args.uuid, opTime.writeOpTime.getTimestamp(), 0);
+ ChangeStreamPreImage preImage(_id, opTime.wallClockTime, preImageDoc.get());
+ writeToChangeStreamPreImagesCollection(opCtx, preImage);
+ }
+
SessionTxnRecord sessionTxnRecord;
sessionTxnRecord.setLastWriteOpTime(opTime.writeOpTime);
sessionTxnRecord.setLastWriteDate(opTime.wallClockTime);
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index 1cc220d3746..150a8f4313f 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/logical_time_validator.h"
#include "mongo/db/op_observer_impl.h"
#include "mongo/db/op_observer_registry.h"
+#include "mongo/db/pipeline/change_stream_preimage_gen.h"
#include "mongo/db/read_write_concern_defaults.h"
#include "mongo/db/read_write_concern_defaults_cache_lookup_mock.h"
#include "mongo/db/repl/image_collection_entry_gen.h"
@@ -1637,12 +1638,15 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) {
{StoreDocOption::PostImage, kRecordPreImages, RetryableOptions::WithOplog, 3},
{StoreDocOption::PostImage, kRecordPreImages, RetryableOptions::WithSideCollection, 2}};
- for (std::size_t testIdx = 0; testIdx < cases.size(); ++testIdx) {
- const auto& testCase = cases[testIdx];
+ const auto testFunc = [&](CollectionUpdateArgs& updateArgs,
+ const UpdateTestCase& testCase,
+ const int testIdx) {
LOGV2(5739902,
"UpdateTestCase",
"ImageType"_attr = testCase.getImageTypeStr(),
- "AlwaysRecordPreImages"_attr = testCase.alwaysRecordPreImages,
+ "PreImageRecording"_attr = updateArgs.preImageRecordingEnabledForCollection,
+ "ChangeStreamPreAndPostImagesEnabled"_attr =
+ updateArgs.changeStreamPreAndPostImagesEnabledForCollection,
"RetryableOptions"_attr = testCase.getRetryableOptionsStr(),
"ExpectedOplogEntries"_attr = testCase.numOutputOplogs);
@@ -1653,7 +1657,6 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) {
boost::optional<MongoDOperationContextSession> contextSession;
boost::optional<TransactionParticipant::Participant> txnParticipant;
- CollectionUpdateArgs updateArgs;
switch (testCase.retryableOptions) {
case RetryableOptions::NotRetryable:
updateArgs.stmtIds = {kUninitializedStmtId};
@@ -1690,7 +1693,6 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) {
BSON("$set" << BSON("postImage" << true) << "$unset" << BSON("preImage" << 1));
updateArgs.criteria = BSON("_id" << 0);
updateArgs.storeDocOption = testCase.imageType;
- updateArgs.preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages;
OplogUpdateEntryArgs update(std::move(updateArgs), nss, uuid);
// Phase 2: Call the code we're testing.
@@ -1706,7 +1708,7 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) {
// Entries are returned in ascending timestamp order.
const OplogEntry& actualOp = assertGet(OplogEntry::parse(oplogs.back()));
- const bool checkPreImageInOplog = testCase.alwaysRecordPreImages ||
+ const bool checkPreImageInOplog = update.updateArgs.preImageRecordingEnabledForCollection ||
(testCase.imageType == StoreDocOption::PreImage &&
testCase.retryableOptions == RetryableOptions::WithOplog);
if (checkPreImageInOplog) {
@@ -1729,11 +1731,11 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) {
bool checkSideCollection = testCase.imageType != StoreDocOption::None &&
testCase.retryableOptions == RetryableOptions::WithSideCollection;
- if (checkSideCollection && testCase.alwaysRecordPreImages &&
+ if (checkSideCollection && update.updateArgs.preImageRecordingEnabledForCollection &&
testCase.imageType == StoreDocOption::PreImage) {
- // When `alwaysRecordPreImages` is enabled for a collection, we always store an image in
- // the oplog. To avoid unnecessary writes, we won't also store an image in the side
- // collection.
+ // When `alwaysRecordPreImages` is enabled for a collection, we always store an
+ // image in the oplog. To avoid unnecessary writes, we won't also store an image
+ // in the side collection.
checkSideCollection = false;
}
@@ -1750,6 +1752,50 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) {
ASSERT(imageEntry.getImageKind() == repl::RetryImageEnum::kPostImage);
}
}
+
+ if (update.updateArgs.changeStreamPreAndPostImagesEnabledForCollection) {
+ const Timestamp preImageOpTime = actualOp.getOpTime().getTimestamp();
+ ChangeStreamPreImageId preImageId(uuid, preImageOpTime, 0);
+ AutoGetCollection preImagesCollection(
+ opCtx, NamespaceString::kChangeStreamPreImagesNamespace, LockMode::MODE_IS);
+ const auto preImage = Helpers::findOneForTesting(
+ opCtx, preImagesCollection.getCollection(), BSON("_id" << preImageId.toBSON()));
+ const auto changeStreamPreImage =
+ ChangeStreamPreImage::parse(IDLParserErrorContext("pre-image"), preImage);
+ const BSONObj& expectedImage = update.updateArgs.preImageDoc.get();
+ ASSERT_BSONOBJ_EQ(expectedImage, changeStreamPreImage.getPreImage());
+ ASSERT_EQ(actualOp.getWallClockTime(), changeStreamPreImage.getOperationTime());
+ }
+ };
+
+ for (std::size_t testIdx = 0; testIdx < cases.size(); ++testIdx) {
+ auto& testCase = cases[testIdx];
+
+ // In case when 'alwaysRecordPreImages' is set to true, run the test for both
+ // 'preImageRecordingEnabledForCollection' and
+ // 'changeStreamPreAndPostImagesEnabledForCollection' cases.
+ CollectionUpdateArgs updateArgs;
+ if (testCase.alwaysRecordPreImages) {
+ updateArgs.preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages;
+ updateArgs.changeStreamPreAndPostImagesEnabledForCollection =
+ !testCase.alwaysRecordPreImages;
+ testFunc(updateArgs, testCase, testIdx);
+
+ const auto numOutputOplogs = (testCase.imageType == StoreDocOption::PreImage &&
+ testCase.retryableOptions == RetryableOptions::WithOplog)
+ ? testCase.numOutputOplogs
+ : testCase.numOutputOplogs - 1;
+ updateArgs.preImageRecordingEnabledForCollection = !testCase.alwaysRecordPreImages;
+ updateArgs.changeStreamPreAndPostImagesEnabledForCollection =
+ testCase.alwaysRecordPreImages;
+ testCase.numOutputOplogs = numOutputOplogs;
+ testFunc(updateArgs, testCase, testIdx);
+ } else {
+ updateArgs.preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages;
+ updateArgs.changeStreamPreAndPostImagesEnabledForCollection =
+ testCase.alwaysRecordPreImages;
+ testFunc(updateArgs, testCase, testIdx);
+ }
}
}
diff --git a/src/mongo/db/pipeline/change_stream_preimage.idl b/src/mongo/db/pipeline/change_stream_preimage.idl
index c60a285e103..2a34bcc0512 100644
--- a/src/mongo/db/pipeline/change_stream_preimage.idl
+++ b/src/mongo/db/pipeline/change_stream_preimage.idl
@@ -59,6 +59,6 @@ structs:
description: Operation execution wall clock time. Used to determine if the
pre-image expired.
type: date
- preimage:
+ preImage:
description: Pre-image of a document for an operation recorded to an oplog entry.
type: object