summaryrefslogtreecommitdiff
path: root/src/mongo/db/op_observer_impl_test.cpp
diff options
context:
space:
mode:
authorCheahuychou Mao <mao.cheahuychou@gmail.com>2021-10-26 04:04:34 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-11-20 04:49:23 +0000
commite3b5c6069197d478929449eec87f8bfcc58bc7bd (patch)
tree214e9c5d8f6ea7f431f0eb537f427e0e4040391c /src/mongo/db/op_observer_impl_test.cpp
parent95ba764c2c7a787e117536fe5632fe484f9178c8 (diff)
downloadmongo-e3b5c6069197d478929449eec87f8bfcc58bc7bd.tar.gz
SERVER-60540 Add retryability support for internal transactions for findAndModify
Diffstat (limited to 'src/mongo/db/op_observer_impl_test.cpp')
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp946
1 files changed, 652 insertions, 294 deletions
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index 740f24808a5..fbfd560f674 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/pipeline/change_stream_preimage_gen.h"
#include "mongo/db/read_write_concern_defaults.h"
#include "mongo/db/read_write_concern_defaults_cache_lookup_mock.h"
+#include "mongo/db/repl/apply_ops_command_info.h"
#include "mongo/db/repl/image_collection_entry_gen.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry.h"
@@ -71,6 +72,72 @@ namespace {
using repl::OplogEntry;
using unittest::assertGet;
+namespace {
+
+OplogEntry getInnerEntryFromApplyOpsOplogEntry(const OplogEntry& oplogEntry) {
+ std::vector<repl::OplogEntry> innerEntries;
+ ASSERT(oplogEntry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps);
+ repl::ApplyOps::extractOperationsTo(oplogEntry, oplogEntry.getEntry().toBSON(), &innerEntries);
+ ASSERT_EQ(innerEntries.size(), 1u);
+ return innerEntries[0];
+}
+
+void beginRetryableWriteWithTxnNumber(
+ OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::unique_ptr<MongoDOperationContextSession>& contextSession) {
+ opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
+ opCtx->setTxnNumber(txnNumber);
+
+ contextSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.beginOrContinue(opCtx,
+ {*opCtx->getTxnNumber()},
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */);
+};
+
+void beginNonRetryableTransactionWithTxnNumber(
+ OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::unique_ptr<MongoDOperationContextSession>& contextSession) {
+ opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
+ opCtx->setTxnNumber(txnNumber);
+ opCtx->setInMultiDocumentTransaction();
+
+ contextSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.beginOrContinue(
+ opCtx, {*opCtx->getTxnNumber()}, false /* autocommit */, true /* startTransaction */);
+};
+
+void beginRetryableInternalTransactionWithTxnNumber(
+ OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::unique_ptr<MongoDOperationContextSession>& contextSession) {
+ RAIIServerParameterControllerForTest controller{"featureFlagInternalTransactions", true};
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+
+ opCtx->setLogicalSessionId(makeLogicalSessionIdWithTxnNumberAndUUIDForTest());
+ opCtx->setTxnNumber(txnNumber);
+ opCtx->setInMultiDocumentTransaction();
+
+ contextSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.beginOrContinue(
+ opCtx, {*opCtx->getTxnNumber()}, false /* autocommit */, true /* startTransaction */);
+};
+
+template <typename OpObserverType>
+void commitUnpreparedTransaction(OperationContext* opCtx, OpObserverType& opObserver) {
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx);
+ opObserver.onUnpreparedTransactionCommit(
+ opCtx, &txnOps, txnParticipant.getNumberOfPrePostImagesToWriteForTest());
+}
+
+} // namespace
+
class OpObserverTest : public ServiceContextMongoDTest {
public:
void setUp() override {
@@ -95,6 +162,10 @@ public:
ReadWriteConcernDefaults::create(getServiceContext(), _lookupMock.getFetchDefaultsFn());
}
+ void tearDown() override {
+ serverGlobalParams.clusterRole = ClusterRole::None;
+ }
+
void reset(OperationContext* opCtx, NamespaceString nss) const {
writeConflictRetry(opCtx, "deleteAll", nss.ns(), [&] {
opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
@@ -163,6 +234,11 @@ protected:
return getNOplogEntries(opCtx, 1).back();
}
+ BSONObj getInnerEntryFromSingleApplyOpsOplogEntry(OperationContext* opCtx) {
+ auto applyOpsOplogEntry = assertGet(OplogEntry::parse(getNOplogEntries(opCtx, 1).back()));
+ return getInnerEntryFromApplyOpsOplogEntry(applyOpsOplogEntry).getEntry().toBSON();
+ }
+
bool didWriteImageEntryToSideCollection(OperationContext* opCtx,
const LogicalSessionId& sessionId) {
AutoGetCollection sideCollection(
@@ -739,17 +815,9 @@ public:
void setUp() override {
OpObserverTest::setUp();
_opCtx = cc().makeOperationContext();
-
_opObserver.emplace();
-
MongoDSessionCatalog::onStepUp(opCtx());
_times.emplace(opCtx());
-
- opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest());
- opCtx()->setTxnNumber(txnNum());
- opCtx()->setInMultiDocumentTransaction();
- _sessionCheckout = std::make_unique<MongoDOperationContextSession>(opCtx());
- _txnParticipant.emplace(TransactionParticipant::get(opCtx()));
}
void tearDown() override {
@@ -760,6 +828,20 @@ public:
OpObserverTest::tearDown();
}
+ void setUpRetryableWrite() {
+ beginRetryableWriteWithTxnNumber(opCtx(), txnNum(), _sessionCheckout);
+ _txnParticipant.emplace(TransactionParticipant::get(opCtx()));
+ }
+
+ void setUpNonRetryableTransaction() {
+ beginNonRetryableTransactionWithTxnNumber(opCtx(), txnNum(), _sessionCheckout);
+ _txnParticipant.emplace(TransactionParticipant::get(opCtx()));
+ }
+
+ void setUpRetryableInternalTransaction() {
+ beginRetryableInternalTransactionWithTxnNumber(opCtx(), txnNum(), _sessionCheckout);
+ _txnParticipant.emplace(TransactionParticipant::get(opCtx()));
+ }
protected:
Session* session() {
@@ -806,10 +888,7 @@ class OpObserverTransactionTest : public OpObserverTxnParticipantTest {
public:
void setUp() override {
OpObserverTxnParticipantTest::setUp();
- txnParticipant().beginOrContinue(opCtx(),
- {*opCtx()->getTxnNumber()},
- false /* autocommit */,
- true /* startTransaction */);
+ OpObserverTxnParticipantTest::setUpNonRetryableTransaction();
}
protected:
@@ -1498,109 +1577,214 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionSingleStatementTest) {
*/
class OpObserverRetryableFindAndModifyTest : public OpObserverTxnParticipantTest {
public:
+ void tearDown() override {
+ OpObserverTxnParticipantTest::tearDown();
+ }
+
+protected:
+ void testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage() {
+ NamespaceString nss = {"test", "coll"};
+ const auto uuid = CollectionUUID::gen();
+
+ CollectionUpdateArgs updateArgs;
+ updateArgs.stmtIds = {0};
+ updateArgs.updatedDoc = BSON("_id" << 0 << "data"
+ << "x");
+ updateArgs.update = BSON("$set" << BSON("data"
+ << "x"));
+ updateArgs.criteria = BSON("_id" << 0);
+ updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PostImage;
+ OplogUpdateEntryArgs update(&updateArgs, nss, uuid);
+ update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection;
+
+ WriteUnitOfWork wunit(opCtx());
+ AutoGetDb autoDb(opCtx(), nss.db(), MODE_X);
+ opObserver().onUpdate(opCtx(), update);
+ commit();
+
+ // Asserts that only a single oplog entry was created. In essence, we did not create any
+ // no-op image entries in the oplog.
+ const auto oplogEntry = assertGetSingleOplogEntry();
+ ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName));
+ ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName));
+ ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName));
+ ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName),
+ "postImage"_sd);
+ }
+
+ void testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage() {
+ NamespaceString nss = {"test", "coll"};
+ const auto uuid = CollectionUUID::gen();
+
+ CollectionUpdateArgs updateArgs;
+ updateArgs.stmtIds = {0};
+ updateArgs.preImageDoc = BSON("_id" << 0 << "data"
+ << "y");
+ updateArgs.update = BSON("$set" << BSON("data"
+ << "x"));
+ updateArgs.criteria = BSON("_id" << 0);
+ updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PreImage;
+ OplogUpdateEntryArgs update(&updateArgs, nss, uuid);
+ update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection;
+
+ WriteUnitOfWork wunit(opCtx());
+ AutoGetDb autoDb(opCtx(), nss.db(), MODE_X);
+ opObserver().onUpdate(opCtx(), update);
+ commit();
+
+ // Asserts that only a single oplog entry was created. In essence, we did not create any
+ // no-op image entries in the oplog.
+ const auto oplogEntry = assertGetSingleOplogEntry();
+ ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName));
+ ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName));
+ ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName));
+ ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName),
+ "preImage"_sd);
+ }
+
+ void testRetryableFindAndModifyDeleteHasNeedsRetryImage() {
+ NamespaceString nss = {"test", "coll"};
+ const auto uuid = CollectionUUID::gen();
+
+ WriteUnitOfWork wunit(opCtx());
+ AutoGetDb autoDb(opCtx(), nss.db(), MODE_X);
+ const auto deletedDoc = BSON("_id" << 0 << "data"
+ << "x");
+ opObserver().aboutToDelete(opCtx(), nss, uuid, deletedDoc);
+ OplogDeleteEntryArgs args;
+ args.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection;
+ args.deletedDoc = &deletedDoc;
+ opObserver().onDelete(opCtx(), nss, uuid, 0, args);
+ commit();
+
+ // Asserts that only a single oplog entry was created. In essence, we did not create any
+ // no-op image entries in the oplog.
+ const auto oplogEntry = assertGetSingleOplogEntry();
+ ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName));
+ ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName));
+ ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName));
+ ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName),
+ "preImage"_sd);
+ }
+
+ virtual void commit() = 0;
+
+ virtual BSONObj assertGetSingleOplogEntry() = 0;
+};
+
+class OpObserverRetryableFindAndModifyOutsideTransactionTest
+ : public OpObserverRetryableFindAndModifyTest {
+public:
void setUp() override {
OpObserverTxnParticipantTest::setUp();
- txnParticipant().beginOrContinue(
- opCtx(), {txnNum()}, boost::none /* autocommit */, boost::none /* startTransaction */);
+ OpObserverTxnParticipantTest::setUpRetryableWrite();
}
- void tearDown() override {
- OpObserverTxnParticipantTest::tearDown();
+protected:
+ void commit() final{};
+
+ BSONObj assertGetSingleOplogEntry() final {
+ return getSingleOplogEntry(opCtx());
}
};
-TEST_F(OpObserverRetryableFindAndModifyTest,
+TEST_F(OpObserverRetryableFindAndModifyOutsideTransactionTest,
RetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage) {
- NamespaceString nss = {"test", "coll"};
- const auto uuid = CollectionUUID::gen();
+ testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage();
+}
- CollectionUpdateArgs updateArgs;
- updateArgs.stmtIds = {0};
- updateArgs.updatedDoc = BSON("_id" << 0 << "data"
- << "x");
- updateArgs.update = BSON("$set" << BSON("data"
- << "x"));
- updateArgs.criteria = BSON("_id" << 0);
- updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PostImage;
- OplogUpdateEntryArgs update(&updateArgs, nss, uuid);
- update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection;
-
- WriteUnitOfWork wunit(opCtx());
- AutoGetDb autoDb(opCtx(), nss.db(), MODE_X);
- opObserver().onUpdate(opCtx(), update);
- // Asserts that only a single oplog entry was created. In essence, we did not create any
- // no-op image entries in the oplog.
- const auto oplogEntry = getSingleOplogEntry(opCtx());
- ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName));
- ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName));
- ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName));
- ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName),
- "postImage"_sd);
-}
-
-TEST_F(OpObserverRetryableFindAndModifyTest,
+TEST_F(OpObserverRetryableFindAndModifyOutsideTransactionTest,
RetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage) {
- NamespaceString nss = {"test", "coll"};
- const auto uuid = CollectionUUID::gen();
+ testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage();
+}
- CollectionUpdateArgs updateArgs;
- updateArgs.stmtIds = {0};
- updateArgs.preImageDoc = BSON("_id" << 0 << "data"
- << "y");
- updateArgs.update = BSON("$set" << BSON("data"
- << "x"));
- updateArgs.criteria = BSON("_id" << 0);
- updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PreImage;
- OplogUpdateEntryArgs update(&updateArgs, nss, uuid);
- update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection;
-
- WriteUnitOfWork wunit(opCtx());
- AutoGetDb autoDb(opCtx(), nss.db(), MODE_X);
- opObserver().onUpdate(opCtx(), update);
- // Asserts that only a single oplog entry was created. In essence, we did not create any
- // no-op image entries in the oplog.
- const auto oplogEntry = getSingleOplogEntry(opCtx());
- ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName));
- ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName));
- ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName));
- ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName),
- "preImage"_sd);
-}
-
-TEST_F(OpObserverRetryableFindAndModifyTest, RetryableFindAndModifyDeleteHasNeedsRetryImage) {
- NamespaceString nss = {"test", "coll"};
- const auto uuid = CollectionUUID::gen();
+TEST_F(OpObserverRetryableFindAndModifyOutsideTransactionTest,
+ RetryableFindAndModifyDeleteHasNeedsRetryImage) {
+ testRetryableFindAndModifyDeleteHasNeedsRetryImage();
+}
- WriteUnitOfWork wunit(opCtx());
- AutoGetDb autoDb(opCtx(), nss.db(), MODE_X);
- const auto deletedDoc = BSON("_id" << 0 << "data"
- << "x");
- opObserver().aboutToDelete(opCtx(), nss, uuid, deletedDoc);
- OplogDeleteEntryArgs args;
- args.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection;
- args.deletedDoc = &deletedDoc;
- opObserver().onDelete(opCtx(), nss, uuid, 0, args);
- // Asserts that only a single oplog entry was created. In essence, we did not create any
- // no-op image entries in the oplog.
- const auto oplogEntry = getSingleOplogEntry(opCtx());
- ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName));
- ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName));
- ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName));
- ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName),
- "preImage"_sd);
-}
-
-OplogEntry findByTimestamp(const std::vector<BSONObj>& oplogs, Timestamp ts) {
+class OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransactionTest
+ : public OpObserverRetryableFindAndModifyTest {
+public:
+ void setUp() override {
+ OpObserverTxnParticipantTest::setUp();
+ OpObserverTxnParticipantTest::setUpRetryableInternalTransaction();
+ }
+
+protected:
+ void commit() final {
+ commitUnpreparedTransaction<OpObserverImpl>(opCtx(), opObserver());
+ };
+
+ BSONObj assertGetSingleOplogEntry() final {
+ return getInnerEntryFromSingleApplyOpsOplogEntry(opCtx());
+ }
+};
+
+TEST_F(OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransactionTest,
+ RetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage) {
+ testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage();
+}
+
+TEST_F(OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransactionTest,
+ RetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage) {
+ testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage();
+}
+
+TEST_F(OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransactionTest,
+ RetryableFindAndModifyDeleteHasNeedsRetryImage) {
+ testRetryableFindAndModifyDeleteHasNeedsRetryImage();
+}
+
+class OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransactionTest
+ : public OpObserverRetryableFindAndModifyTest {
+public:
+ void setUp() override {
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+ OpObserverTxnParticipantTest::setUp();
+ OpObserverTxnParticipantTest::setUpRetryableInternalTransaction();
+ }
+
+protected:
+ void commit() final {
+ const auto prepareSlot = repl::getNextOpTime(opCtx());
+ txnParticipant().transitionToPreparedforTest(opCtx(), prepareSlot);
+ auto txnOps = txnParticipant().retrieveCompletedTransactionOperations(opCtx());
+ opObserver().onTransactionPrepare(
+ opCtx(),
+ {prepareSlot},
+ &txnOps,
+ txnParticipant().getNumberOfPrePostImagesToWriteForTest());
+ };
+
+ BSONObj assertGetSingleOplogEntry() final {
+ return getInnerEntryFromSingleApplyOpsOplogEntry(opCtx());
+ }
+};
+
+TEST_F(OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransactionTest,
+ RetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage) {
+ testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage();
+}
+
+TEST_F(OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransactionTest,
+ RetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage) {
+ testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage();
+}
+
+TEST_F(OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransactionTest,
+ RetryableFindAndModifyDeleteHasNeedsRetryImage) {
+ testRetryableFindAndModifyDeleteHasNeedsRetryImage();
+}
+
+boost::optional<OplogEntry> findByTimestamp(const std::vector<BSONObj>& oplogs, Timestamp ts) {
for (auto& oplog : oplogs) {
const auto& entry = assertGet(OplogEntry::parse(oplog));
if (entry.getTimestamp() == ts) {
return entry;
}
}
-
- FAIL("Not found.");
- // C++/clang isn't smart enough to know FAIL is guaranteed to throw.
- MONGO_UNREACHABLE;
+ return boost::none;
}
using StoreDocOption = CollectionUpdateArgs::StoreDocOption;
@@ -1619,6 +1803,8 @@ const bool kChangeStreamImagesDisabled = false;
const auto kNotRetryable = RetryableFindAndModifyLocation::kNone;
const auto kRecordInOplog = RetryableFindAndModifyLocation::kOplog;
const auto kRecordInSideCollection = RetryableFindAndModifyLocation::kSideCollection;
+
+const std::vector<bool> kInMultiDocumentTransactionCases{false, true};
} // namespace
struct UpdateTestCase {
@@ -1662,48 +1848,9 @@ struct UpdateTestCase {
}
};
-TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) {
- // Create a registry that only registers the Impl. It can be challenging to call methods on the
- // Impl directly. It falls into cases where `ReservedTimes` is expected to be instantiated. Due
- // to strong encapsulation, we use the registry that managers the `ReservedTimes` on our behalf.
- OpObserverRegistry opObserver;
- opObserver.addObserver(std::make_unique<OpObserverImpl>());
-
- NamespaceString nss("test", "coll");
- CollectionUUID uuid = CollectionUUID::gen();
-
- std::vector<UpdateTestCase> cases = {
- // Regular updates.
- {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1},
- {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1},
- {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 1},
- {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1},
- {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2},
- {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
- {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2},
- // FindAndModify asking for a preImage.
- {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1},
- {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
- {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1},
- {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1},
- {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2},
- {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1},
- {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2},
- {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
- {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2},
- // FindAndModify asking for a postImage.
- {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1},
- {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
- {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1},
- {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1},
- {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2},
- {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1},
- {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2},
- {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 3},
- {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}};
-
- for (std::size_t testIdx = 0; testIdx < cases.size(); ++testIdx) {
- const auto& testCase = cases[testIdx];
+class OnUpdateOutputsTest : public OpObserverTest {
+protected:
+ void logTestCase(const UpdateTestCase& testCase) {
LOGV2(5739902,
"UpdateTestCase",
"ImageType"_attr = testCase.getImageTypeStr(),
@@ -1712,98 +1859,87 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) {
"RetryableFindAndModifyLocation"_attr =
testCase.getRetryableFindAndModifyLocationStr(),
"ExpectedOplogEntries"_attr = testCase.numOutputOplogs);
+ }
- CollectionUpdateArgs updateArgs;
- updateArgs.preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages;
- updateArgs.changeStreamPreAndPostImagesEnabledForCollection =
+ void initializeOplogUpdateEntryArgs(OperationContext* opCtx,
+ const UpdateTestCase& testCase,
+ OplogUpdateEntryArgs* update) {
+ update->updateArgs->preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages;
+ update->updateArgs->changeStreamPreAndPostImagesEnabledForCollection =
testCase.changeStreamImagesEnabled;
- auto opCtxRaii = cc().makeOperationContext();
- OperationContext* opCtx = opCtxRaii.get();
- // Phase 1: Clearing any state and setting up fixtures/the update call.
- resetOplogAndTransactions(opCtx);
-
- OplogUpdateEntryArgs update(&updateArgs, nss, uuid);
- boost::optional<MongoDOperationContextSession> contextSession;
- boost::optional<TransactionParticipant::Participant> txnParticipant;
switch (testCase.retryableOptions) {
case kNotRetryable:
- updateArgs.stmtIds = {kUninitializedStmtId};
+ update->updateArgs->stmtIds = {kUninitializedStmtId};
break;
case kRecordInOplog:
- update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kOplog;
- updateArgs.stmtIds = {1};
+ update->retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kOplog;
+ update->updateArgs->stmtIds = {1};
break;
case kRecordInSideCollection:
- update.retryableFindAndModifyLocation =
+ update->retryableFindAndModifyLocation =
RetryableFindAndModifyLocation::kSideCollection;
- updateArgs.stmtIds = {1};
- if (testCase.alwaysRecordPreImages &&
- testCase.retryableOptions == kRecordInSideCollection) {
+ update->updateArgs->stmtIds = {1};
+ if (testCase.retryableOptions == kRecordInSideCollection) {
// 'getNextOpTimes' requires us to be inside a WUOW when reserving oplog slots.
WriteUnitOfWork wuow(opCtx);
auto reservedSlots = repl::getNextOpTimes(opCtx, 3);
- updateArgs.oplogSlots = reservedSlots;
+ update->updateArgs->oplogSlots = reservedSlots;
}
break;
}
- if (testCase.retryableOptions != kNotRetryable) {
- opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
- opCtx->setTxnNumber(TxnNumber(testIdx));
- contextSession.emplace(opCtx);
- txnParticipant.emplace(TransactionParticipant::get(opCtx));
- txnParticipant->beginOrContinue(opCtx,
- {TxnNumber(testIdx)},
- boost::none /* autocommit */,
- boost::none /* startTransaction */);
- }
-
- updateArgs.preImageDoc = boost::none;
+ update->updateArgs->preImageDoc = boost::none;
if (testCase.imageType == StoreDocOption::PreImage || testCase.alwaysRecordPreImages ||
testCase.changeStreamImagesEnabled) {
- updateArgs.preImageDoc = BSON("_id" << 0 << "preImage" << true);
+ update->updateArgs->preImageDoc = BSON("_id" << 0 << "preImage" << true);
}
-
- updateArgs.updatedDoc = BSON("_id" << 0 << "postImage" << true);
- updateArgs.update =
+ update->updateArgs->updatedDoc = BSON("_id" << 0 << "postImage" << true);
+ update->updateArgs->update =
BSON("$set" << BSON("postImage" << true) << "$unset" << BSON("preImage" << 1));
- updateArgs.criteria = BSON("_id" << 0);
- updateArgs.storeDocOption = testCase.imageType;
-
- // Phase 2: Call the code we're testing.
- WriteUnitOfWork wuow(opCtx);
- AutoGetCollection locks(opCtx, nss, LockMode::MODE_IX);
- opObserver.onUpdate(opCtx, update);
- wuow.commit();
-
- // Phase 3: Analyze the results:
-
- // This `getNOplogEntries` also asserts that all oplogs are retrieved.
- std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs);
- // Entries are returned in ascending timestamp order.
- const OplogEntry& actualOp = assertGet(OplogEntry::parse(oplogs.back()));
+ update->updateArgs->criteria = BSON("_id" << 0);
+ update->updateArgs->storeDocOption = testCase.imageType;
+ }
+ void checkPreImageInOplogIfNeeded(const UpdateTestCase& testCase,
+ const OplogUpdateEntryArgs& update,
+ const std::vector<BSONObj>& oplogs,
+ const OplogEntry& updateOplogEntry) {
const bool checkPreImageInOplog = testCase.alwaysRecordPreImages ||
(testCase.imageType == StoreDocOption::PreImage &&
testCase.retryableOptions == kRecordInOplog);
if (checkPreImageInOplog) {
- ASSERT(actualOp.getPreImageOpTime());
- const Timestamp preImageOpTime = actualOp.getPreImageOpTime()->getTimestamp();
+ ASSERT(updateOplogEntry.getPreImageOpTime());
+ const Timestamp preImageOpTime = updateOplogEntry.getPreImageOpTime()->getTimestamp();
ASSERT_FALSE(preImageOpTime.isNull());
- OplogEntry preImage = findByTimestamp(oplogs, preImageOpTime);
+ OplogEntry preImage = *findByTimestamp(oplogs, preImageOpTime);
ASSERT_BSONOBJ_EQ(update.updateArgs->preImageDoc.get(), preImage.getObject());
+ } else {
+ ASSERT_FALSE(updateOplogEntry.getPreImageOpTime());
}
+ }
+ void checkPostImageInOplogIfNeeded(const UpdateTestCase& testCase,
+ const OplogUpdateEntryArgs& update,
+ const std::vector<BSONObj>& oplogs,
+ const OplogEntry& updateOplogEntry) {
const bool checkPostImageInOplog = testCase.imageType == StoreDocOption::PostImage &&
testCase.retryableOptions == kRecordInOplog;
if (checkPostImageInOplog) {
- ASSERT(actualOp.getPostImageOpTime());
- const Timestamp postImageOpTime = actualOp.getPostImageOpTime()->getTimestamp();
+ ASSERT(updateOplogEntry.getPostImageOpTime());
+ const Timestamp postImageOpTime = updateOplogEntry.getPostImageOpTime()->getTimestamp();
ASSERT_FALSE(postImageOpTime.isNull());
- OplogEntry postImage = findByTimestamp(oplogs, postImageOpTime);
+ OplogEntry postImage = *findByTimestamp(oplogs, postImageOpTime);
ASSERT_BSONOBJ_EQ(update.updateArgs->updatedDoc, postImage.getObject());
+ } else {
+ ASSERT_FALSE(updateOplogEntry.getPostImageOpTime());
}
+ }
+ void checkSideCollectionIfNeeded(OperationContext* opCtx,
+ const UpdateTestCase& testCase,
+ const OplogUpdateEntryArgs& update,
+ const std::vector<BSONObj>& oplogs,
+ const OplogEntry& updateOplogEntry) {
bool checkSideCollection =
testCase.isFindAndModify() && testCase.retryableOptions == kRecordInSideCollection;
if (checkSideCollection && testCase.alwaysRecordPreImages &&
@@ -1813,32 +1949,180 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) {
// in the side collection.
checkSideCollection = false;
}
-
if (checkSideCollection) {
repl::ImageEntry imageEntry =
- getImageEntryFromSideCollection(opCtx, *actualOp.getSessionId());
+ getImageEntryFromSideCollection(opCtx, *updateOplogEntry.getSessionId());
const BSONObj& expectedImage = testCase.imageType == StoreDocOption::PreImage
? update.updateArgs->preImageDoc.get()
: update.updateArgs->updatedDoc;
ASSERT_BSONOBJ_EQ(expectedImage, imageEntry.getImage());
+ ASSERT(imageEntry.getImageKind() == updateOplogEntry.getNeedsRetryImage());
if (testCase.imageType == StoreDocOption::PreImage) {
ASSERT(imageEntry.getImageKind() == repl::RetryImageEnum::kPreImage);
} else {
ASSERT(imageEntry.getImageKind() == repl::RetryImageEnum::kPostImage);
}
+
+ // If 'updateOplogEntry' has opTime T, opTime T-1 must be reserved for potential forged
+ // noop oplog entry for the pre/postImage written to the side collection.
+ const Timestamp forgeNoopTimestamp = updateOplogEntry.getTimestamp() - 1;
+ ASSERT_FALSE(findByTimestamp(oplogs, forgeNoopTimestamp));
+ } else {
+ ASSERT_FALSE(updateOplogEntry.getNeedsRetryImage());
+ if (updateOplogEntry.getSessionId()) {
+ ASSERT_FALSE(
+ didWriteImageEntryToSideCollection(opCtx, *updateOplogEntry.getSessionId()));
+ } else {
+ // Session id is missing only for non-retryable option.
+ ASSERT(testCase.retryableOptions == kNotRetryable);
+ }
}
+ }
+ void checkChangeStreamImagesIfNeeded(OperationContext* opCtx,
+ const UpdateTestCase& testCase,
+ const OplogUpdateEntryArgs& update,
+ const OplogEntry& updateOplogEntry) {
if (testCase.changeStreamImagesEnabled) {
BSONObj container;
- ChangeStreamPreImageId preImageId(uuid, actualOp.getOpTime().getTimestamp(), 0);
+ ChangeStreamPreImageId preImageId(
+ _uuid, updateOplogEntry.getOpTime().getTimestamp(), 0);
ChangeStreamPreImage preImage = getChangeStreamPreImage(opCtx, preImageId, &container);
const BSONObj& expectedImage = update.updateArgs->preImageDoc.get();
ASSERT_BSONOBJ_EQ(expectedImage, preImage.getPreImage());
- ASSERT_EQ(actualOp.getWallClockTime(), preImage.getOperationTime());
+ ASSERT_EQ(updateOplogEntry.getWallClockTime(), preImage.getOperationTime());
+ }
+ }
+
+ std::vector<UpdateTestCase> _cases = {
+ // Regular updates.
+ {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1},
+ {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1},
+ {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 1},
+ {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1},
+ {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2},
+ {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
+ {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2},
+ // FindAndModify asking for a preImage.
+ {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1},
+ {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
+ {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1},
+ {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1},
+ {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2},
+ {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1},
+ {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2},
+ {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
+ {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2},
+ // FindAndModify asking for a postImage.
+ {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1},
+ {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
+ {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1},
+ {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1},
+ {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2},
+ {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1},
+ {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2},
+ {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 3},
+ {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}};
+
+ const NamespaceString _nss{"test", "coll"};
+ const CollectionUUID _uuid = CollectionUUID::gen();
+};
+
+TEST_F(OnUpdateOutputsTest, TestNonTransactionFundamentalOnUpdateOutputs) {
+ // Create a registry that only registers the Impl. It can be challenging to call methods on
+ // the Impl directly. It falls into cases where `ReservedTimes` is expected to be
+ // instantiated. Due to strong encapsulation, we use the registry that managers the
+ // `ReservedTimes` on our behalf.
+ OpObserverRegistry opObserver;
+ opObserver.addObserver(std::make_unique<OpObserverImpl>());
+
+ for (std::size_t testIdx = 0; testIdx < _cases.size(); ++testIdx) {
+ const auto& testCase = _cases[testIdx];
+ logTestCase(testCase);
+
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+
+ // Phase 1: Clearing any state and setting up fixtures/the update call.
+ resetOplogAndTransactions(opCtx);
+
+ std::unique_ptr<MongoDOperationContextSession> contextSession;
+ if (testCase.isRetryable()) {
+ beginRetryableWriteWithTxnNumber(opCtx, testIdx, contextSession);
}
+
+ // Phase 2: Call the code we're testing.
+ CollectionUpdateArgs updateArgs;
+ OplogUpdateEntryArgs updateEntryArgs(&updateArgs, _nss, _uuid);
+ initializeOplogUpdateEntryArgs(opCtx, testCase, &updateEntryArgs);
+
+ WriteUnitOfWork wuow(opCtx);
+ AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX);
+ opObserver.onUpdate(opCtx, updateEntryArgs);
+ wuow.commit();
+
+ // Phase 3: Analyze the results:
+ // This `getNOplogEntries` also asserts that all oplogs are retrieved.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs);
+ // Entries are returned in ascending timestamp order.
+ auto updateOplogEntry = assertGet(OplogEntry::parse(oplogs.back()));
+ checkPreImageInOplogIfNeeded(testCase, updateEntryArgs, oplogs, updateOplogEntry);
+ checkPostImageInOplogIfNeeded(testCase, updateEntryArgs, oplogs, updateOplogEntry);
+ checkSideCollectionIfNeeded(opCtx, testCase, updateEntryArgs, oplogs, updateOplogEntry);
+ checkChangeStreamImagesIfNeeded(opCtx, testCase, updateEntryArgs, updateOplogEntry);
}
}
+TEST_F(OnUpdateOutputsTest, TestFundamentalTransactionOnUpdateOutputs) {
+ // Create a registry that only registers the Impl. It can be challenging to call methods on
+ // the Impl directly. It falls into cases where `ReservedTimes` is expected to be
+ // instantiated. Due to strong encapsulation, we use the registry that managers the
+ // `ReservedTimes` on our behalf.
+ OpObserverRegistry opObserver;
+ opObserver.addObserver(std::make_unique<OpObserverImpl>());
+
+ for (std::size_t testIdx = 0; testIdx < _cases.size(); ++testIdx) {
+ const auto& testCase = _cases[testIdx];
+ if (testCase.alwaysRecordPreImages || testCase.changeStreamImagesEnabled) {
+ continue;
+ }
+ logTestCase(testCase);
+
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+
+ // Phase 1: Clearing any state and setting up fixtures/the update call.
+ resetOplogAndTransactions(opCtx);
+
+ std::unique_ptr<MongoDOperationContextSession> contextSession;
+ if (testCase.isRetryable()) {
+ beginRetryableInternalTransactionWithTxnNumber(opCtx, testIdx, contextSession);
+ } else {
+ beginNonRetryableTransactionWithTxnNumber(opCtx, testIdx, contextSession);
+ }
+
+ // Phase 2: Call the code we're testing.
+ CollectionUpdateArgs updateArgs;
+ OplogUpdateEntryArgs updateEntryArgs(&updateArgs, _nss, _uuid);
+ initializeOplogUpdateEntryArgs(opCtx, testCase, &updateEntryArgs);
+
+ WriteUnitOfWork wuow(opCtx);
+ AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX);
+ opObserver.onUpdate(opCtx, updateEntryArgs);
+ commitUnpreparedTransaction<OpObserverRegistry>(opCtx, opObserver);
+ wuow.commit();
+
+ // Phase 3: Analyze the results:
+ // This `getNOplogEntries` also asserts that all oplogs are retrieved.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs);
+ // Entries are returned in ascending timestamp order.
+ auto applyOpsOplogEntry = assertGet(OplogEntry::parse(oplogs.back()));
+ auto updateOplogEntry = getInnerEntryFromApplyOpsOplogEntry(applyOpsOplogEntry);
+ checkPreImageInOplogIfNeeded(testCase, updateEntryArgs, oplogs, updateOplogEntry);
+ checkPostImageInOplogIfNeeded(testCase, updateEntryArgs, oplogs, updateOplogEntry);
+ checkSideCollectionIfNeeded(opCtx, testCase, updateEntryArgs, oplogs, updateOplogEntry);
+ }
+}
struct InsertTestCase {
bool isRetryableWrite;
@@ -1882,17 +2166,9 @@ TEST_F(OpObserverTest, TestFundamentalOnInsertsOutputs) {
toInsert.emplace_back(stmtId, BSON("_id" << stmtIdx));
}
- boost::optional<MongoDOperationContextSession> contextSession;
- boost::optional<TransactionParticipant::Participant> txnParticipant;
+ std::unique_ptr<MongoDOperationContextSession> contextSession;
if (testCase.isRetryableWrite) {
- opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
- opCtx->setTxnNumber(TxnNumber(testIdx));
- contextSession.emplace(opCtx);
- txnParticipant.emplace(TransactionParticipant::get(opCtx));
- txnParticipant->beginOrContinue(opCtx,
- {TxnNumber(testIdx)},
- boost::none /* autocommit */,
- boost::none /* startTransaction */);
+ beginRetryableWriteWithTxnNumber(opCtx, testIdx, contextSession);
}
// Phase 2: Call the code we're testing.
@@ -1971,31 +2247,10 @@ struct DeleteTestCase {
}
};
-TEST_F(OpObserverTest, TestFundamentalOnDeleteOutputs) {
- // Create a registry that only registers the Impl. It can be challenging to call methods on the
- // Impl directly. It falls into cases where `ReservedTimes` is expected to be instantiated. Due
- // to strong encapsulation, we use the registry that managers the `ReservedTimes` on our behalf.
- OpObserverRegistry opObserver;
- opObserver.addObserver(std::make_unique<OpObserverImpl>());
-
- NamespaceString nss("test", "coll");
- CollectionUUID uuid = CollectionUUID::gen();
-
- // For the DeleteTestCase, we add a "pre-image" deletedDoc when using `kRecordInOplog` and
- // `kRecordInSideCollection`.
- std::vector<DeleteTestCase> cases{
- {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1},
- {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
- {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1},
- {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1},
- {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2},
- {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1},
- {kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2},
- {kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
- {kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}};
+class OnDeleteOutputsTest : public OpObserverTest {
- for (std::size_t testIdx = 0; testIdx < cases.size(); ++testIdx) {
- const auto& testCase = cases[testIdx];
+protected:
+ void logTestCase(const DeleteTestCase& testCase) {
LOGV2(5739905,
"DeleteTestCase",
"PreImageRecording"_attr = testCase.alwaysRecordPreImages,
@@ -2003,111 +2258,214 @@ TEST_F(OpObserverTest, TestFundamentalOnDeleteOutputs) {
"RetryableFindAndModifyLocation"_attr =
testCase.getRetryableFindAndModifyLocationStr(),
"ExpectedOplogEntries"_attr = testCase.numOutputOplogs);
+ }
- OplogDeleteEntryArgs deleteArgs;
- deleteArgs.preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages;
- deleteArgs.changeStreamPreAndPostImagesEnabledForCollection =
+ void initializeOplogDeleteEntryArgs(OperationContext* opCtx,
+ const DeleteTestCase& testCase,
+ OplogDeleteEntryArgs* deleteArgs) {
+ deleteArgs->preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages;
+ deleteArgs->changeStreamPreAndPostImagesEnabledForCollection =
testCase.changeStreamImagesEnabled;
- auto opCtxRaii = cc().makeOperationContext();
- OperationContext* opCtx = opCtxRaii.get();
- // Phase 1: Clearing any state and setting up fixtures/the update call.
- resetOplogAndTransactions(opCtx);
-
- boost::optional<MongoDOperationContextSession> contextSession;
- boost::optional<TransactionParticipant::Participant> txnParticipant;
- if (testCase.retryableOptions != kNotRetryable) {
- opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
- opCtx->setTxnNumber(TxnNumber(testIdx));
- contextSession.emplace(opCtx);
- txnParticipant.emplace(TransactionParticipant::get(opCtx));
- txnParticipant->beginOrContinue(opCtx,
- {TxnNumber(testIdx)},
- boost::none /* autocommit */,
- boost::none /* startTransaction */);
- }
switch (testCase.retryableOptions) {
case kNotRetryable:
- deleteArgs.retryableFindAndModifyLocation = kNotRetryable;
+ deleteArgs->retryableFindAndModifyLocation = kNotRetryable;
break;
case kRecordInOplog:
- deleteArgs.retryableFindAndModifyLocation = kRecordInOplog;
+ deleteArgs->retryableFindAndModifyLocation = kRecordInOplog;
break;
case kRecordInSideCollection:
- deleteArgs.retryableFindAndModifyLocation = kRecordInSideCollection;
+ deleteArgs->retryableFindAndModifyLocation = kRecordInSideCollection;
break;
}
-
- const BSONObj deletedDoc = BSON("_id" << 0 << "valuePriorToDelete"
- << "marvelous");
if (testCase.isRetryable() || testCase.alwaysRecordPreImages ||
testCase.changeStreamImagesEnabled) {
- deleteArgs.deletedDoc = &deletedDoc;
+ deleteArgs->deletedDoc = &_deletedDoc;
}
- // This test does not call `OpObserver::aboutToDelete`. That method has the side-effect
- // of setting of `documentKey` on the delete for sharding purposes.
- // `OpObserverImpl::onDelete` asserts its existence.
- documentKeyDecoration(opCtx).emplace(deletedDoc["_id"].wrap(), boost::none);
- StmtId deleteStmtId = kUninitializedStmtId;
- if (testCase.isRetryable()) {
- deleteStmtId = {1};
- }
-
- // Phase 2: Call the code we're testing.
- WriteUnitOfWork wuow(opCtx);
- AutoGetCollection locks(opCtx, nss, LockMode::MODE_IX);
- opObserver.onDelete(opCtx, nss, uuid, deleteStmtId, deleteArgs);
- wuow.commit();
-
- // Phase 3: Analyze the results:
-
- // This `getNOplogEntries` also asserts that all oplogs are retrieved.
- std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs);
- // Entries are returned in ascending timestamp order.
- const OplogEntry& actualOp = assertGet(OplogEntry::parse(oplogs.back()));
+ }
+ void checkPreImageInOplogIfNeeded(const DeleteTestCase& testCase,
+ const OplogDeleteEntryArgs& deleteArgs,
+ const std::vector<BSONObj>& oplogs,
+ const OplogEntry& deleteOplogEntry) {
const bool checkPreImageInOplog = deleteArgs.preImageRecordingEnabledForCollection ||
deleteArgs.retryableFindAndModifyLocation == kRecordInOplog;
if (checkPreImageInOplog) {
- ASSERT(actualOp.getPreImageOpTime());
- const Timestamp preImageOpTime = actualOp.getPreImageOpTime()->getTimestamp();
+ ASSERT(deleteOplogEntry.getPreImageOpTime());
+ const Timestamp preImageOpTime = deleteOplogEntry.getPreImageOpTime()->getTimestamp();
ASSERT_FALSE(preImageOpTime.isNull());
- OplogEntry preImage = findByTimestamp(oplogs, preImageOpTime);
- ASSERT_BSONOBJ_EQ(deletedDoc, preImage.getObject());
+ OplogEntry preImage = *findByTimestamp(oplogs, preImageOpTime);
+ ASSERT_BSONOBJ_EQ(_deletedDoc, preImage.getObject());
} else {
- ASSERT_FALSE(actualOp.getPreImageOpTime());
+ ASSERT_FALSE(deleteOplogEntry.getPreImageOpTime());
}
+ }
+ void checkSideCollectionIfNeeded(OperationContext* opCtx,
+ const DeleteTestCase& testCase,
+ const OplogDeleteEntryArgs& deleteArgs,
+ const std::vector<BSONObj>& oplogs,
+ const OplogEntry& deleteOplogEntry) {
bool didWriteInSideCollection =
deleteArgs.retryableFindAndModifyLocation == kRecordInSideCollection &&
!deleteArgs.preImageRecordingEnabledForCollection;
if (didWriteInSideCollection) {
repl::ImageEntry imageEntry =
- getImageEntryFromSideCollection(opCtx, *actualOp.getSessionId());
+ getImageEntryFromSideCollection(opCtx, *deleteOplogEntry.getSessionId());
+ ASSERT(imageEntry.getImageKind() == deleteOplogEntry.getNeedsRetryImage());
ASSERT(imageEntry.getImageKind() == repl::RetryImageEnum::kPreImage);
- ASSERT_BSONOBJ_EQ(deletedDoc, imageEntry.getImage());
+ ASSERT_BSONOBJ_EQ(_deletedDoc, imageEntry.getImage());
+
+ // If 'deleteOplogEntry' has opTime T, opTime T-1 must be reserved for potential forged
+ // noop oplog entry for the preImage written to the side collection.
+ const Timestamp forgeNoopTimestamp = deleteOplogEntry.getTimestamp() - 1;
+ ASSERT_FALSE(findByTimestamp(oplogs, forgeNoopTimestamp));
} else {
- if (actualOp.getSessionId()) {
- ASSERT_FALSE(didWriteImageEntryToSideCollection(opCtx, *actualOp.getSessionId()));
+ ASSERT_FALSE(deleteOplogEntry.getNeedsRetryImage());
+ if (deleteOplogEntry.getSessionId()) {
+ ASSERT_FALSE(
+ didWriteImageEntryToSideCollection(opCtx, *deleteOplogEntry.getSessionId()));
} else {
// Session id is missing only for non-retryable option.
ASSERT(testCase.retryableOptions == kNotRetryable);
}
}
+ }
- const Timestamp preImageOpTime = actualOp.getOpTime().getTimestamp();
- ChangeStreamPreImageId preImageId(uuid, preImageOpTime, 0);
+ void checkChangeStreamImagesIfNeeded(OperationContext* opCtx,
+ const DeleteTestCase& testCase,
+ const OplogDeleteEntryArgs& deleteArgs,
+ const OplogEntry& deleteOplogEntry) {
+ const Timestamp preImageOpTime = deleteOplogEntry.getOpTime().getTimestamp();
+ ChangeStreamPreImageId preImageId(_uuid, preImageOpTime, 0);
if (deleteArgs.changeStreamPreAndPostImagesEnabledForCollection) {
BSONObj container;
ChangeStreamPreImage preImage = getChangeStreamPreImage(opCtx, preImageId, &container);
- ASSERT_BSONOBJ_EQ(deletedDoc, preImage.getPreImage());
- ASSERT_EQ(actualOp.getWallClockTime(), preImage.getOperationTime());
+ ASSERT_BSONOBJ_EQ(_deletedDoc, preImage.getPreImage());
+ ASSERT_EQ(deleteOplogEntry.getWallClockTime(), preImage.getOperationTime());
} else {
ASSERT_FALSE(didWriteDeletedDocToPreImagesCollection(opCtx, preImageId));
}
}
+
+ std::vector<DeleteTestCase> _cases{
+ {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1},
+ {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
+ {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1},
+ {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1},
+ {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2},
+ {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1},
+ {kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2},
+ {kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
+ {kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}};
+
+ const NamespaceString _nss{"test", "coll"};
+ const CollectionUUID _uuid = CollectionUUID::gen();
+ const BSONObj _deletedDoc = BSON("_id" << 0 << "valuePriorToDelete"
+ << "marvelous");
+};
+
+TEST_F(OnDeleteOutputsTest, TestNonTransactionFundamentalOnDeleteOutputs) {
+ // Create a registry that only registers the Impl. It can be challenging to call methods on
+ // the Impl directly. It falls into cases where `ReservedTimes` is expected to be
+ // instantiated. Due to strong encapsulation, we use the registry that managers the
+ // `ReservedTimes` on our behalf.
+ OpObserverRegistry opObserver;
+ opObserver.addObserver(std::make_unique<OpObserverImpl>());
+
+ for (std::size_t testIdx = 0; testIdx < _cases.size(); ++testIdx) {
+ const auto& testCase = _cases[testIdx];
+ logTestCase(testCase);
+
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+
+ // Phase 1: Clearing any state and setting up fixtures/the delete call.
+ resetOplogAndTransactions(opCtx);
+
+ std::unique_ptr<MongoDOperationContextSession> contextSession;
+ if (testCase.isRetryable()) {
+ beginRetryableWriteWithTxnNumber(opCtx, testIdx, contextSession);
+ }
+
+ // Phase 2: Call the code we're testing.
+ OplogDeleteEntryArgs deleteEntryArgs;
+ initializeOplogDeleteEntryArgs(opCtx, testCase, &deleteEntryArgs);
+
+ WriteUnitOfWork wuow(opCtx);
+ AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX);
+ // This test does not call `OpObserver::aboutToDelete`. That method has the side-effect
+ // of setting of `documentKey` on the delete for sharding purposes.
+ // `OpObserverImpl::onDelete` asserts its existence.
+ documentKeyDecoration(opCtx).emplace(_deletedDoc["_id"].wrap(), boost::none);
+ opObserver.onDelete(
+ opCtx, _nss, _uuid, testCase.isRetryable() ? 1 : kUninitializedStmtId, deleteEntryArgs);
+ wuow.commit();
+
+ // Phase 3: Analyze the results:
+ // This `getNOplogEntries` also asserts that all oplogs are retrieved.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs);
+ // Entries are returned in ascending timestamp order.
+ auto deleteOplogEntry = assertGet(OplogEntry::parse(oplogs.back()));
+ checkPreImageInOplogIfNeeded(testCase, deleteEntryArgs, oplogs, deleteOplogEntry);
+ checkSideCollectionIfNeeded(opCtx, testCase, deleteEntryArgs, oplogs, deleteOplogEntry);
+ checkChangeStreamImagesIfNeeded(opCtx, testCase, deleteEntryArgs, deleteOplogEntry);
+ }
}
+TEST_F(OnDeleteOutputsTest, TestTransactionFundamentalOnDeleteOutputs) {
+ // Create a registry that only registers the Impl. It can be challenging to call methods on
+ // the Impl directly. It falls into cases where `ReservedTimes` is expected to be
+ // instantiated. Due to strong encapsulation, we use the registry that managers the
+ // `ReservedTimes` on our behalf.
+ OpObserverRegistry opObserver;
+ opObserver.addObserver(std::make_unique<OpObserverImpl>());
+
+ for (std::size_t testIdx = 0; testIdx < _cases.size(); ++testIdx) {
+ const auto& testCase = _cases[testIdx];
+ if (testCase.alwaysRecordPreImages || testCase.changeStreamImagesEnabled) {
+ continue;
+ }
+ logTestCase(testCase);
+
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+
+ // Phase 1: Clearing any state and setting up fixtures/the delete call.
+ resetOplogAndTransactions(opCtx);
+
+ std::unique_ptr<MongoDOperationContextSession> contextSession;
+ if (testCase.isRetryable()) {
+ beginRetryableInternalTransactionWithTxnNumber(opCtx, testIdx, contextSession);
+ } else {
+ beginNonRetryableTransactionWithTxnNumber(opCtx, testIdx, contextSession);
+ }
+
+ // Phase 2: Call the code we're testing.
+ OplogDeleteEntryArgs deleteEntryArgs;
+ initializeOplogDeleteEntryArgs(opCtx, testCase, &deleteEntryArgs);
+ const auto stmtId = testCase.isRetryable() ? 1 : kUninitializedStmtId;
+
+ WriteUnitOfWork wuow(opCtx);
+ AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX);
+ // This test does not call `OpObserver::aboutToDelete`. That method has the side-effect
+ // of setting of `documentKey` on the delete for sharding purposes.
+ // `OpObserverImpl::onDelete` asserts its existence.
+ documentKeyDecoration(opCtx).emplace(_deletedDoc["_id"].wrap(), boost::none);
+ opObserver.onDelete(opCtx, _nss, _uuid, stmtId, deleteEntryArgs);
+ commitUnpreparedTransaction<OpObserverRegistry>(opCtx, opObserver);
+ wuow.commit();
+
+ // Phase 3: Analyze the results:
+ // This `getNOplogEntries` also asserts that all oplogs are retrieved.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs);
+ // Entries are returned in ascending timestamp order.
+ auto applyOpsOplogEntry = assertGet(OplogEntry::parse(oplogs.back()));
+ auto deleteOplogEntry = getInnerEntryFromApplyOpsOplogEntry(applyOpsOplogEntry);
+ checkPreImageInOplogIfNeeded(testCase, deleteEntryArgs, oplogs, deleteOplogEntry);
+ checkSideCollectionIfNeeded(opCtx, testCase, deleteEntryArgs, oplogs, deleteOplogEntry);
+ }
+}
TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertTest) {
const NamespaceString nss1("testDB", "testColl");