From e3b5c6069197d478929449eec87f8bfcc58bc7bd Mon Sep 17 00:00:00 2001 From: Cheahuychou Mao Date: Tue, 26 Oct 2021 04:04:34 +0000 Subject: SERVER-60540 Add retryability support for internal transactions for findAndModify --- src/mongo/db/op_observer_impl_test.cpp | 946 +++++++++++++++++++++++---------- 1 file changed, 652 insertions(+), 294 deletions(-) (limited to 'src/mongo/db/op_observer_impl_test.cpp') diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 740f24808a5..fbfd560f674 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -46,6 +46,7 @@ #include "mongo/db/pipeline/change_stream_preimage_gen.h" #include "mongo/db/read_write_concern_defaults.h" #include "mongo/db/read_write_concern_defaults_cache_lookup_mock.h" +#include "mongo/db/repl/apply_ops_command_info.h" #include "mongo/db/repl/image_collection_entry_gen.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_entry.h" @@ -71,6 +72,72 @@ namespace { using repl::OplogEntry; using unittest::assertGet; +namespace { + +OplogEntry getInnerEntryFromApplyOpsOplogEntry(const OplogEntry& oplogEntry) { + std::vector innerEntries; + ASSERT(oplogEntry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps); + repl::ApplyOps::extractOperationsTo(oplogEntry, oplogEntry.getEntry().toBSON(), &innerEntries); + ASSERT_EQ(innerEntries.size(), 1u); + return innerEntries[0]; +} + +void beginRetryableWriteWithTxnNumber( + OperationContext* opCtx, + TxnNumber txnNumber, + std::unique_ptr& contextSession) { + opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); + opCtx->setTxnNumber(txnNumber); + + contextSession = std::make_unique(opCtx); + auto txnParticipant = TransactionParticipant::get(opCtx); + txnParticipant.beginOrContinue(opCtx, + {*opCtx->getTxnNumber()}, + boost::none /* autocommit */, + boost::none /* startTransaction */); +}; + +void beginNonRetryableTransactionWithTxnNumber( + OperationContext* opCtx, + TxnNumber txnNumber, + std::unique_ptr& contextSession) { + opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); + opCtx->setTxnNumber(txnNumber); + opCtx->setInMultiDocumentTransaction(); + + contextSession = std::make_unique(opCtx); + auto txnParticipant = TransactionParticipant::get(opCtx); + txnParticipant.beginOrContinue( + opCtx, {*opCtx->getTxnNumber()}, false /* autocommit */, true /* startTransaction */); +}; + +void beginRetryableInternalTransactionWithTxnNumber( + OperationContext* opCtx, + TxnNumber txnNumber, + std::unique_ptr& contextSession) { + RAIIServerParameterControllerForTest controller{"featureFlagInternalTransactions", true}; + serverGlobalParams.clusterRole = ClusterRole::ShardServer; + + opCtx->setLogicalSessionId(makeLogicalSessionIdWithTxnNumberAndUUIDForTest()); + opCtx->setTxnNumber(txnNumber); + opCtx->setInMultiDocumentTransaction(); + + contextSession = std::make_unique(opCtx); + auto txnParticipant = TransactionParticipant::get(opCtx); + txnParticipant.beginOrContinue( + opCtx, {*opCtx->getTxnNumber()}, false /* autocommit */, true /* startTransaction */); +}; + +template +void commitUnpreparedTransaction(OperationContext* opCtx, OpObserverType& opObserver) { + auto txnParticipant = TransactionParticipant::get(opCtx); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx); + opObserver.onUnpreparedTransactionCommit( + opCtx, &txnOps, txnParticipant.getNumberOfPrePostImagesToWriteForTest()); +} + +} // namespace + class OpObserverTest : public ServiceContextMongoDTest { public: void setUp() override { @@ -95,6 +162,10 @@ public: ReadWriteConcernDefaults::create(getServiceContext(), _lookupMock.getFetchDefaultsFn()); } + void tearDown() override { + serverGlobalParams.clusterRole = ClusterRole::None; + } + void reset(OperationContext* opCtx, NamespaceString nss) const { writeConflictRetry(opCtx, "deleteAll", nss.ns(), [&] { opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); @@ -163,6 +234,11 @@ protected: return getNOplogEntries(opCtx, 1).back(); } + BSONObj getInnerEntryFromSingleApplyOpsOplogEntry(OperationContext* opCtx) { + auto applyOpsOplogEntry = assertGet(OplogEntry::parse(getNOplogEntries(opCtx, 1).back())); + return getInnerEntryFromApplyOpsOplogEntry(applyOpsOplogEntry).getEntry().toBSON(); + } + bool didWriteImageEntryToSideCollection(OperationContext* opCtx, const LogicalSessionId& sessionId) { AutoGetCollection sideCollection( @@ -739,17 +815,9 @@ public: void setUp() override { OpObserverTest::setUp(); _opCtx = cc().makeOperationContext(); - _opObserver.emplace(); - MongoDSessionCatalog::onStepUp(opCtx()); _times.emplace(opCtx()); - - opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest()); - opCtx()->setTxnNumber(txnNum()); - opCtx()->setInMultiDocumentTransaction(); - _sessionCheckout = std::make_unique(opCtx()); - _txnParticipant.emplace(TransactionParticipant::get(opCtx())); } void tearDown() override { @@ -760,6 +828,20 @@ public: OpObserverTest::tearDown(); } + void setUpRetryableWrite() { + beginRetryableWriteWithTxnNumber(opCtx(), txnNum(), _sessionCheckout); + _txnParticipant.emplace(TransactionParticipant::get(opCtx())); + } + + void setUpNonRetryableTransaction() { + beginNonRetryableTransactionWithTxnNumber(opCtx(), txnNum(), _sessionCheckout); + _txnParticipant.emplace(TransactionParticipant::get(opCtx())); + } + + void setUpRetryableInternalTransaction() { + beginRetryableInternalTransactionWithTxnNumber(opCtx(), txnNum(), _sessionCheckout); + _txnParticipant.emplace(TransactionParticipant::get(opCtx())); + } protected: Session* session() { @@ -806,10 +888,7 @@ class OpObserverTransactionTest : public OpObserverTxnParticipantTest { public: void setUp() override { OpObserverTxnParticipantTest::setUp(); - txnParticipant().beginOrContinue(opCtx(), - {*opCtx()->getTxnNumber()}, - false /* autocommit */, - true /* startTransaction */); + OpObserverTxnParticipantTest::setUpNonRetryableTransaction(); } protected: @@ -1497,110 +1576,215 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionSingleStatementTest) { * Test fixture for testing OpObserver behavior specific to retryable findAndModify. */ class OpObserverRetryableFindAndModifyTest : public OpObserverTxnParticipantTest { +public: + void tearDown() override { + OpObserverTxnParticipantTest::tearDown(); + } + +protected: + void testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage() { + NamespaceString nss = {"test", "coll"}; + const auto uuid = CollectionUUID::gen(); + + CollectionUpdateArgs updateArgs; + updateArgs.stmtIds = {0}; + updateArgs.updatedDoc = BSON("_id" << 0 << "data" + << "x"); + updateArgs.update = BSON("$set" << BSON("data" + << "x")); + updateArgs.criteria = BSON("_id" << 0); + updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PostImage; + OplogUpdateEntryArgs update(&updateArgs, nss, uuid); + update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection; + + WriteUnitOfWork wunit(opCtx()); + AutoGetDb autoDb(opCtx(), nss.db(), MODE_X); + opObserver().onUpdate(opCtx(), update); + commit(); + + // Asserts that only a single oplog entry was created. In essence, we did not create any + // no-op image entries in the oplog. + const auto oplogEntry = assertGetSingleOplogEntry(); + ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName)); + ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName)); + ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName)); + ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName), + "postImage"_sd); + } + + void testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage() { + NamespaceString nss = {"test", "coll"}; + const auto uuid = CollectionUUID::gen(); + + CollectionUpdateArgs updateArgs; + updateArgs.stmtIds = {0}; + updateArgs.preImageDoc = BSON("_id" << 0 << "data" + << "y"); + updateArgs.update = BSON("$set" << BSON("data" + << "x")); + updateArgs.criteria = BSON("_id" << 0); + updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PreImage; + OplogUpdateEntryArgs update(&updateArgs, nss, uuid); + update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection; + + WriteUnitOfWork wunit(opCtx()); + AutoGetDb autoDb(opCtx(), nss.db(), MODE_X); + opObserver().onUpdate(opCtx(), update); + commit(); + + // Asserts that only a single oplog entry was created. In essence, we did not create any + // no-op image entries in the oplog. + const auto oplogEntry = assertGetSingleOplogEntry(); + ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName)); + ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName)); + ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName)); + ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName), + "preImage"_sd); + } + + void testRetryableFindAndModifyDeleteHasNeedsRetryImage() { + NamespaceString nss = {"test", "coll"}; + const auto uuid = CollectionUUID::gen(); + + WriteUnitOfWork wunit(opCtx()); + AutoGetDb autoDb(opCtx(), nss.db(), MODE_X); + const auto deletedDoc = BSON("_id" << 0 << "data" + << "x"); + opObserver().aboutToDelete(opCtx(), nss, uuid, deletedDoc); + OplogDeleteEntryArgs args; + args.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection; + args.deletedDoc = &deletedDoc; + opObserver().onDelete(opCtx(), nss, uuid, 0, args); + commit(); + + // Asserts that only a single oplog entry was created. In essence, we did not create any + // no-op image entries in the oplog. + const auto oplogEntry = assertGetSingleOplogEntry(); + ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName)); + ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName)); + ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName)); + ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName), + "preImage"_sd); + } + + virtual void commit() = 0; + + virtual BSONObj assertGetSingleOplogEntry() = 0; +}; + +class OpObserverRetryableFindAndModifyOutsideTransactionTest + : public OpObserverRetryableFindAndModifyTest { public: void setUp() override { OpObserverTxnParticipantTest::setUp(); - txnParticipant().beginOrContinue( - opCtx(), {txnNum()}, boost::none /* autocommit */, boost::none /* startTransaction */); + OpObserverTxnParticipantTest::setUpRetryableWrite(); } - void tearDown() override { - OpObserverTxnParticipantTest::tearDown(); +protected: + void commit() final{}; + + BSONObj assertGetSingleOplogEntry() final { + return getSingleOplogEntry(opCtx()); } }; -TEST_F(OpObserverRetryableFindAndModifyTest, +TEST_F(OpObserverRetryableFindAndModifyOutsideTransactionTest, RetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage) { - NamespaceString nss = {"test", "coll"}; - const auto uuid = CollectionUUID::gen(); + testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage(); +} - CollectionUpdateArgs updateArgs; - updateArgs.stmtIds = {0}; - updateArgs.updatedDoc = BSON("_id" << 0 << "data" - << "x"); - updateArgs.update = BSON("$set" << BSON("data" - << "x")); - updateArgs.criteria = BSON("_id" << 0); - updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PostImage; - OplogUpdateEntryArgs update(&updateArgs, nss, uuid); - update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection; - - WriteUnitOfWork wunit(opCtx()); - AutoGetDb autoDb(opCtx(), nss.db(), MODE_X); - opObserver().onUpdate(opCtx(), update); - // Asserts that only a single oplog entry was created. In essence, we did not create any - // no-op image entries in the oplog. - const auto oplogEntry = getSingleOplogEntry(opCtx()); - ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName)); - ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName)); - ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName)); - ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName), - "postImage"_sd); -} - -TEST_F(OpObserverRetryableFindAndModifyTest, +TEST_F(OpObserverRetryableFindAndModifyOutsideTransactionTest, RetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage) { - NamespaceString nss = {"test", "coll"}; - const auto uuid = CollectionUUID::gen(); + testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage(); +} - CollectionUpdateArgs updateArgs; - updateArgs.stmtIds = {0}; - updateArgs.preImageDoc = BSON("_id" << 0 << "data" - << "y"); - updateArgs.update = BSON("$set" << BSON("data" - << "x")); - updateArgs.criteria = BSON("_id" << 0); - updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PreImage; - OplogUpdateEntryArgs update(&updateArgs, nss, uuid); - update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection; - - WriteUnitOfWork wunit(opCtx()); - AutoGetDb autoDb(opCtx(), nss.db(), MODE_X); - opObserver().onUpdate(opCtx(), update); - // Asserts that only a single oplog entry was created. In essence, we did not create any - // no-op image entries in the oplog. - const auto oplogEntry = getSingleOplogEntry(opCtx()); - ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName)); - ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName)); - ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName)); - ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName), - "preImage"_sd); -} - -TEST_F(OpObserverRetryableFindAndModifyTest, RetryableFindAndModifyDeleteHasNeedsRetryImage) { - NamespaceString nss = {"test", "coll"}; - const auto uuid = CollectionUUID::gen(); +TEST_F(OpObserverRetryableFindAndModifyOutsideTransactionTest, + RetryableFindAndModifyDeleteHasNeedsRetryImage) { + testRetryableFindAndModifyDeleteHasNeedsRetryImage(); +} - WriteUnitOfWork wunit(opCtx()); - AutoGetDb autoDb(opCtx(), nss.db(), MODE_X); - const auto deletedDoc = BSON("_id" << 0 << "data" - << "x"); - opObserver().aboutToDelete(opCtx(), nss, uuid, deletedDoc); - OplogDeleteEntryArgs args; - args.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection; - args.deletedDoc = &deletedDoc; - opObserver().onDelete(opCtx(), nss, uuid, 0, args); - // Asserts that only a single oplog entry was created. In essence, we did not create any - // no-op image entries in the oplog. - const auto oplogEntry = getSingleOplogEntry(opCtx()); - ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName)); - ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName)); - ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName)); - ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName), - "preImage"_sd); -} - -OplogEntry findByTimestamp(const std::vector& oplogs, Timestamp ts) { +class OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransactionTest + : public OpObserverRetryableFindAndModifyTest { +public: + void setUp() override { + OpObserverTxnParticipantTest::setUp(); + OpObserverTxnParticipantTest::setUpRetryableInternalTransaction(); + } + +protected: + void commit() final { + commitUnpreparedTransaction(opCtx(), opObserver()); + }; + + BSONObj assertGetSingleOplogEntry() final { + return getInnerEntryFromSingleApplyOpsOplogEntry(opCtx()); + } +}; + +TEST_F(OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransactionTest, + RetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage) { + testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage(); +} + +TEST_F(OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransactionTest, + RetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage) { + testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage(); +} + +TEST_F(OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransactionTest, + RetryableFindAndModifyDeleteHasNeedsRetryImage) { + testRetryableFindAndModifyDeleteHasNeedsRetryImage(); +} + +class OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransactionTest + : public OpObserverRetryableFindAndModifyTest { +public: + void setUp() override { + serverGlobalParams.clusterRole = ClusterRole::ShardServer; + OpObserverTxnParticipantTest::setUp(); + OpObserverTxnParticipantTest::setUpRetryableInternalTransaction(); + } + +protected: + void commit() final { + const auto prepareSlot = repl::getNextOpTime(opCtx()); + txnParticipant().transitionToPreparedforTest(opCtx(), prepareSlot); + auto txnOps = txnParticipant().retrieveCompletedTransactionOperations(opCtx()); + opObserver().onTransactionPrepare( + opCtx(), + {prepareSlot}, + &txnOps, + txnParticipant().getNumberOfPrePostImagesToWriteForTest()); + }; + + BSONObj assertGetSingleOplogEntry() final { + return getInnerEntryFromSingleApplyOpsOplogEntry(opCtx()); + } +}; + +TEST_F(OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransactionTest, + RetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage) { + testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage(); +} + +TEST_F(OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransactionTest, + RetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage) { + testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage(); +} + +TEST_F(OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransactionTest, + RetryableFindAndModifyDeleteHasNeedsRetryImage) { + testRetryableFindAndModifyDeleteHasNeedsRetryImage(); +} + +boost::optional findByTimestamp(const std::vector& oplogs, Timestamp ts) { for (auto& oplog : oplogs) { const auto& entry = assertGet(OplogEntry::parse(oplog)); if (entry.getTimestamp() == ts) { return entry; } } - - FAIL("Not found."); - // C++/clang isn't smart enough to know FAIL is guaranteed to throw. - MONGO_UNREACHABLE; + return boost::none; } using StoreDocOption = CollectionUpdateArgs::StoreDocOption; @@ -1619,6 +1803,8 @@ const bool kChangeStreamImagesDisabled = false; const auto kNotRetryable = RetryableFindAndModifyLocation::kNone; const auto kRecordInOplog = RetryableFindAndModifyLocation::kOplog; const auto kRecordInSideCollection = RetryableFindAndModifyLocation::kSideCollection; + +const std::vector kInMultiDocumentTransactionCases{false, true}; } // namespace struct UpdateTestCase { @@ -1662,48 +1848,9 @@ struct UpdateTestCase { } }; -TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) { - // Create a registry that only registers the Impl. It can be challenging to call methods on the - // Impl directly. It falls into cases where `ReservedTimes` is expected to be instantiated. Due - // to strong encapsulation, we use the registry that managers the `ReservedTimes` on our behalf. - OpObserverRegistry opObserver; - opObserver.addObserver(std::make_unique()); - - NamespaceString nss("test", "coll"); - CollectionUUID uuid = CollectionUUID::gen(); - - std::vector cases = { - // Regular updates. - {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1}, - {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1}, - {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 1}, - {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1}, - {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2}, - {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, - {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}, - // FindAndModify asking for a preImage. - {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1}, - {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, - {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1}, - {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1}, - {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2}, - {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1}, - {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2}, - {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, - {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}, - // FindAndModify asking for a postImage. - {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1}, - {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, - {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1}, - {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1}, - {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2}, - {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1}, - {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2}, - {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 3}, - {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}}; - - for (std::size_t testIdx = 0; testIdx < cases.size(); ++testIdx) { - const auto& testCase = cases[testIdx]; +class OnUpdateOutputsTest : public OpObserverTest { +protected: + void logTestCase(const UpdateTestCase& testCase) { LOGV2(5739902, "UpdateTestCase", "ImageType"_attr = testCase.getImageTypeStr(), @@ -1712,98 +1859,87 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) { "RetryableFindAndModifyLocation"_attr = testCase.getRetryableFindAndModifyLocationStr(), "ExpectedOplogEntries"_attr = testCase.numOutputOplogs); + } - CollectionUpdateArgs updateArgs; - updateArgs.preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages; - updateArgs.changeStreamPreAndPostImagesEnabledForCollection = + void initializeOplogUpdateEntryArgs(OperationContext* opCtx, + const UpdateTestCase& testCase, + OplogUpdateEntryArgs* update) { + update->updateArgs->preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages; + update->updateArgs->changeStreamPreAndPostImagesEnabledForCollection = testCase.changeStreamImagesEnabled; - auto opCtxRaii = cc().makeOperationContext(); - OperationContext* opCtx = opCtxRaii.get(); - // Phase 1: Clearing any state and setting up fixtures/the update call. - resetOplogAndTransactions(opCtx); - - OplogUpdateEntryArgs update(&updateArgs, nss, uuid); - boost::optional contextSession; - boost::optional txnParticipant; switch (testCase.retryableOptions) { case kNotRetryable: - updateArgs.stmtIds = {kUninitializedStmtId}; + update->updateArgs->stmtIds = {kUninitializedStmtId}; break; case kRecordInOplog: - update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kOplog; - updateArgs.stmtIds = {1}; + update->retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kOplog; + update->updateArgs->stmtIds = {1}; break; case kRecordInSideCollection: - update.retryableFindAndModifyLocation = + update->retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection; - updateArgs.stmtIds = {1}; - if (testCase.alwaysRecordPreImages && - testCase.retryableOptions == kRecordInSideCollection) { + update->updateArgs->stmtIds = {1}; + if (testCase.retryableOptions == kRecordInSideCollection) { // 'getNextOpTimes' requires us to be inside a WUOW when reserving oplog slots. WriteUnitOfWork wuow(opCtx); auto reservedSlots = repl::getNextOpTimes(opCtx, 3); - updateArgs.oplogSlots = reservedSlots; + update->updateArgs->oplogSlots = reservedSlots; } break; } - if (testCase.retryableOptions != kNotRetryable) { - opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); - opCtx->setTxnNumber(TxnNumber(testIdx)); - contextSession.emplace(opCtx); - txnParticipant.emplace(TransactionParticipant::get(opCtx)); - txnParticipant->beginOrContinue(opCtx, - {TxnNumber(testIdx)}, - boost::none /* autocommit */, - boost::none /* startTransaction */); - } - - updateArgs.preImageDoc = boost::none; + update->updateArgs->preImageDoc = boost::none; if (testCase.imageType == StoreDocOption::PreImage || testCase.alwaysRecordPreImages || testCase.changeStreamImagesEnabled) { - updateArgs.preImageDoc = BSON("_id" << 0 << "preImage" << true); + update->updateArgs->preImageDoc = BSON("_id" << 0 << "preImage" << true); } - - updateArgs.updatedDoc = BSON("_id" << 0 << "postImage" << true); - updateArgs.update = + update->updateArgs->updatedDoc = BSON("_id" << 0 << "postImage" << true); + update->updateArgs->update = BSON("$set" << BSON("postImage" << true) << "$unset" << BSON("preImage" << 1)); - updateArgs.criteria = BSON("_id" << 0); - updateArgs.storeDocOption = testCase.imageType; - - // Phase 2: Call the code we're testing. - WriteUnitOfWork wuow(opCtx); - AutoGetCollection locks(opCtx, nss, LockMode::MODE_IX); - opObserver.onUpdate(opCtx, update); - wuow.commit(); - - // Phase 3: Analyze the results: - - // This `getNOplogEntries` also asserts that all oplogs are retrieved. - std::vector oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs); - // Entries are returned in ascending timestamp order. - const OplogEntry& actualOp = assertGet(OplogEntry::parse(oplogs.back())); + update->updateArgs->criteria = BSON("_id" << 0); + update->updateArgs->storeDocOption = testCase.imageType; + } + void checkPreImageInOplogIfNeeded(const UpdateTestCase& testCase, + const OplogUpdateEntryArgs& update, + const std::vector& oplogs, + const OplogEntry& updateOplogEntry) { const bool checkPreImageInOplog = testCase.alwaysRecordPreImages || (testCase.imageType == StoreDocOption::PreImage && testCase.retryableOptions == kRecordInOplog); if (checkPreImageInOplog) { - ASSERT(actualOp.getPreImageOpTime()); - const Timestamp preImageOpTime = actualOp.getPreImageOpTime()->getTimestamp(); + ASSERT(updateOplogEntry.getPreImageOpTime()); + const Timestamp preImageOpTime = updateOplogEntry.getPreImageOpTime()->getTimestamp(); ASSERT_FALSE(preImageOpTime.isNull()); - OplogEntry preImage = findByTimestamp(oplogs, preImageOpTime); + OplogEntry preImage = *findByTimestamp(oplogs, preImageOpTime); ASSERT_BSONOBJ_EQ(update.updateArgs->preImageDoc.get(), preImage.getObject()); + } else { + ASSERT_FALSE(updateOplogEntry.getPreImageOpTime()); } + } + void checkPostImageInOplogIfNeeded(const UpdateTestCase& testCase, + const OplogUpdateEntryArgs& update, + const std::vector& oplogs, + const OplogEntry& updateOplogEntry) { const bool checkPostImageInOplog = testCase.imageType == StoreDocOption::PostImage && testCase.retryableOptions == kRecordInOplog; if (checkPostImageInOplog) { - ASSERT(actualOp.getPostImageOpTime()); - const Timestamp postImageOpTime = actualOp.getPostImageOpTime()->getTimestamp(); + ASSERT(updateOplogEntry.getPostImageOpTime()); + const Timestamp postImageOpTime = updateOplogEntry.getPostImageOpTime()->getTimestamp(); ASSERT_FALSE(postImageOpTime.isNull()); - OplogEntry postImage = findByTimestamp(oplogs, postImageOpTime); + OplogEntry postImage = *findByTimestamp(oplogs, postImageOpTime); ASSERT_BSONOBJ_EQ(update.updateArgs->updatedDoc, postImage.getObject()); + } else { + ASSERT_FALSE(updateOplogEntry.getPostImageOpTime()); } + } + void checkSideCollectionIfNeeded(OperationContext* opCtx, + const UpdateTestCase& testCase, + const OplogUpdateEntryArgs& update, + const std::vector& oplogs, + const OplogEntry& updateOplogEntry) { bool checkSideCollection = testCase.isFindAndModify() && testCase.retryableOptions == kRecordInSideCollection; if (checkSideCollection && testCase.alwaysRecordPreImages && @@ -1813,32 +1949,180 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) { // in the side collection. checkSideCollection = false; } - if (checkSideCollection) { repl::ImageEntry imageEntry = - getImageEntryFromSideCollection(opCtx, *actualOp.getSessionId()); + getImageEntryFromSideCollection(opCtx, *updateOplogEntry.getSessionId()); const BSONObj& expectedImage = testCase.imageType == StoreDocOption::PreImage ? update.updateArgs->preImageDoc.get() : update.updateArgs->updatedDoc; ASSERT_BSONOBJ_EQ(expectedImage, imageEntry.getImage()); + ASSERT(imageEntry.getImageKind() == updateOplogEntry.getNeedsRetryImage()); if (testCase.imageType == StoreDocOption::PreImage) { ASSERT(imageEntry.getImageKind() == repl::RetryImageEnum::kPreImage); } else { ASSERT(imageEntry.getImageKind() == repl::RetryImageEnum::kPostImage); } + + // If 'updateOplogEntry' has opTime T, opTime T-1 must be reserved for potential forged + // noop oplog entry for the pre/postImage written to the side collection. + const Timestamp forgeNoopTimestamp = updateOplogEntry.getTimestamp() - 1; + ASSERT_FALSE(findByTimestamp(oplogs, forgeNoopTimestamp)); + } else { + ASSERT_FALSE(updateOplogEntry.getNeedsRetryImage()); + if (updateOplogEntry.getSessionId()) { + ASSERT_FALSE( + didWriteImageEntryToSideCollection(opCtx, *updateOplogEntry.getSessionId())); + } else { + // Session id is missing only for non-retryable option. + ASSERT(testCase.retryableOptions == kNotRetryable); + } } + } + void checkChangeStreamImagesIfNeeded(OperationContext* opCtx, + const UpdateTestCase& testCase, + const OplogUpdateEntryArgs& update, + const OplogEntry& updateOplogEntry) { if (testCase.changeStreamImagesEnabled) { BSONObj container; - ChangeStreamPreImageId preImageId(uuid, actualOp.getOpTime().getTimestamp(), 0); + ChangeStreamPreImageId preImageId( + _uuid, updateOplogEntry.getOpTime().getTimestamp(), 0); ChangeStreamPreImage preImage = getChangeStreamPreImage(opCtx, preImageId, &container); const BSONObj& expectedImage = update.updateArgs->preImageDoc.get(); ASSERT_BSONOBJ_EQ(expectedImage, preImage.getPreImage()); - ASSERT_EQ(actualOp.getWallClockTime(), preImage.getOperationTime()); + ASSERT_EQ(updateOplogEntry.getWallClockTime(), preImage.getOperationTime()); + } + } + + std::vector _cases = { + // Regular updates. + {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1}, + {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1}, + {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 1}, + {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1}, + {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2}, + {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, + {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}, + // FindAndModify asking for a preImage. + {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1}, + {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, + {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1}, + {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1}, + {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2}, + {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1}, + {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2}, + {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, + {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}, + // FindAndModify asking for a postImage. + {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1}, + {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, + {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1}, + {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1}, + {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2}, + {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1}, + {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2}, + {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 3}, + {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}}; + + const NamespaceString _nss{"test", "coll"}; + const CollectionUUID _uuid = CollectionUUID::gen(); +}; + +TEST_F(OnUpdateOutputsTest, TestNonTransactionFundamentalOnUpdateOutputs) { + // Create a registry that only registers the Impl. It can be challenging to call methods on + // the Impl directly. It falls into cases where `ReservedTimes` is expected to be + // instantiated. Due to strong encapsulation, we use the registry that managers the + // `ReservedTimes` on our behalf. + OpObserverRegistry opObserver; + opObserver.addObserver(std::make_unique()); + + for (std::size_t testIdx = 0; testIdx < _cases.size(); ++testIdx) { + const auto& testCase = _cases[testIdx]; + logTestCase(testCase); + + auto opCtxRaii = cc().makeOperationContext(); + OperationContext* opCtx = opCtxRaii.get(); + + // Phase 1: Clearing any state and setting up fixtures/the update call. + resetOplogAndTransactions(opCtx); + + std::unique_ptr contextSession; + if (testCase.isRetryable()) { + beginRetryableWriteWithTxnNumber(opCtx, testIdx, contextSession); } + + // Phase 2: Call the code we're testing. + CollectionUpdateArgs updateArgs; + OplogUpdateEntryArgs updateEntryArgs(&updateArgs, _nss, _uuid); + initializeOplogUpdateEntryArgs(opCtx, testCase, &updateEntryArgs); + + WriteUnitOfWork wuow(opCtx); + AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX); + opObserver.onUpdate(opCtx, updateEntryArgs); + wuow.commit(); + + // Phase 3: Analyze the results: + // This `getNOplogEntries` also asserts that all oplogs are retrieved. + std::vector oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs); + // Entries are returned in ascending timestamp order. + auto updateOplogEntry = assertGet(OplogEntry::parse(oplogs.back())); + checkPreImageInOplogIfNeeded(testCase, updateEntryArgs, oplogs, updateOplogEntry); + checkPostImageInOplogIfNeeded(testCase, updateEntryArgs, oplogs, updateOplogEntry); + checkSideCollectionIfNeeded(opCtx, testCase, updateEntryArgs, oplogs, updateOplogEntry); + checkChangeStreamImagesIfNeeded(opCtx, testCase, updateEntryArgs, updateOplogEntry); } } +TEST_F(OnUpdateOutputsTest, TestFundamentalTransactionOnUpdateOutputs) { + // Create a registry that only registers the Impl. It can be challenging to call methods on + // the Impl directly. It falls into cases where `ReservedTimes` is expected to be + // instantiated. Due to strong encapsulation, we use the registry that managers the + // `ReservedTimes` on our behalf. + OpObserverRegistry opObserver; + opObserver.addObserver(std::make_unique()); + + for (std::size_t testIdx = 0; testIdx < _cases.size(); ++testIdx) { + const auto& testCase = _cases[testIdx]; + if (testCase.alwaysRecordPreImages || testCase.changeStreamImagesEnabled) { + continue; + } + logTestCase(testCase); + + auto opCtxRaii = cc().makeOperationContext(); + OperationContext* opCtx = opCtxRaii.get(); + + // Phase 1: Clearing any state and setting up fixtures/the update call. + resetOplogAndTransactions(opCtx); + + std::unique_ptr contextSession; + if (testCase.isRetryable()) { + beginRetryableInternalTransactionWithTxnNumber(opCtx, testIdx, contextSession); + } else { + beginNonRetryableTransactionWithTxnNumber(opCtx, testIdx, contextSession); + } + + // Phase 2: Call the code we're testing. + CollectionUpdateArgs updateArgs; + OplogUpdateEntryArgs updateEntryArgs(&updateArgs, _nss, _uuid); + initializeOplogUpdateEntryArgs(opCtx, testCase, &updateEntryArgs); + + WriteUnitOfWork wuow(opCtx); + AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX); + opObserver.onUpdate(opCtx, updateEntryArgs); + commitUnpreparedTransaction(opCtx, opObserver); + wuow.commit(); + + // Phase 3: Analyze the results: + // This `getNOplogEntries` also asserts that all oplogs are retrieved. + std::vector oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs); + // Entries are returned in ascending timestamp order. + auto applyOpsOplogEntry = assertGet(OplogEntry::parse(oplogs.back())); + auto updateOplogEntry = getInnerEntryFromApplyOpsOplogEntry(applyOpsOplogEntry); + checkPreImageInOplogIfNeeded(testCase, updateEntryArgs, oplogs, updateOplogEntry); + checkPostImageInOplogIfNeeded(testCase, updateEntryArgs, oplogs, updateOplogEntry); + checkSideCollectionIfNeeded(opCtx, testCase, updateEntryArgs, oplogs, updateOplogEntry); + } +} struct InsertTestCase { bool isRetryableWrite; @@ -1882,17 +2166,9 @@ TEST_F(OpObserverTest, TestFundamentalOnInsertsOutputs) { toInsert.emplace_back(stmtId, BSON("_id" << stmtIdx)); } - boost::optional contextSession; - boost::optional txnParticipant; + std::unique_ptr contextSession; if (testCase.isRetryableWrite) { - opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); - opCtx->setTxnNumber(TxnNumber(testIdx)); - contextSession.emplace(opCtx); - txnParticipant.emplace(TransactionParticipant::get(opCtx)); - txnParticipant->beginOrContinue(opCtx, - {TxnNumber(testIdx)}, - boost::none /* autocommit */, - boost::none /* startTransaction */); + beginRetryableWriteWithTxnNumber(opCtx, testIdx, contextSession); } // Phase 2: Call the code we're testing. @@ -1971,31 +2247,10 @@ struct DeleteTestCase { } }; -TEST_F(OpObserverTest, TestFundamentalOnDeleteOutputs) { - // Create a registry that only registers the Impl. It can be challenging to call methods on the - // Impl directly. It falls into cases where `ReservedTimes` is expected to be instantiated. Due - // to strong encapsulation, we use the registry that managers the `ReservedTimes` on our behalf. - OpObserverRegistry opObserver; - opObserver.addObserver(std::make_unique()); - - NamespaceString nss("test", "coll"); - CollectionUUID uuid = CollectionUUID::gen(); - - // For the DeleteTestCase, we add a "pre-image" deletedDoc when using `kRecordInOplog` and - // `kRecordInSideCollection`. - std::vector cases{ - {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1}, - {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, - {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1}, - {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1}, - {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2}, - {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1}, - {kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2}, - {kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, - {kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}}; +class OnDeleteOutputsTest : public OpObserverTest { - for (std::size_t testIdx = 0; testIdx < cases.size(); ++testIdx) { - const auto& testCase = cases[testIdx]; +protected: + void logTestCase(const DeleteTestCase& testCase) { LOGV2(5739905, "DeleteTestCase", "PreImageRecording"_attr = testCase.alwaysRecordPreImages, @@ -2003,111 +2258,214 @@ TEST_F(OpObserverTest, TestFundamentalOnDeleteOutputs) { "RetryableFindAndModifyLocation"_attr = testCase.getRetryableFindAndModifyLocationStr(), "ExpectedOplogEntries"_attr = testCase.numOutputOplogs); + } - OplogDeleteEntryArgs deleteArgs; - deleteArgs.preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages; - deleteArgs.changeStreamPreAndPostImagesEnabledForCollection = + void initializeOplogDeleteEntryArgs(OperationContext* opCtx, + const DeleteTestCase& testCase, + OplogDeleteEntryArgs* deleteArgs) { + deleteArgs->preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages; + deleteArgs->changeStreamPreAndPostImagesEnabledForCollection = testCase.changeStreamImagesEnabled; - auto opCtxRaii = cc().makeOperationContext(); - OperationContext* opCtx = opCtxRaii.get(); - // Phase 1: Clearing any state and setting up fixtures/the update call. - resetOplogAndTransactions(opCtx); - - boost::optional contextSession; - boost::optional txnParticipant; - if (testCase.retryableOptions != kNotRetryable) { - opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); - opCtx->setTxnNumber(TxnNumber(testIdx)); - contextSession.emplace(opCtx); - txnParticipant.emplace(TransactionParticipant::get(opCtx)); - txnParticipant->beginOrContinue(opCtx, - {TxnNumber(testIdx)}, - boost::none /* autocommit */, - boost::none /* startTransaction */); - } switch (testCase.retryableOptions) { case kNotRetryable: - deleteArgs.retryableFindAndModifyLocation = kNotRetryable; + deleteArgs->retryableFindAndModifyLocation = kNotRetryable; break; case kRecordInOplog: - deleteArgs.retryableFindAndModifyLocation = kRecordInOplog; + deleteArgs->retryableFindAndModifyLocation = kRecordInOplog; break; case kRecordInSideCollection: - deleteArgs.retryableFindAndModifyLocation = kRecordInSideCollection; + deleteArgs->retryableFindAndModifyLocation = kRecordInSideCollection; break; } - - const BSONObj deletedDoc = BSON("_id" << 0 << "valuePriorToDelete" - << "marvelous"); if (testCase.isRetryable() || testCase.alwaysRecordPreImages || testCase.changeStreamImagesEnabled) { - deleteArgs.deletedDoc = &deletedDoc; + deleteArgs->deletedDoc = &_deletedDoc; } - // This test does not call `OpObserver::aboutToDelete`. That method has the side-effect - // of setting of `documentKey` on the delete for sharding purposes. - // `OpObserverImpl::onDelete` asserts its existence. - documentKeyDecoration(opCtx).emplace(deletedDoc["_id"].wrap(), boost::none); - StmtId deleteStmtId = kUninitializedStmtId; - if (testCase.isRetryable()) { - deleteStmtId = {1}; - } - - // Phase 2: Call the code we're testing. - WriteUnitOfWork wuow(opCtx); - AutoGetCollection locks(opCtx, nss, LockMode::MODE_IX); - opObserver.onDelete(opCtx, nss, uuid, deleteStmtId, deleteArgs); - wuow.commit(); - - // Phase 3: Analyze the results: - - // This `getNOplogEntries` also asserts that all oplogs are retrieved. - std::vector oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs); - // Entries are returned in ascending timestamp order. - const OplogEntry& actualOp = assertGet(OplogEntry::parse(oplogs.back())); + } + void checkPreImageInOplogIfNeeded(const DeleteTestCase& testCase, + const OplogDeleteEntryArgs& deleteArgs, + const std::vector& oplogs, + const OplogEntry& deleteOplogEntry) { const bool checkPreImageInOplog = deleteArgs.preImageRecordingEnabledForCollection || deleteArgs.retryableFindAndModifyLocation == kRecordInOplog; if (checkPreImageInOplog) { - ASSERT(actualOp.getPreImageOpTime()); - const Timestamp preImageOpTime = actualOp.getPreImageOpTime()->getTimestamp(); + ASSERT(deleteOplogEntry.getPreImageOpTime()); + const Timestamp preImageOpTime = deleteOplogEntry.getPreImageOpTime()->getTimestamp(); ASSERT_FALSE(preImageOpTime.isNull()); - OplogEntry preImage = findByTimestamp(oplogs, preImageOpTime); - ASSERT_BSONOBJ_EQ(deletedDoc, preImage.getObject()); + OplogEntry preImage = *findByTimestamp(oplogs, preImageOpTime); + ASSERT_BSONOBJ_EQ(_deletedDoc, preImage.getObject()); } else { - ASSERT_FALSE(actualOp.getPreImageOpTime()); + ASSERT_FALSE(deleteOplogEntry.getPreImageOpTime()); } + } + void checkSideCollectionIfNeeded(OperationContext* opCtx, + const DeleteTestCase& testCase, + const OplogDeleteEntryArgs& deleteArgs, + const std::vector& oplogs, + const OplogEntry& deleteOplogEntry) { bool didWriteInSideCollection = deleteArgs.retryableFindAndModifyLocation == kRecordInSideCollection && !deleteArgs.preImageRecordingEnabledForCollection; if (didWriteInSideCollection) { repl::ImageEntry imageEntry = - getImageEntryFromSideCollection(opCtx, *actualOp.getSessionId()); + getImageEntryFromSideCollection(opCtx, *deleteOplogEntry.getSessionId()); + ASSERT(imageEntry.getImageKind() == deleteOplogEntry.getNeedsRetryImage()); ASSERT(imageEntry.getImageKind() == repl::RetryImageEnum::kPreImage); - ASSERT_BSONOBJ_EQ(deletedDoc, imageEntry.getImage()); + ASSERT_BSONOBJ_EQ(_deletedDoc, imageEntry.getImage()); + + // If 'deleteOplogEntry' has opTime T, opTime T-1 must be reserved for potential forged + // noop oplog entry for the preImage written to the side collection. + const Timestamp forgeNoopTimestamp = deleteOplogEntry.getTimestamp() - 1; + ASSERT_FALSE(findByTimestamp(oplogs, forgeNoopTimestamp)); } else { - if (actualOp.getSessionId()) { - ASSERT_FALSE(didWriteImageEntryToSideCollection(opCtx, *actualOp.getSessionId())); + ASSERT_FALSE(deleteOplogEntry.getNeedsRetryImage()); + if (deleteOplogEntry.getSessionId()) { + ASSERT_FALSE( + didWriteImageEntryToSideCollection(opCtx, *deleteOplogEntry.getSessionId())); } else { // Session id is missing only for non-retryable option. ASSERT(testCase.retryableOptions == kNotRetryable); } } + } - const Timestamp preImageOpTime = actualOp.getOpTime().getTimestamp(); - ChangeStreamPreImageId preImageId(uuid, preImageOpTime, 0); + void checkChangeStreamImagesIfNeeded(OperationContext* opCtx, + const DeleteTestCase& testCase, + const OplogDeleteEntryArgs& deleteArgs, + const OplogEntry& deleteOplogEntry) { + const Timestamp preImageOpTime = deleteOplogEntry.getOpTime().getTimestamp(); + ChangeStreamPreImageId preImageId(_uuid, preImageOpTime, 0); if (deleteArgs.changeStreamPreAndPostImagesEnabledForCollection) { BSONObj container; ChangeStreamPreImage preImage = getChangeStreamPreImage(opCtx, preImageId, &container); - ASSERT_BSONOBJ_EQ(deletedDoc, preImage.getPreImage()); - ASSERT_EQ(actualOp.getWallClockTime(), preImage.getOperationTime()); + ASSERT_BSONOBJ_EQ(_deletedDoc, preImage.getPreImage()); + ASSERT_EQ(deleteOplogEntry.getWallClockTime(), preImage.getOperationTime()); } else { ASSERT_FALSE(didWriteDeletedDocToPreImagesCollection(opCtx, preImageId)); } } + + std::vector _cases{ + {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1}, + {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, + {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1}, + {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1}, + {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2}, + {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1}, + {kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2}, + {kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, + {kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}}; + + const NamespaceString _nss{"test", "coll"}; + const CollectionUUID _uuid = CollectionUUID::gen(); + const BSONObj _deletedDoc = BSON("_id" << 0 << "valuePriorToDelete" + << "marvelous"); +}; + +TEST_F(OnDeleteOutputsTest, TestNonTransactionFundamentalOnDeleteOutputs) { + // Create a registry that only registers the Impl. It can be challenging to call methods on + // the Impl directly. It falls into cases where `ReservedTimes` is expected to be + // instantiated. Due to strong encapsulation, we use the registry that managers the + // `ReservedTimes` on our behalf. + OpObserverRegistry opObserver; + opObserver.addObserver(std::make_unique()); + + for (std::size_t testIdx = 0; testIdx < _cases.size(); ++testIdx) { + const auto& testCase = _cases[testIdx]; + logTestCase(testCase); + + auto opCtxRaii = cc().makeOperationContext(); + OperationContext* opCtx = opCtxRaii.get(); + + // Phase 1: Clearing any state and setting up fixtures/the delete call. + resetOplogAndTransactions(opCtx); + + std::unique_ptr contextSession; + if (testCase.isRetryable()) { + beginRetryableWriteWithTxnNumber(opCtx, testIdx, contextSession); + } + + // Phase 2: Call the code we're testing. + OplogDeleteEntryArgs deleteEntryArgs; + initializeOplogDeleteEntryArgs(opCtx, testCase, &deleteEntryArgs); + + WriteUnitOfWork wuow(opCtx); + AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX); + // This test does not call `OpObserver::aboutToDelete`. That method has the side-effect + // of setting of `documentKey` on the delete for sharding purposes. + // `OpObserverImpl::onDelete` asserts its existence. + documentKeyDecoration(opCtx).emplace(_deletedDoc["_id"].wrap(), boost::none); + opObserver.onDelete( + opCtx, _nss, _uuid, testCase.isRetryable() ? 1 : kUninitializedStmtId, deleteEntryArgs); + wuow.commit(); + + // Phase 3: Analyze the results: + // This `getNOplogEntries` also asserts that all oplogs are retrieved. + std::vector oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs); + // Entries are returned in ascending timestamp order. + auto deleteOplogEntry = assertGet(OplogEntry::parse(oplogs.back())); + checkPreImageInOplogIfNeeded(testCase, deleteEntryArgs, oplogs, deleteOplogEntry); + checkSideCollectionIfNeeded(opCtx, testCase, deleteEntryArgs, oplogs, deleteOplogEntry); + checkChangeStreamImagesIfNeeded(opCtx, testCase, deleteEntryArgs, deleteOplogEntry); + } } +TEST_F(OnDeleteOutputsTest, TestTransactionFundamentalOnDeleteOutputs) { + // Create a registry that only registers the Impl. It can be challenging to call methods on + // the Impl directly. It falls into cases where `ReservedTimes` is expected to be + // instantiated. Due to strong encapsulation, we use the registry that managers the + // `ReservedTimes` on our behalf. + OpObserverRegistry opObserver; + opObserver.addObserver(std::make_unique()); + + for (std::size_t testIdx = 0; testIdx < _cases.size(); ++testIdx) { + const auto& testCase = _cases[testIdx]; + if (testCase.alwaysRecordPreImages || testCase.changeStreamImagesEnabled) { + continue; + } + logTestCase(testCase); + + auto opCtxRaii = cc().makeOperationContext(); + OperationContext* opCtx = opCtxRaii.get(); + + // Phase 1: Clearing any state and setting up fixtures/the delete call. + resetOplogAndTransactions(opCtx); + + std::unique_ptr contextSession; + if (testCase.isRetryable()) { + beginRetryableInternalTransactionWithTxnNumber(opCtx, testIdx, contextSession); + } else { + beginNonRetryableTransactionWithTxnNumber(opCtx, testIdx, contextSession); + } + + // Phase 2: Call the code we're testing. + OplogDeleteEntryArgs deleteEntryArgs; + initializeOplogDeleteEntryArgs(opCtx, testCase, &deleteEntryArgs); + const auto stmtId = testCase.isRetryable() ? 1 : kUninitializedStmtId; + + WriteUnitOfWork wuow(opCtx); + AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX); + // This test does not call `OpObserver::aboutToDelete`. That method has the side-effect + // of setting of `documentKey` on the delete for sharding purposes. + // `OpObserverImpl::onDelete` asserts its existence. + documentKeyDecoration(opCtx).emplace(_deletedDoc["_id"].wrap(), boost::none); + opObserver.onDelete(opCtx, _nss, _uuid, stmtId, deleteEntryArgs); + commitUnpreparedTransaction(opCtx, opObserver); + wuow.commit(); + + // Phase 3: Analyze the results: + // This `getNOplogEntries` also asserts that all oplogs are retrieved. + std::vector oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs); + // Entries are returned in ascending timestamp order. + auto applyOpsOplogEntry = assertGet(OplogEntry::parse(oplogs.back())); + auto deleteOplogEntry = getInnerEntryFromApplyOpsOplogEntry(applyOpsOplogEntry); + checkPreImageInOplogIfNeeded(testCase, deleteEntryArgs, oplogs, deleteOplogEntry); + checkSideCollectionIfNeeded(opCtx, testCase, deleteEntryArgs, oplogs, deleteOplogEntry); + } +} TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertTest) { const NamespaceString nss1("testDB", "testColl"); -- cgit v1.2.1