Diffstat (limited to 'src/mongo/db/op_observer/op_observer_impl_test.cpp')
-rw-r--r-- src/mongo/db/op_observer/op_observer_impl_test.cpp | 4628
 1 file changed, 4628 insertions(+), 0 deletions(-)
diff --git a/src/mongo/db/op_observer/op_observer_impl_test.cpp b/src/mongo/db/op_observer/op_observer_impl_test.cpp
new file mode 100644
index 00000000000..f9bf11fd5a5
--- /dev/null
+++ b/src/mongo/db/op_observer/op_observer_impl_test.cpp
@@ -0,0 +1,4628 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/batched_write_context.h"
+#include "mongo/db/catalog/import_collection_oplog_entry_gen.h"
+#include "mongo/db/client.h"
+#include "mongo/db/concurrency/exception_util.h"
+#include "mongo/db/concurrency/locker_noop.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/dbhelpers.h"
+#include "mongo/db/keys_collection_client_sharded.h"
+#include "mongo/db/keys_collection_manager.h"
+#include "mongo/db/logical_time_validator.h"
+#include "mongo/db/multitenancy_gen.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/op_observer/op_observer_impl.h"
+#include "mongo/db/op_observer/op_observer_registry.h"
+#include "mongo/db/op_observer/op_observer_util.h"
+#include "mongo/db/pipeline/change_stream_preimage_gen.h"
+#include "mongo/db/read_write_concern_defaults.h"
+#include "mongo/db/read_write_concern_defaults_cache_lookup_mock.h"
+#include "mongo/db/repl/apply_ops.h"
+#include "mongo/db/repl/image_collection_entry_gen.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/oplog_interface_local.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/repl_server_parameters_gen.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/repl/tenant_migration_access_blocker_registry.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/transaction_participant.h"
+#include "mongo/db/transaction_participant_gen.h"
+#include "mongo/idl/server_parameter_test_util.h"
+#include "mongo/logv2/log.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/unittest/death_test.h"
+#include "mongo/util/clock_source_mock.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+
+namespace mongo {
+namespace {
+
+using repl::OplogEntry;
+using unittest::assertGet;
+
+namespace {
+
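+// Extracts the single operation wrapped inside an 'applyOps' oplog entry. Asserts that the entry
+// is an applyOps command containing exactly one inner operation.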
+OplogEntry getInnerEntryFromApplyOpsOplogEntry(const OplogEntry& oplogEntry) {
+ std::vector<repl::OplogEntry> innerEntries;
+ ASSERT(oplogEntry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps);
+ repl::ApplyOps::extractOperationsTo(oplogEntry, oplogEntry.getEntry().toBSON(), &innerEntries);
+ ASSERT_EQ(innerEntries.size(), 1u);
+ return innerEntries[0];
+}
+
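+// Attaches a logical session id and transaction number to 'opCtx', checks out the session, and
+// begins (or continues) a retryable write on it.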
+void beginRetryableWriteWithTxnNumber(
+ OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::unique_ptr<MongoDOperationContextSession>& contextSession) {
+ opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
+ opCtx->setTxnNumber(txnNumber);
+
+ contextSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.beginOrContinue(opCtx,
+ {*opCtx->getTxnNumber()},
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */);
+};
+
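+// Checks out a session for 'opCtx' and starts an ordinary (non-retryable) multi-document
+// transaction with the given transaction number.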
+void beginNonRetryableTransactionWithTxnNumber(
+ OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::unique_ptr<MongoDOperationContextSession>& contextSession) {
+ opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
+ opCtx->setTxnNumber(txnNumber);
+ opCtx->setInMultiDocumentTransaction();
+
+ contextSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.beginOrContinue(
+ opCtx, {*opCtx->getTxnNumber()}, false /* autocommit */, true /* startTransaction */);
+};
+
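+// Checks out an internal session whose lsid carries a txnNumber and UUID, and starts a retryable
+// internal transaction on it.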
+void beginRetryableInternalTransactionWithTxnNumber(
+ OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::unique_ptr<MongoDOperationContextSession>& contextSession) {
+ opCtx->setLogicalSessionId(makeLogicalSessionIdWithTxnNumberAndUUIDForTest());
+ opCtx->setTxnNumber(txnNumber);
+ opCtx->setInMultiDocumentTransaction();
+
+ contextSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.beginOrContinue(
+ opCtx, {*opCtx->getTxnNumber()}, false /* autocommit */, true /* startTransaction */);
+};
+
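+// Commits the operations accumulated on the transaction participant through 'opObserver' as an
+// unprepared transaction.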
+template <typename OpObserverType>
+void commitUnpreparedTransaction(OperationContext* opCtx, OpObserverType& opObserver) {
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx);
+ opObserver.onUnpreparedTransactionCommit(
+ opCtx, &txnOps, txnParticipant.getNumberOfPrePostImagesToWriteForTest());
+}
+
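+// Reserves 'count' oplog slots inside a side transaction and returns them. The storage
+// transaction used for the reservation is abandoned, so only the reserved optimes survive.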
+std::vector<repl::OpTime> reserveOpTimesInSideTransaction(OperationContext* opCtx, size_t count) {
+ TransactionParticipant::SideTransactionBlock sideTxn{opCtx};
+
+ WriteUnitOfWork wuow{opCtx};
+ auto reservedSlots = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, count);
+ wuow.release();
+
+ opCtx->recoveryUnit()->abortUnitOfWork();
+ opCtx->lockState()->endWriteUnitOfWork();
+
+ return reservedSlots;
+}
+
+repl::OpTime reserveOpTimeInSideTransaction(OperationContext* opCtx) {
+ return reserveOpTimesInSideTransaction(opCtx, 1)[0];
+}
+
+} // namespace
+
+class OpObserverTest : public ServiceContextMongoDTest {
+public:
+ void setUp() override {
+ // Set up mongod.
+ ServiceContextMongoDTest::setUp();
+
+ auto service = getServiceContext();
+ auto opCtx = cc().makeOperationContext();
+ // onStepUp() relies on the storage interface to create the config.transactions table.
+ repl::StorageInterface::set(service, std::make_unique<repl::StorageInterfaceImpl>());
+
+ // Set up ReplicationCoordinator and create oplog.
+ repl::ReplicationCoordinator::set(
+ service,
+ std::make_unique<repl::ReplicationCoordinatorMock>(service, createReplSettings()));
+ repl::createOplog(opCtx.get());
+
+ // Ensure that we are primary.
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx.get());
+ ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
+ MongoDSessionCatalog::onStepUp(opCtx.get());
+
+ ReadWriteConcernDefaults::create(getServiceContext(), _lookupMock.getFetchDefaultsFn());
+ }
+
+ void tearDown() override {
+ serverGlobalParams.clusterRole = ClusterRole::None;
+ }
+
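+ // Truncates 'nss' if it already exists; otherwise creates it, using 'uuid' when provided.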
+ void reset(OperationContext* opCtx,
+ NamespaceString nss,
+ boost::optional<UUID> uuid = boost::none) const {
+ writeConflictRetry(opCtx, "deleteAll", nss.ns(), [&] {
+ opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
+ opCtx->recoveryUnit()->abandonSnapshot();
+
+ WriteUnitOfWork wunit(opCtx);
+ AutoGetCollection collRaii(opCtx, nss, LockMode::MODE_X);
+ if (collRaii) {
+ invariant(collRaii.getWritableCollection(opCtx)->truncate(opCtx).isOK());
+ } else {
+ auto db = collRaii.ensureDbExists(opCtx);
+ CollectionOptions opts;
+ if (uuid) {
+ opts.uuid = uuid;
+ }
+ invariant(db->createCollection(opCtx, nss, opts));
+ }
+ wunit.commit();
+ });
+ }
+
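+ // Clears the oplog, config.transactions, the image collection, and the change stream
+ // pre-images collection so each test starts from a clean state.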
+ void resetOplogAndTransactions(OperationContext* opCtx) const {
+ reset(opCtx, NamespaceString::kRsOplogNamespace);
+ reset(opCtx, NamespaceString::kSessionTransactionsTableNamespace);
+ reset(opCtx, NamespaceString::kConfigImagesNamespace);
+ reset(opCtx, NamespaceString::kChangeStreamPreImagesNamespace);
+ }
+
+protected:
+ explicit OpObserverTest(Options options = {}) : ServiceContextMongoDTest(std::move(options)) {}
+
+ // Assert that the oplog has the expected number of entries, and return them
+ std::vector<BSONObj> getNOplogEntries(OperationContext* opCtx, int numExpected) {
+ std::vector<BSONObj> allOplogEntries;
+ repl::OplogInterfaceLocal oplogInterface(opCtx);
+
+ AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState());
+ auto oplogIter = oplogInterface.makeIterator();
+ while (true) {
+ StatusWith<std::pair<BSONObj, RecordId>> swEntry = oplogIter->next();
+ if (swEntry.getStatus() == ErrorCodes::CollectionIsEmpty) {
+ break;
+ }
+ allOplogEntries.push_back(swEntry.getValue().first);
+ }
+ if (allOplogEntries.size() != static_cast<std::size_t>(numExpected)) {
+ LOGV2(5739903,
+ "Incorrect number of oplog entries made",
+ "numExpected"_attr = numExpected,
+ "numFound"_attr = allOplogEntries.size(),
+ "entries"_attr = allOplogEntries);
+ }
+ ASSERT_EQUALS(allOplogEntries.size(), numExpected);
+
+ std::vector<BSONObj> ret(numExpected);
+ for (int idx = numExpected - 1; idx >= 0; idx--) {
+ // The oplogIterator returns the entries in reverse order.
+ ret[idx] = allOplogEntries[numExpected - idx - 1];
+ }
+
+ // Some unit tests reuse the same OperationContext to read the oplog and end up acquiring the
+ // RSTL lock after using the OplogInterfaceLocal. This is a hack to make sure we do not hold
+ // the RSTL lock for prepared transactions.
+ if (opCtx->inMultiDocumentTransaction() &&
+ TransactionParticipant::get(opCtx).transactionIsPrepared()) {
+ opCtx->lockState()->unlockRSTLforPrepare();
+ }
+ return ret;
+ }
+
+ // Assert that oplog only has a single entry and return that oplog entry.
+ BSONObj getSingleOplogEntry(OperationContext* opCtx) {
+ return getNOplogEntries(opCtx, 1).back();
+ }
+
+ BSONObj getInnerEntryFromSingleApplyOpsOplogEntry(OperationContext* opCtx) {
+ auto applyOpsOplogEntry = assertGet(OplogEntry::parse(getNOplogEntries(opCtx, 1).back()));
+ return getInnerEntryFromApplyOpsOplogEntry(applyOpsOplogEntry).getEntry().toBSON();
+ }
+
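+ // Returns true if a pre/post image entry for 'sessionId' was written to
+ // config.image_collection.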
+ bool didWriteImageEntryToSideCollection(OperationContext* opCtx,
+ const LogicalSessionId& sessionId) {
+ AutoGetCollection sideCollection(
+ opCtx, NamespaceString::kConfigImagesNamespace, LockMode::MODE_IS);
+ const auto imageEntry = Helpers::findOneForTesting(
+ opCtx, sideCollection.getCollection(), BSON("_id" << sessionId.toBSON()), false);
+ return !imageEntry.isEmpty();
+ }
+
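+ // Returns true if a pre-image with 'preImageId' was written to the change stream pre-images
+ // collection.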
+ bool didWriteDeletedDocToPreImagesCollection(OperationContext* opCtx,
+ const ChangeStreamPreImageId preImageId) {
+ AutoGetCollection preImagesCollection(
+ opCtx, NamespaceString::kChangeStreamPreImagesNamespace, LockMode::MODE_IS);
+ const auto preImage = Helpers::findOneForTesting(
+ opCtx, preImagesCollection.getCollection(), BSON("_id" << preImageId.toBSON()), false);
+ return !preImage.isEmpty();
+ }
+
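+ // Reads and parses the image entry for 'sessionId' from config.image_collection.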
+ repl::ImageEntry getImageEntryFromSideCollection(OperationContext* opCtx,
+ const LogicalSessionId& sessionId) {
+ AutoGetCollection sideCollection(
+ opCtx, NamespaceString::kConfigImagesNamespace, LockMode::MODE_IS);
+ return repl::ImageEntry::parse(
+ IDLParserErrorContext("image entry"),
+ Helpers::findOneForTesting(
+ opCtx, sideCollection.getCollection(), BSON("_id" << sessionId.toBSON())));
+ }
+
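+ // Reads and parses the config.transactions record for 'sessionId'.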
+ SessionTxnRecord getTxnRecord(OperationContext* opCtx, const LogicalSessionId& sessionId) {
+ AutoGetCollection configTransactions(
+ opCtx, NamespaceString::kSessionTransactionsTableNamespace, LockMode::MODE_IS);
+
+ return SessionTxnRecord::parse(
+ IDLParserErrorContext("txn record"),
+ Helpers::findOneForTesting(
+ opCtx, configTransactions.getCollection(), BSON("_id" << sessionId.toBSON())));
+ }
+
+ /**
+ * The caller must pass a BSONObj container to own the preImage BSONObj part of a
+ * `ChangeStreamPreImage`. The `ChangeStreamPreImage` IDL declares the preImage BSONObj as not
+ * owned, so the BSONObj returned from the collection must outlive any access to
+ * `ChangeStreamPreImage::getPreImage`.
+ */
+ ChangeStreamPreImage getChangeStreamPreImage(OperationContext* opCtx,
+ const ChangeStreamPreImageId& preImageId,
+ BSONObj* container) {
+ AutoGetCollection preImagesCollection(
+ opCtx, NamespaceString::kChangeStreamPreImagesNamespace, LockMode::MODE_IS);
+ *container = Helpers::findOneForTesting(opCtx,
+ preImagesCollection.getCollection(),
+ BSON("_id" << preImageId.toBSON()))
+ .getOwned();
+ return ChangeStreamPreImage::parse(IDLParserErrorContext("pre-image"), *container);
+ }
+
+ ReadWriteConcernDefaultsLookupMock _lookupMock;
+
+private:
+ // Creates a reasonable set of ReplSettings for most tests. We need to be able to
+ // override this to create a larger oplog.
+ virtual repl::ReplSettings createReplSettings() {
+ repl::ReplSettings settings;
+ settings.setOplogSizeBytes(5 * 1024 * 1024);
+ settings.setReplSetString("mySet/node1:12345");
+ return settings;
+ }
+};
+
+TEST_F(OpObserverTest, StartIndexBuildExpectedOplogEntry) {
+ OpObserverImpl opObserver;
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = UUID::gen();
+ NamespaceString nss(boost::none, "test.coll");
+ UUID indexBuildUUID = UUID::gen();
+
+ BSONObj specX = BSON("key" << BSON("x" << 1) << "name"
+ << "x_1"
+ << "v" << 2);
+ BSONObj specA = BSON("key" << BSON("a" << 1) << "name"
+ << "a_1"
+ << "v" << 2);
+ std::vector<BSONObj> specs = {specX, specA};
+
+ // Write to the oplog.
+ {
+ AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
+ WriteUnitOfWork wunit(opCtx.get());
+ opObserver.onStartIndexBuild(
+ opCtx.get(), nss, uuid, indexBuildUUID, specs, false /*fromMigrate*/);
+ wunit.commit();
+ }
+
+ // Create expected startIndexBuild command.
+ BSONObjBuilder startIndexBuildBuilder;
+ startIndexBuildBuilder.append("startIndexBuild", nss.coll());
+ indexBuildUUID.appendToBuilder(&startIndexBuildBuilder, "indexBuildUUID");
+ BSONArrayBuilder indexesArr(startIndexBuildBuilder.subarrayStart("indexes"));
+ indexesArr.append(specX);
+ indexesArr.append(specA);
+ indexesArr.done();
+ BSONObj startIndexBuildCmd = startIndexBuildBuilder.done();
+
+ // Ensure the startIndexBuild fields were correctly set.
+ auto oplogEntry = getSingleOplogEntry(opCtx.get());
+ auto o = oplogEntry.getObjectField("o");
+ ASSERT_BSONOBJ_EQ(startIndexBuildCmd, o);
+}
+
+TEST_F(OpObserverTest, CommitIndexBuildExpectedOplogEntry) {
+ OpObserverImpl opObserver;
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = UUID::gen();
+ NamespaceString nss(boost::none, "test.coll");
+ UUID indexBuildUUID = UUID::gen();
+
+ BSONObj specX = BSON("key" << BSON("x" << 1) << "name"
+ << "x_1"
+ << "v" << 2);
+ BSONObj specA = BSON("key" << BSON("a" << 1) << "name"
+ << "a_1"
+ << "v" << 2);
+ std::vector<BSONObj> specs = {specX, specA};
+
+ // Write to the oplog.
+ {
+ AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
+ WriteUnitOfWork wunit(opCtx.get());
+ opObserver.onCommitIndexBuild(
+ opCtx.get(), nss, uuid, indexBuildUUID, specs, false /*fromMigrate*/);
+ wunit.commit();
+ }
+
+ // Create expected commitIndexBuild command.
+ BSONObjBuilder commitIndexBuildBuilder;
+ commitIndexBuildBuilder.append("commitIndexBuild", nss.coll());
+ indexBuildUUID.appendToBuilder(&commitIndexBuildBuilder, "indexBuildUUID");
+ BSONArrayBuilder indexesArr(commitIndexBuildBuilder.subarrayStart("indexes"));
+ indexesArr.append(specX);
+ indexesArr.append(specA);
+ indexesArr.done();
+ BSONObj commitIndexBuildCmd = commitIndexBuildBuilder.done();
+
+ // Ensure the commitIndexBuild fields were correctly set.
+ auto oplogEntry = getSingleOplogEntry(opCtx.get());
+ auto o = oplogEntry.getObjectField("o");
+ ASSERT_BSONOBJ_EQ(commitIndexBuildCmd, o);
+}
+
+TEST_F(OpObserverTest, AbortIndexBuildExpectedOplogEntry) {
+ OpObserverImpl opObserver;
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = UUID::gen();
+ NamespaceString nss(boost::none, "test.coll");
+ UUID indexBuildUUID = UUID::gen();
+
+ BSONObj specX = BSON("key" << BSON("x" << 1) << "name"
+ << "x_1"
+ << "v" << 2);
+ BSONObj specA = BSON("key" << BSON("a" << 1) << "name"
+ << "a_1"
+ << "v" << 2);
+ std::vector<BSONObj> specs = {specX, specA};
+
+ // Write to the oplog.
+ Status cause(ErrorCodes::OperationFailed, "index build failed");
+ {
+ AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
+ WriteUnitOfWork wunit(opCtx.get());
+ auto fromMigrate = false;
+ opObserver.onAbortIndexBuild(
+ opCtx.get(), nss, uuid, indexBuildUUID, specs, cause, fromMigrate);
+ wunit.commit();
+ }
+
+ // Create expected abortIndexBuild command.
+ BSONObjBuilder abortIndexBuildBuilder;
+ abortIndexBuildBuilder.append("abortIndexBuild", nss.coll());
+ indexBuildUUID.appendToBuilder(&abortIndexBuildBuilder, "indexBuildUUID");
+ BSONArrayBuilder indexesArr(abortIndexBuildBuilder.subarrayStart("indexes"));
+ indexesArr.append(specX);
+ indexesArr.append(specA);
+ indexesArr.done();
+ BSONObjBuilder causeBuilder(abortIndexBuildBuilder.subobjStart("cause"));
+ causeBuilder.appendBool("ok", 0);
+ cause.serializeErrorToBSON(&causeBuilder);
+ causeBuilder.done();
+ BSONObj abortIndexBuildCmd = abortIndexBuildBuilder.done();
+
+ // Ensure the abortIndexBuild fields were correctly set.
+ auto oplogEntry = getSingleOplogEntry(opCtx.get());
+ auto o = oplogEntry.getObjectField("o");
+ ASSERT_BSONOBJ_EQ(abortIndexBuildCmd, o);
+
+ // Should be able to extract a Status from the 'cause' field.
+ ASSERT_EQUALS(cause, getStatusFromCommandResult(o.getObjectField("cause")));
+}
+
+TEST_F(OpObserverTest, CollModWithCollectionOptionsAndTTLInfo) {
+ OpObserverImpl opObserver;
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = UUID::gen();
+
+ // Create 'collMod' command.
+ NamespaceString nss(boost::none, "test.coll");
+ BSONObj collModCmd = BSON("collMod" << nss.coll() << "validationLevel"
+ << "off"
+ << "validationAction"
+ << "warn"
+ // We verify that 'onCollMod' ignores this field.
+ << "index"
+ << "indexData");
+
+ CollectionOptions oldCollOpts;
+ oldCollOpts.validationLevel = ValidationLevelEnum::strict;
+ oldCollOpts.validationAction = ValidationActionEnum::error;
+
+ IndexCollModInfo indexInfo;
+ indexInfo.expireAfterSeconds = Seconds(10);
+ indexInfo.oldExpireAfterSeconds = Seconds(5);
+ indexInfo.indexName = "name_of_index";
+
+ // Write to the oplog.
+ {
+ AutoGetCollection autoColl(opCtx.get(), nss, MODE_X);
+ WriteUnitOfWork wunit(opCtx.get());
+ opObserver.onCollMod(opCtx.get(), nss, uuid, collModCmd, oldCollOpts, indexInfo);
+ wunit.commit();
+ }
+
+ auto oplogEntry = getSingleOplogEntry(opCtx.get());
+
+ // Ensure that collMod fields were properly added to the oplog entry.
+ auto o = oplogEntry.getObjectField("o");
+ auto oExpected = BSON(
+ "collMod" << nss.coll() << "validationLevel"
+ << "off"
+ << "validationAction"
+ << "warn"
+ << "index"
+ << BSON("name" << indexInfo.indexName << "expireAfterSeconds"
+ << durationCount<Seconds>(indexInfo.expireAfterSeconds.get())));
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+
+ // Ensure that the old collection metadata was saved.
+ auto o2 = oplogEntry.getObjectField("o2");
+ auto o2Expected = BSON("collectionOptions_old"
+ << BSON("validationLevel"
+ << ValidationLevel_serializer(*oldCollOpts.validationLevel)
+ << "validationAction"
+ << ValidationAction_serializer(*oldCollOpts.validationAction))
+ << "indexOptions_old"
+ << BSON("expireAfterSeconds" << durationCount<Seconds>(
+ indexInfo.oldExpireAfterSeconds.get())));
+
+ ASSERT_BSONOBJ_EQ(o2Expected, o2);
+}
+
+TEST_F(OpObserverTest, CollModWithOnlyCollectionOptions) {
+ OpObserverImpl opObserver;
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = UUID::gen();
+
+ // Create 'collMod' command.
+ NamespaceString nss(boost::none, "test.coll");
+ BSONObj collModCmd = BSON("collMod" << nss.coll() << "validationLevel"
+ << "off"
+ << "validationAction"
+ << "warn");
+
+ CollectionOptions oldCollOpts;
+ oldCollOpts.validationLevel = ValidationLevelEnum::strict;
+ oldCollOpts.validationAction = ValidationActionEnum::error;
+
+ // Write to the oplog.
+ {
+ AutoGetCollection autoColl(opCtx.get(), nss, MODE_X);
+ WriteUnitOfWork wunit(opCtx.get());
+ opObserver.onCollMod(opCtx.get(), nss, uuid, collModCmd, oldCollOpts, boost::none);
+ wunit.commit();
+ }
+
+ auto oplogEntry = getSingleOplogEntry(opCtx.get());
+
+ // Ensure that collMod fields were properly added to oplog entry.
+ auto o = oplogEntry.getObjectField("o");
+ auto oExpected = collModCmd;
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+
+ // Ensure that the old collection metadata was saved and that TTL info is not present.
+ auto o2 = oplogEntry.getObjectField("o2");
+ auto o2Expected = BSON("collectionOptions_old"
+ << BSON("validationLevel"
+ << ValidationLevel_serializer(*oldCollOpts.validationLevel)
+ << "validationAction"
+ << ValidationAction_serializer(*oldCollOpts.validationAction)));
+
+ ASSERT_BSONOBJ_EQ(o2Expected, o2);
+}
+
+TEST_F(OpObserverTest, OnDropCollectionReturnsDropOpTime) {
+ OpObserverImpl opObserver;
+ auto opCtx = cc().makeOperationContext();
+ auto uuid = UUID::gen();
+
+ // Create 'drop' command.
+ NamespaceString nss(boost::none, "test.coll");
+ auto dropCmd = BSON("drop" << nss.coll());
+
+ // Write to the oplog.
+ repl::OpTime dropOpTime;
+ {
+ AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
+ WriteUnitOfWork wunit(opCtx.get());
+ opObserver.onDropCollection(
+ opCtx.get(), nss, uuid, 0U, OpObserver::CollectionDropType::kTwoPhase);
+ dropOpTime = OpObserver::Times::get(opCtx.get()).reservedOpTimes.front();
+ wunit.commit();
+ }
+
+ auto oplogEntry = getSingleOplogEntry(opCtx.get());
+
+ // Ensure that drop fields were properly added to oplog entry.
+ auto o = oplogEntry.getObjectField("o");
+ auto oExpected = dropCmd;
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+
+ // Ensure that the drop optime returned is the same as the last optime in the ReplClientInfo.
+ ASSERT_EQUALS(repl::ReplClientInfo::forClient(&cc()).getLastOp(), dropOpTime);
+}
+
+TEST_F(OpObserverTest, OnRenameCollectionReturnsRenameOpTime) {
+ OpObserverImpl opObserver;
+ auto opCtx = cc().makeOperationContext();
+
+ auto uuid = UUID::gen();
+ auto dropTargetUuid = UUID::gen();
+ auto stayTemp = false;
+ NamespaceString sourceNss(boost::none, "test.foo");
+ NamespaceString targetNss(boost::none, "test.bar");
+
+ // Write to the oplog.
+ repl::OpTime renameOpTime;
+ {
+ AutoGetDb autoDb(opCtx.get(), sourceNss.db(), MODE_X);
+ WriteUnitOfWork wunit(opCtx.get());
+ opObserver.onRenameCollection(
+ opCtx.get(), sourceNss, targetNss, uuid, dropTargetUuid, 0U, stayTemp);
+ renameOpTime = OpObserver::Times::get(opCtx.get()).reservedOpTimes.front();
+ wunit.commit();
+ }
+
+ auto oplogEntry = getSingleOplogEntry(opCtx.get());
+
+ // Ensure that renameCollection fields were properly added to oplog entry.
+ ASSERT_EQUALS(uuid, unittest::assertGet(UUID::parse(oplogEntry["ui"])));
+ auto o = oplogEntry.getObjectField("o");
+ auto oExpected =
+ BSON("renameCollection" << sourceNss.ns() << "to" << targetNss.ns() << "stayTemp"
+ << stayTemp << "dropTarget" << dropTargetUuid);
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+
+ // Ensure that the rename optime returned is the same as the last optime in the ReplClientInfo.
+ ASSERT_EQUALS(repl::ReplClientInfo::forClient(&cc()).getLastOp(), renameOpTime);
+}
+
+TEST_F(OpObserverTest, OnRenameCollectionOmitsDropTargetFieldIfDropTargetUuidIsNull) {
+ OpObserverImpl opObserver;
+ auto opCtx = cc().makeOperationContext();
+
+ auto uuid = UUID::gen();
+ auto stayTemp = true;
+ NamespaceString sourceNss(boost::none, "test.foo");
+ NamespaceString targetNss(boost::none, "test.bar");
+
+ // Write to the oplog.
+ {
+ AutoGetDb autoDb(opCtx.get(), sourceNss.db(), MODE_X);
+ WriteUnitOfWork wunit(opCtx.get());
+ opObserver.onRenameCollection(opCtx.get(), sourceNss, targetNss, uuid, {}, 0U, stayTemp);
+ wunit.commit();
+ }
+
+ auto oplogEntry = getSingleOplogEntry(opCtx.get());
+
+ // Ensure that renameCollection fields were properly added to oplog entry.
+ ASSERT_EQUALS(uuid, unittest::assertGet(UUID::parse(oplogEntry["ui"])));
+ auto o = oplogEntry.getObjectField("o");
+ auto oExpected = BSON("renameCollection" << sourceNss.ns() << "to" << targetNss.ns()
+ << "stayTemp" << stayTemp);
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+}
+
+TEST_F(OpObserverTest, MustBePrimaryToWriteOplogEntries) {
+ OpObserverImpl opObserver;
+ auto opCtx = cc().makeOperationContext();
+
+ ASSERT_OK(repl::ReplicationCoordinator::get(opCtx.get())
+ ->setFollowerMode(repl::MemberState::RS_SECONDARY));
+
+ Lock::GlobalWrite globalWrite(opCtx.get());
+ WriteUnitOfWork wunit(opCtx.get());
+
+ // No-op writes should be prohibited.
+ ASSERT_THROWS_CODE(
+ opObserver.onOpMessage(opCtx.get(), {}), DBException, ErrorCodes::NotWritablePrimary);
+}
+
+TEST_F(OpObserverTest, ImportCollectionOplogEntry) {
+ OpObserverImpl opObserver;
+ auto opCtx = cc().makeOperationContext();
+
+ auto importUUID = UUID::gen();
+ NamespaceString nss(boost::none, "test.coll");
+ long long numRecords = 1;
+ long long dataSize = 2;
+ // A dummy invalid catalog entry. We do not need a valid catalog entry for this test.
+ auto catalogEntry = BSON("ns" << nss.ns() << "ident"
+ << "collection-7-1792004489479993697");
+ auto storageMetadata = BSON("storage"
+ << "metadata");
+ bool isDryRun = false;
+
+ // Write to the oplog.
+ {
+ AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
+ WriteUnitOfWork wunit(opCtx.get());
+ opObserver.onImportCollection(opCtx.get(),
+ importUUID,
+ nss,
+ numRecords,
+ dataSize,
+ catalogEntry,
+ storageMetadata,
+ isDryRun);
+ wunit.commit();
+ }
+
+ auto oplogEntryObj = getSingleOplogEntry(opCtx.get());
+ OplogEntry oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj));
+ ASSERT_TRUE(repl::OpTypeEnum::kCommand == oplogEntry.getOpType());
+ ASSERT_TRUE(OplogEntry::CommandType::kImportCollection == oplogEntry.getCommandType());
+
+ ImportCollectionOplogEntry importCollection(
+ nss, importUUID, numRecords, dataSize, catalogEntry, storageMetadata, isDryRun);
+ ASSERT_BSONOBJ_EQ(importCollection.toBSON(), oplogEntry.getObject());
+}
+
+TEST_F(OpObserverTest, SingleStatementInsertTestIncludesTenantId) {
+ RAIIServerParameterControllerForTest featureFlagController("featureFlagRequireTenantID", true);
+ auto opCtx = cc().makeOperationContext();
+ const NamespaceString nss(TenantId(OID::gen()), "testDB", "testColl");
+ auto uuid = UUID::gen();
+
+ std::vector<InsertStatement> insert;
+ insert.emplace_back(BSON("_id" << 0 << "data"
+ << "x"));
+
+ WriteUnitOfWork wuow(opCtx.get());
+ AutoGetCollection locks(opCtx.get(), nss, LockMode::MODE_IX);
+
+ OpObserverRegistry opObserver;
+ opObserver.addObserver(std::make_unique<OpObserverImpl>());
+ opObserver.onInserts(opCtx.get(), nss, uuid, insert.begin(), insert.end(), false);
+ wuow.commit();
+
+ auto oplogEntryObj = getSingleOplogEntry(opCtx.get());
+ const repl::OplogEntry& entry = assertGet(repl::OplogEntry::parse(oplogEntryObj));
+
+ // TODO SERVER-67155 Check that (nss == entry.getNss()) and uncomment the
+ // line below once the OplogEntry deserializer passes "tid" to the NamespaceString
+ // constructor
+ ASSERT_EQ(NamespaceString(boost::none, nss.ns()), entry.getNss());
+ // ASSERT(nss.tenantId().has_value());
+
+ ASSERT_EQ(*nss.tenantId(), *entry.getTid());
+ ASSERT_EQ(uuid, *entry.getUuid());
+}
+
+TEST_F(OpObserverTest, SingleStatementUpdateTestIncludesTenantId) {
+ RAIIServerParameterControllerForTest featureFlagController("featureFlagRequireTenantID", true);
+ auto opCtx = cc().makeOperationContext();
+ const NamespaceString nss(TenantId(OID::gen()), "testDB", "testColl");
+ auto uuid = UUID::gen();
+
+ CollectionUpdateArgs updateArgs;
+ updateArgs.updatedDoc = BSON("_id" << 0 << "data"
+ << "x");
+ updateArgs.update = BSON("$set" << BSON("data"
+ << "x"));
+ updateArgs.criteria = BSON("_id" << 0);
+ OplogUpdateEntryArgs update(&updateArgs, nss, uuid);
+
+ WriteUnitOfWork wuow(opCtx.get());
+ AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
+
+ OpObserverRegistry opObserver;
+ opObserver.addObserver(std::make_unique<OpObserverImpl>());
+ opObserver.onUpdate(opCtx.get(), update);
+ wuow.commit();
+
+ auto oplogEntryObj = getSingleOplogEntry(opCtx.get());
+ const repl::OplogEntry& entry = assertGet(repl::OplogEntry::parse(oplogEntryObj));
+
+ ASSERT(nss.tenantId().has_value());
+ ASSERT_EQ(*nss.tenantId(), *entry.getTid());
+ // TODO SERVER-67155 Check that (nss == entry.getNss()) once the OplogEntry deserializer passes
+ // "tid" to the NamespaceString constructor
+ ASSERT_EQ(NamespaceString(boost::none, nss.ns()), entry.getNss());
+ ASSERT_EQ(uuid, *entry.getUuid());
+}
+
+TEST_F(OpObserverTest, SingleStatementDeleteTestIncludesTenantId) {
+ RAIIServerParameterControllerForTest featureFlagController("featureFlagRequireTenantID", true);
+ auto opCtx = cc().makeOperationContext();
+ const NamespaceString nss(TenantId(OID::gen()), "testDB", "testColl");
+ auto uuid = UUID::gen();
+
+ OplogDeleteEntryArgs deleteEntryArgs;
+ WriteUnitOfWork wuow(opCtx.get());
+ AutoGetCollection locks(opCtx.get(), nss, LockMode::MODE_IX);
+
+ OpObserverRegistry opObserver;
+ opObserver.addObserver(std::make_unique<OpObserverImpl>());
+ // This test does not call `OpObserver::aboutToDelete`. That method has the side effect of
+ // setting `documentKey` on the delete for sharding purposes, and `OpObserverImpl::onDelete`
+ // asserts its existence.
+ repl::documentKeyDecoration(opCtx.get()).emplace(BSON("_id" << 0), boost::none);
+ opObserver.onDelete(opCtx.get(), nss, uuid, kUninitializedStmtId, deleteEntryArgs);
+ wuow.commit();
+
+ auto oplogEntryObj = getSingleOplogEntry(opCtx.get());
+ const repl::OplogEntry& entry = assertGet(repl::OplogEntry::parse(oplogEntryObj));
+
+ // TODO SERVER-67155 Check that (nss == entry.getNss()) once the OplogEntry deserializer passes
+ // "tid" to the NamespaceString constructor
+ // ASSERT(nss.tenantId().has_value());
+ ASSERT_EQ(NamespaceString(boost::none, nss.ns()), entry.getNss());
+ ASSERT_EQ(*nss.tenantId(), *entry.getTid());
+ ASSERT_EQ(uuid, *entry.getUuid());
+}
+
+/**
+ * Test fixture for testing OpObserver behavior specific to the SessionCatalog.
+ */
+class OpObserverSessionCatalogRollbackTest : public OpObserverTest {
+public:
+ void setUp() override {
+ OpObserverTest::setUp();
+
+ auto opCtx = cc().makeOperationContext();
+ }
+
+ /**
+ * Simulate a new write occurring on the given session with the given transaction number and
+ * statement id.
+ */
+ void simulateSessionWrite(OperationContext* opCtx,
+ TransactionParticipant::Participant txnParticipant,
+ NamespaceString nss,
+ TxnNumber txnNum,
+ StmtId stmtId) {
+ txnParticipant.beginOrContinue(
+ opCtx, {txnNum}, boost::none /* autocommit */, boost::none /* startTransaction */);
+
+ {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IX);
+ WriteUnitOfWork wuow(opCtx);
+ auto opTime = repl::OpTime(Timestamp(10, 1), 1); // Dummy timestamp.
+ SessionTxnRecord sessionTxnRecord;
+ sessionTxnRecord.setSessionId(*opCtx->getLogicalSessionId());
+ sessionTxnRecord.setTxnNum(txnNum);
+ sessionTxnRecord.setLastWriteOpTime(opTime);
+ sessionTxnRecord.setLastWriteDate(Date_t::now());
+ txnParticipant.onWriteOpCompletedOnPrimary(opCtx, {stmtId}, sessionTxnRecord);
+ wuow.commit();
+ }
+ }
+};
+
+TEST_F(OpObserverSessionCatalogRollbackTest,
+ OnRollbackDoesntInvalidateSessionCatalogIfNoSessionOpsRolledBack) {
+ const NamespaceString nss(boost::none, "testDB", "testColl");
+
+ auto sessionId = makeLogicalSessionIdForTest();
+
+ const TxnNumber txnNum = 0;
+ const StmtId stmtId = 1000;
+
+ {
+ auto opCtx = cc().makeOperationContext();
+ opCtx->setLogicalSessionId(sessionId);
+ MongoDOperationContextSession ocs(opCtx.get());
+ auto txnParticipant = TransactionParticipant::get(opCtx.get());
+ txnParticipant.refreshFromStorageIfNeeded(opCtx.get());
+
+ // Simulate a write occurring on that session
+ simulateSessionWrite(opCtx.get(), txnParticipant, nss, txnNum, stmtId);
+
+ // Check that the statement executed
+ ASSERT(txnParticipant.checkStatementExecutedNoOplogEntryFetch(opCtx.get(), stmtId));
+ }
+
+ // Because there are no sessions to roll back, the OpObserver should not invalidate the
+ // in-memory session state, so the check after this should still succeed.
+ {
+ auto opCtx = cc().makeOperationContext();
+
+ OpObserverImpl opObserver;
+ OpObserver::RollbackObserverInfo rbInfo;
+ opObserver.onReplicationRollback(opCtx.get(), rbInfo);
+ }
+
+ {
+ auto opCtx = cc().makeOperationContext();
+ opCtx->setLogicalSessionId(sessionId);
+ MongoDOperationContextSession ocs(opCtx.get());
+ auto txnParticipant = TransactionParticipant::get(opCtx.get());
+ ASSERT(txnParticipant.checkStatementExecutedNoOplogEntryFetch(opCtx.get(), stmtId));
+ }
+}
+
+TEST_F(OpObserverTest, MultipleAboutToDeleteAndOnDelete) {
+ auto uuid = UUID::gen();
+ OpObserverImpl opObserver;
+ auto opCtx = cc().makeOperationContext();
+ NamespaceString nss = {boost::none, "test", "coll"};
+ AutoGetDb autoDb(opCtx.get(), nss.db(), MODE_X);
+ WriteUnitOfWork wunit(opCtx.get());
+ opObserver.aboutToDelete(opCtx.get(), nss, uuid, BSON("_id" << 1));
+ opObserver.onDelete(opCtx.get(), nss, uuid, kUninitializedStmtId, {});
+ opObserver.aboutToDelete(opCtx.get(), nss, uuid, BSON("_id" << 1));
+ opObserver.onDelete(opCtx.get(), nss, uuid, kUninitializedStmtId, {});
+}
+
+DEATH_TEST_F(OpObserverTest, AboutToDeleteMustPreceedOnDelete, "invariant") {
+ OpObserverImpl opObserver;
+ auto opCtx = cc().makeOperationContext();
+ cc().swapLockState(std::make_unique<LockerNoop>());
+ NamespaceString nss = {boost::none, "test", "coll"};
+ opObserver.onDelete(opCtx.get(), nss, UUID::gen(), kUninitializedStmtId, {});
+}
+
+DEATH_TEST_REGEX_F(OpObserverTest,
+ AboutToDeleteRequiresIdField,
+ "Invariant failure.*!id.isEmpty()") {
+ OpObserverImpl opObserver;
+ auto opCtx = cc().makeOperationContext();
+ cc().swapLockState(std::make_unique<LockerNoop>());
+ NamespaceString nss = {boost::none, "test", "coll"};
+ UUID uuid = UUID::gen();
+ opObserver.aboutToDelete(opCtx.get(), nss, uuid, {});
+}
+
+DEATH_TEST_REGEX_F(OpObserverTest,
+ NodeCrashesIfShardIdentityDocumentRolledBack,
+ "Fatal assertion.*50712") {
+ OpObserverImpl opObserver;
+ auto opCtx = cc().makeOperationContext();
+
+ OpObserver::RollbackObserverInfo rbInfo;
+ rbInfo.shardIdentityRolledBack = true;
+ opObserver.onReplicationRollback(opCtx.get(), rbInfo);
+}
+
+class OpObserverTxnParticipantTest : public OpObserverTest {
+public:
+ void setUp() override {
+ OpObserverTest::setUp();
+ _opCtx = cc().makeOperationContext();
+ _opObserver.emplace();
+ _times.emplace(opCtx());
+ }
+
+ void tearDown() override {
+ _sessionCheckout.reset();
+ _times.reset();
+ _opCtx.reset();
+
+ OpObserverTest::tearDown();
+ }
+
+ void setUpRetryableWrite() {
+ beginRetryableWriteWithTxnNumber(opCtx(), txnNum(), _sessionCheckout);
+ _txnParticipant.emplace(TransactionParticipant::get(opCtx()));
+ }
+
+ void setUpNonRetryableTransaction() {
+ beginNonRetryableTransactionWithTxnNumber(opCtx(), txnNum(), _sessionCheckout);
+ _txnParticipant.emplace(TransactionParticipant::get(opCtx()));
+ }
+
+ void setUpRetryableInternalTransaction() {
+ beginRetryableInternalTransactionWithTxnNumber(opCtx(), txnNum(), _sessionCheckout);
+ _txnParticipant.emplace(TransactionParticipant::get(opCtx()));
+ }
+
+protected:
+ Session* session() {
+ return OperationContextSession::get(opCtx());
+ }
+
+ OpObserverImpl& opObserver() {
+ return *_opObserver;
+ }
+
+ OperationContext* opCtx() {
+ return _opCtx.get();
+ }
+
+ TxnNumber& txnNum() {
+ return _txnNum;
+ }
+
+ TransactionParticipant::Participant& txnParticipant() {
+ return *_txnParticipant;
+ }
+
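+ // Drives the prepare flow through the observer under test: preTransactionPrepare computes the
+ // applyOps assignment for the completed operations, the prepare timestamp is set on the
+ // recovery unit, and onTransactionPrepare writes the prepare oplog entries into the reserved
+ // slots.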
+ void prepareTransaction(const std::vector<OplogSlot>& reservedSlots,
+ repl::OpTime prepareOpTime,
+ size_t numberOfPrePostImagesToWrite = 0) {
+ auto txnOps = txnParticipant().retrieveCompletedTransactionOperations(opCtx());
+ auto currentTime = Date_t::now();
+ auto applyOpsAssignment = opObserver().preTransactionPrepare(
+ opCtx(), reservedSlots, numberOfPrePostImagesToWrite, currentTime, &txnOps);
+ opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
+ opObserver().onTransactionPrepare(opCtx(),
+ reservedSlots,
+ &txnOps,
+ applyOpsAssignment.get(),
+ numberOfPrePostImagesToWrite,
+ currentTime);
+ }
+
+private:
+ class ExposeOpObserverTimes : public OpObserver {
+ public:
+ typedef OpObserver::ReservedTimes ReservedTimes;
+ };
+
+ ServiceContext::UniqueOperationContext _opCtx;
+
+ boost::optional<OpObserverImpl> _opObserver;
+ boost::optional<ExposeOpObserverTimes::ReservedTimes> _times;
+ boost::optional<TransactionParticipant::Participant> _txnParticipant;
+
+ std::unique_ptr<MongoDOperationContextSession> _sessionCheckout;
+ TxnNumber _txnNum = 0;
+};
+
+/**
+ * Test fixture for testing OpObserver behavior specific to multi-document transactions.
+ */
+
+class OpObserverTransactionTest : public OpObserverTxnParticipantTest {
+public:
+ void setUp() override {
+ OpObserverTxnParticipantTest::setUp();
+ OpObserverTxnParticipantTest::setUpNonRetryableTransaction();
+ }
+
+protected:
+ void checkSessionAndTransactionFields(const BSONObj& oplogEntry) {
+ ASSERT_BSONOBJ_EQ(session()->getSessionId().toBSON(), oplogEntry.getObjectField("lsid"));
+ ASSERT_EQ(*opCtx()->getTxnNumber(), oplogEntry.getField("txnNumber").safeNumberLong());
+ }
+ void checkCommonFields(const BSONObj& oplogEntry) {
+ ASSERT_EQ("c"_sd, oplogEntry.getStringField("op"));
+ ASSERT_EQ("admin.$cmd"_sd, oplogEntry.getStringField("ns"));
+ checkSessionAndTransactionFields(oplogEntry);
+ }
+
+ void assertTxnRecord(TxnNumber txnNum,
+ repl::OpTime opTime,
+ boost::optional<DurableTxnStateEnum> txnState) {
+ DBDirectClient client(opCtx());
+ FindCommandRequest findRequest{NamespaceString::kSessionTransactionsTableNamespace};
+ findRequest.setFilter(BSON("_id" << session()->getSessionId().toBSON()));
+ auto cursor = client.find(std::move(findRequest));
+ ASSERT(cursor);
+ ASSERT(cursor->more());
+
+ auto txnRecordObj = cursor->next();
+ auto txnRecord =
+ SessionTxnRecord::parse(IDLParserErrorContext("SessionEntryWritten"), txnRecordObj);
+ ASSERT(!cursor->more());
+ ASSERT_EQ(session()->getSessionId(), txnRecord.getSessionId());
+ ASSERT_EQ(txnNum, txnRecord.getTxnNum());
+ ASSERT(txnRecord.getState() == txnState);
+ ASSERT_EQ(txnState != boost::none,
+ txnRecordObj.hasField(SessionTxnRecord::kStateFieldName));
+
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ if (!opTime.isNull()) {
+ ASSERT_EQ(opTime, txnRecord.getLastWriteOpTime());
+ ASSERT_EQ(opTime, txnParticipant.getLastWriteOpTime());
+ } else {
+ ASSERT_EQ(txnRecord.getLastWriteOpTime(), txnParticipant.getLastWriteOpTime());
+ }
+ }
+
+ void assertNoTxnRecord() {
+ DBDirectClient client(opCtx());
+ FindCommandRequest findRequest{NamespaceString::kSessionTransactionsTableNamespace};
+ findRequest.setFilter(BSON("_id" << session()->getSessionId().toBSON()));
+ auto cursor = client.find(std::move(findRequest));
+ ASSERT(cursor);
+ ASSERT(!cursor->more());
+ }
+
+ void assertTxnRecordStartOpTime(boost::optional<repl::OpTime> startOpTime) {
+ DBDirectClient client(opCtx());
+ FindCommandRequest findRequest{NamespaceString::kSessionTransactionsTableNamespace};
+ findRequest.setFilter(BSON("_id" << session()->getSessionId().toBSON()));
+ auto cursor = client.find(std::move(findRequest));
+ ASSERT(cursor);
+ ASSERT(cursor->more());
+
+ auto txnRecordObj = cursor->next();
+ auto txnRecord =
+ SessionTxnRecord::parse(IDLParserErrorContext("SessionEntryWritten"), txnRecordObj);
+ ASSERT(!cursor->more());
+ ASSERT_EQ(session()->getSessionId(), txnRecord.getSessionId());
+ if (!startOpTime) {
+ ASSERT(!txnRecord.getStartOpTime());
+ } else {
+ ASSERT(txnRecord.getStartOpTime());
+ ASSERT_EQ(*startOpTime, *txnRecord.getStartOpTime());
+ }
+ }
+};
+
+TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) {
+ const NamespaceString nss1(boost::none, "testDB", "testColl");
+ const NamespaceString nss2(boost::none, "testDB2", "testColl2");
+ auto uuid1 = UUID::gen();
+ auto uuid2 = UUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+
+ std::vector<InsertStatement> inserts1;
+ inserts1.emplace_back(0,
+ BSON("_id" << 0 << "data"
+ << "x"));
+ inserts1.emplace_back(1,
+ BSON("_id" << 1 << "data"
+ << "y"));
+ opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
+
+ CollectionUpdateArgs updateArgs2;
+ updateArgs2.stmtIds = {1};
+ updateArgs2.updatedDoc = BSON("_id" << 0 << "data"
+ << "y");
+ updateArgs2.update = BSON("$set" << BSON("data"
+ << "y"));
+ updateArgs2.criteria = BSON("_id" << 0);
+ OplogUpdateEntryArgs update2(&updateArgs2, nss2, uuid2);
+ opObserver().onUpdate(opCtx(), update2);
+
+ opObserver().aboutToDelete(opCtx(),
+ nss1,
+ uuid1,
+ BSON("_id" << 0 << "data"
+ << "x"));
+ opObserver().onDelete(opCtx(), nss1, uuid1, 0, {});
+
+ // One reserved slot for each statement, plus the prepare.
+ auto reservedSlots = reserveOpTimesInSideTransaction(opCtx(), 5);
+ auto prepareOpTime = reservedSlots.back();
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+ prepareTransaction(reservedSlots, prepareOpTime);
+
+ ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
+
+ txnParticipant.stashTransactionResources(opCtx());
+ auto oplogEntryObj = getSingleOplogEntry(opCtx());
+ checkCommonFields(oplogEntryObj);
+ OplogEntry oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj));
+ auto o = oplogEntry.getObject();
+ auto oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("_id" << 0 << "data"
+ << "x")
+ << "o2" << BSON("_id" << 0))
+ << BSON("op"
+ << "i"
+ << "ns" << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("_id" << 1 << "data"
+ << "y")
+ << "o2" << BSON("_id" << 1))
+ << BSON("op"
+ << "u"
+ << "ns" << nss2.toString() << "ui" << uuid2 << "o"
+ << BSON("$set" << BSON("data"
+ << "y"))
+ << "o2" << BSON("_id" << 0))
+ << BSON("op"
+ << "d"
+ << "ns" << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("_id" << 0)))
+ << "prepare" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT(oplogEntry.shouldPrepare());
+ ASSERT_EQ(oplogEntry.getTimestamp(), prepareOpTime.getTimestamp());
+
+ txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
+}
+
+TEST_F(OpObserverTransactionTest, TransactionalPreparedCommitTest) {
+ const NamespaceString nss(boost::none, "testDB", "testColl");
+ const auto uuid = UUID::gen();
+ const auto doc = BSON("_id" << 0 << "data"
+ << "x");
+ const auto docKey = BSON("_id" << 0);
+
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+
+ std::vector<InsertStatement> insert;
+ insert.emplace_back(0, doc);
+
+ OplogSlot commitSlot;
+ Timestamp prepareTimestamp;
+ {
+ AutoGetCollection autoColl(opCtx(), nss, MODE_IX);
+ opObserver().onInserts(opCtx(), nss, uuid, insert.begin(), insert.end(), false);
+
+ const auto prepareSlot = reserveOpTimeInSideTransaction(opCtx());
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareSlot);
+ prepareTimestamp = prepareSlot.getTimestamp();
+ prepareTransaction({prepareSlot}, prepareSlot);
+
+ commitSlot = reserveOpTimeInSideTransaction(opCtx());
+ }
+
+ // Mimic committing the transaction.
+ opCtx()->setWriteUnitOfWork(nullptr);
+ opCtx()->lockState()->unsetMaxLockTimeout();
+
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ opObserver().onPreparedTransactionCommit(
+ opCtx(),
+ commitSlot,
+ prepareTimestamp,
+ txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ }
+ repl::OplogInterfaceLocal oplogInterface(opCtx());
+ auto oplogIter = oplogInterface.makeIterator();
+ {
+ auto oplogEntryObj = unittest::assertGet(oplogIter->next()).first;
+ checkCommonFields(oplogEntryObj);
+ OplogEntry oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj));
+ auto o = oplogEntry.getObject();
+ auto oExpected = BSON("commitTransaction" << 1 << "commitTimestamp" << prepareTimestamp);
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT_FALSE(oplogEntry.shouldPrepare());
+ }
+
+ {
+ auto oplogEntryObj = unittest::assertGet(oplogIter->next()).first;
+ checkCommonFields(oplogEntryObj);
+ OplogEntry oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj));
+ auto o = oplogEntry.getObject();
+ auto oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << nss.toString() << "ui" << uuid << "o"
+ << doc << "o2" << docKey))
+ << "prepare" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT(oplogEntry.shouldPrepare());
+ }
+
+ ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, oplogIter->next().getStatus());
+}
+
+TEST_F(OpObserverTransactionTest, TransactionalPreparedAbortTest) {
+ const NamespaceString nss(boost::none, "testDB", "testColl");
+ const auto uuid = UUID::gen();
+ const auto doc = BSON("_id" << 0 << "data"
+ << "x");
+ const auto docKey = BSON("_id" << 0);
+
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+
+ std::vector<InsertStatement> insert;
+ insert.emplace_back(0, doc);
+
+ OplogSlot abortSlot;
+ {
+ AutoGetCollection autoColl(opCtx(), nss, MODE_IX);
+ opObserver().onInserts(opCtx(), nss, uuid, insert.begin(), insert.end(), false);
+
+ const auto prepareSlot = reserveOpTimeInSideTransaction(opCtx());
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareSlot);
+ prepareTransaction({prepareSlot}, prepareSlot);
+ abortSlot = reserveOpTimeInSideTransaction(opCtx());
+ }
+
+ // Mimic aborting the transaction.
+ opCtx()->setWriteUnitOfWork(nullptr);
+ opCtx()->lockState()->unsetMaxLockTimeout();
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ opObserver().onTransactionAbort(opCtx(), abortSlot);
+ }
+ txnParticipant.transitionToAbortedWithPrepareforTest(opCtx());
+
+ repl::OplogInterfaceLocal oplogInterface(opCtx());
+ auto oplogIter = oplogInterface.makeIterator();
+ {
+ auto oplogEntryObj = unittest::assertGet(oplogIter->next()).first;
+ checkCommonFields(oplogEntryObj);
+ OplogEntry oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj));
+ auto o = oplogEntry.getObject();
+ auto oExpected = BSON("abortTransaction" << 1);
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT_FALSE(oplogEntry.shouldPrepare());
+ }
+
+ {
+ auto oplogEntryObj = unittest::assertGet(oplogIter->next()).first;
+ checkCommonFields(oplogEntryObj);
+ OplogEntry oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj));
+ auto o = oplogEntry.getObject();
+ auto oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << nss.toString() << "ui" << uuid << "o"
+ << doc << "o2" << docKey))
+ << "prepare" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT(oplogEntry.shouldPrepare());
+ }
+
+ ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, oplogIter->next().getStatus());
+}
+
+TEST_F(OpObserverTransactionTest, TransactionalUnpreparedAbortTest) {
+ const NamespaceString nss(boost::none, "testDB", "testColl");
+ const auto uuid = UUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+
+ std::vector<InsertStatement> insert;
+ insert.emplace_back(0,
+ BSON("_id" << 0 << "data"
+ << "x"));
+
+ {
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection autoColl(opCtx(), nss, MODE_IX);
+ opObserver().onInserts(opCtx(), nss, uuid, insert.begin(), insert.end(), false);
+
+ txnParticipant.transitionToAbortedWithoutPrepareforTest(opCtx());
+ opObserver().onTransactionAbort(opCtx(), boost::none);
+ }
+
+ // Assert no oplog entries were written.
+ repl::OplogInterfaceLocal oplogInterface(opCtx());
+ auto oplogIter = oplogInterface.makeIterator();
+ ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, oplogIter->next().getStatus());
+}
+
+TEST_F(OpObserverTransactionTest,
+ PreparingEmptyTransactionLogsEmptyApplyOpsAndWritesToTransactionTable) {
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction");
+ repl::OpTime prepareOpTime;
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ WriteUnitOfWork wuow(opCtx());
+ prepareOpTime = reserveOpTimeInSideTransaction(opCtx());
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+ prepareTransaction({prepareOpTime}, prepareOpTime);
+ }
+ ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
+
+ txnParticipant.stashTransactionResources(opCtx());
+ auto oplogEntryObj = getSingleOplogEntry(opCtx());
+ checkCommonFields(oplogEntryObj);
+ OplogEntry oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj));
+ auto o = oplogEntry.getObject();
+ auto oExpected = BSON("applyOps" << BSONArray() << "prepare" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT(oplogEntry.shouldPrepare());
+ const auto startOpTime = oplogEntry.getOpTime();
+ ASSERT_EQ(startOpTime.getTimestamp(), prepareOpTime.getTimestamp());
+ ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
+
+ assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
+ assertTxnRecordStartOpTime(startOpTime);
+ txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction");
+}
+
+TEST_F(OpObserverTransactionTest, PreparingTransactionWritesToTransactionTable) {
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction");
+
+ repl::OpTime prepareOpTime;
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ WriteUnitOfWork wuow(opCtx());
+ OplogSlot slot = reserveOpTimeInSideTransaction(opCtx());
+ txnParticipant.transitionToPreparedforTest(opCtx(), slot);
+ prepareOpTime = slot;
+ prepareTransaction({slot}, prepareOpTime);
+ }
+
+ ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
+ txnParticipant.stashTransactionResources(opCtx());
+ assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
+ txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
+}
+
+TEST_F(OpObserverTransactionTest, AbortingUnpreparedTransactionDoesNotWriteToTransactionTable) {
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction");
+
+ opObserver().onTransactionAbort(opCtx(), boost::none);
+ txnParticipant.stashTransactionResources(opCtx());
+
+ // Abort the storage-transaction without calling the OpObserver.
+ txnParticipant.shutdown(opCtx());
+
+ assertNoTxnRecord();
+}
+
+TEST_F(OpObserverTransactionTest, AbortingPreparedTransactionWritesToTransactionTable) {
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction");
+
+ OplogSlot abortSlot;
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ WriteUnitOfWork wuow(opCtx());
+ OplogSlot slot = reserveOpTimeInSideTransaction(opCtx());
+ prepareTransaction({slot}, slot);
+ txnParticipant.transitionToPreparedforTest(opCtx(), slot);
+ abortSlot = reserveOpTimeInSideTransaction(opCtx());
+ }
+
+ // Mimic aborting the transaction.
+ opCtx()->setWriteUnitOfWork(nullptr);
+ opCtx()->lockState()->unsetMaxLockTimeout();
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ opObserver().onTransactionAbort(opCtx(), abortSlot);
+ txnParticipant.transitionToAbortedWithPrepareforTest(opCtx());
+ }
+ txnParticipant.stashTransactionResources(opCtx());
+
+ // Abort the storage-transaction without calling the OpObserver.
+ txnParticipant.shutdown(opCtx());
+
+ assertTxnRecord(txnNum(), {}, DurableTxnStateEnum::kAborted);
+}
+
+TEST_F(OpObserverTransactionTest, CommittingUnpreparedNonEmptyTransactionWritesToTransactionTable) {
+ const NamespaceString nss(boost::none, "testDB", "testColl");
+ const auto uuid = UUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction");
+
+ std::vector<InsertStatement> insert;
+ insert.emplace_back(0,
+ BSON("_id" << 0 << "data"
+ << "x"));
+
+ {
+ AutoGetCollection autoColl(opCtx(), nss, MODE_IX);
+ opObserver().onInserts(opCtx(), nss, uuid, insert.begin(), insert.end(), false);
+ }
+
+ auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
+ opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0);
+ opCtx()->getWriteUnitOfWork()->commit();
+
+ assertTxnRecord(txnNum(), {}, DurableTxnStateEnum::kCommitted);
+}
+
+TEST_F(OpObserverTransactionTest,
+ CommittingUnpreparedEmptyTransactionDoesNotWriteToTransactionTableOrOplog) {
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction");
+
+ auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
+ opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0);
+
+ txnParticipant.stashTransactionResources(opCtx());
+
+ getNOplogEntries(opCtx(), 0);
+
+ // Abort the storage-transaction without calling the OpObserver.
+ txnParticipant.shutdown(opCtx());
+
+ assertNoTxnRecord();
+}
+
+TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransactionTable) {
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction");
+
+ repl::OpTime prepareOpTime;
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ WriteUnitOfWork wuow(opCtx());
+ OplogSlot slot = reserveOpTimeInSideTransaction(opCtx());
+ prepareOpTime = slot;
+ prepareTransaction({slot}, slot);
+ txnParticipant.transitionToPreparedforTest(opCtx(), slot);
+ }
+
+ OplogSlot commitSlot = reserveOpTimeInSideTransaction(opCtx());
+ repl::OpTime commitOpTime = commitSlot;
+ ASSERT_LTE(prepareOpTime, commitOpTime);
+
+ // Mimic committing the transaction.
+ opCtx()->setWriteUnitOfWork(nullptr);
+ opCtx()->lockState()->unsetMaxLockTimeout();
+
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ opObserver().onPreparedTransactionCommit(
+ opCtx(),
+ commitSlot,
+ prepareOpTime.getTimestamp(),
+ txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ }
+ assertTxnRecord(txnNum(), commitOpTime, DurableTxnStateEnum::kCommitted);
+}
+
+TEST_F(OpObserverTransactionTest, TransactionalInsertTest) {
+ const NamespaceString nss1(boost::none, "testDB", "testColl");
+ const NamespaceString nss2(boost::none, "testDB2", "testColl2");
+ auto uuid1 = UUID::gen();
+ auto uuid2 = UUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+
+ std::vector<InsertStatement> inserts1;
+ inserts1.emplace_back(0,
+ BSON("_id" << 0 << "data"
+ << "x"));
+ inserts1.emplace_back(1,
+ BSON("_id" << 1 << "data"
+ << "y"));
+ std::vector<InsertStatement> inserts2;
+ inserts2.emplace_back(0,
+ BSON("_id" << 2 << "data"
+ << "z"));
+ inserts2.emplace_back(1,
+ BSON("_id" << 3 << "data"
+ << "w"));
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+ opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
+ opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false);
+ auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
+ opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0);
+ auto oplogEntryObj = getSingleOplogEntry(opCtx());
+ checkCommonFields(oplogEntryObj);
+ OplogEntry oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj));
+ auto o = oplogEntry.getObject();
+ auto oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("_id" << 0 << "data"
+ << "x")
+ << "o2" << BSON("_id" << 0))
+ << BSON("op"
+ << "i"
+ << "ns" << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("_id" << 1 << "data"
+ << "y")
+ << "o2" << BSON("_id" << 1))
+ << BSON("op"
+ << "i"
+ << "ns" << nss2.toString() << "ui" << uuid2 << "o"
+ << BSON("_id" << 2 << "data"
+ << "z")
+ << "o2" << BSON("_id" << 2))
+ << BSON("op"
+ << "i"
+ << "ns" << nss2.toString() << "ui" << uuid2 << "o"
+ << BSON("_id" << 3 << "data"
+ << "w")
+ << "o2" << BSON("_id" << 3))));
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT(!oplogEntry.shouldPrepare());
+ ASSERT_FALSE(oplogEntryObj.hasField("prepare"));
+}
+
+TEST_F(OpObserverTransactionTest, TransactionalInsertTestIncludesTenantId) {
+ RAIIServerParameterControllerForTest featureFlagController("featureFlagRequireTenantID", true);
+ const NamespaceString nss1(TenantId(OID::gen()), "testDB", "testColl");
+ const NamespaceString nss2(TenantId(OID::gen()), "testDB2", "testColl2");
+ auto uuid1 = UUID::gen();
+ auto uuid2 = UUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+
+ std::vector<InsertStatement> inserts1;
+ inserts1.emplace_back(0,
+ BSON("_id" << 0 << "data"
+ << "x"));
+ inserts1.emplace_back(1,
+ BSON("_id" << 1 << "data"
+ << "y"));
+ std::vector<InsertStatement> inserts2;
+ inserts2.emplace_back(0,
+ BSON("_id" << 2 << "data"
+ << "z"));
+ inserts2.emplace_back(1,
+ BSON("_id" << 3 << "data"
+ << "w"));
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+ opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
+ opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false);
+ auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
+ opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0);
+ auto oplogEntryObj = getSingleOplogEntry(opCtx());
+ checkCommonFields(oplogEntryObj);
+ OplogEntry oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj));
+ auto o = oplogEntry.getObject();
+ auto oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "tid" << nss1.tenantId().get() << "ns"
+ << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("_id" << 0 << "data"
+ << "x")
+ << "o2" << BSON("_id" << 0))
+ << BSON("op"
+ << "i"
+ << "tid" << nss1.tenantId().get() << "ns"
+ << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("_id" << 1 << "data"
+ << "y")
+ << "o2" << BSON("_id" << 1))
+ << BSON("op"
+ << "i"
+ << "tid" << nss2.tenantId().get() << "ns"
+ << nss2.toString() << "ui" << uuid2 << "o"
+ << BSON("_id" << 2 << "data"
+ << "z")
+ << "o2" << BSON("_id" << 2))
+ << BSON("op"
+ << "i"
+ << "tid" << nss2.tenantId().get() << "ns"
+ << nss2.toString() << "ui" << uuid2 << "o"
+ << BSON("_id" << 3 << "data"
+ << "w")
+ << "o2" << BSON("_id" << 3))));
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT(!oplogEntry.shouldPrepare());
+ ASSERT_FALSE(oplogEntryObj.hasField("prepare"));
+}
+
+TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) {
+ const NamespaceString nss1(boost::none, "testDB", "testColl");
+ const NamespaceString nss2(boost::none, "testDB2", "testColl2");
+ auto uuid1 = UUID::gen();
+ auto uuid2 = UUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "update");
+
+ CollectionUpdateArgs updateArgs1;
+ updateArgs1.stmtIds = {0};
+ updateArgs1.updatedDoc = BSON("_id" << 0 << "data"
+ << "x");
+ updateArgs1.update = BSON("$set" << BSON("data"
+ << "x"));
+ updateArgs1.criteria = BSON("_id" << 0);
+ OplogUpdateEntryArgs update1(&updateArgs1, nss1, uuid1);
+
+ CollectionUpdateArgs updateArgs2;
+ updateArgs2.stmtIds = {1};
+ updateArgs2.updatedDoc = BSON("_id" << 1 << "data"
+ << "y");
+ updateArgs2.update = BSON("$set" << BSON("data"
+ << "y"));
+ updateArgs2.criteria = BSON("_id" << 1);
+ OplogUpdateEntryArgs update2(&updateArgs2, nss2, uuid2);
+
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+ opObserver().onUpdate(opCtx(), update1);
+ opObserver().onUpdate(opCtx(), update2);
+ auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
+ opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0);
+ auto oplogEntry = getSingleOplogEntry(opCtx());
+ checkCommonFields(oplogEntry);
+ auto o = oplogEntry.getObjectField("o");
+ auto oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "u"
+ << "ns" << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("$set" << BSON("data"
+ << "x"))
+ << "o2" << BSON("_id" << 0))
+ << BSON("op"
+ << "u"
+ << "ns" << nss2.toString() << "ui" << uuid2 << "o"
+ << BSON("$set" << BSON("data"
+ << "y"))
+ << "o2" << BSON("_id" << 1))));
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT_FALSE(oplogEntry.hasField("prepare"));
+ ASSERT_FALSE(oplogEntry.getBoolField("prepare"));
+}
+
+TEST_F(OpObserverTransactionTest, TransactionalUpdateTestIncludesTenantId) {
+ RAIIServerParameterControllerForTest featureFlagController("featureFlagRequireTenantID", true);
+ const NamespaceString nss1(TenantId(OID::gen()), "testDB", "testColl");
+ const NamespaceString nss2(TenantId(OID::gen()), "testDB2", "testColl2");
+ auto uuid1 = UUID::gen();
+ auto uuid2 = UUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "update");
+
+ CollectionUpdateArgs updateArgs1;
+ updateArgs1.stmtIds = {0};
+ updateArgs1.updatedDoc = BSON("_id" << 0 << "data"
+ << "x");
+ updateArgs1.update = BSON("$set" << BSON("data"
+ << "x"));
+ updateArgs1.criteria = BSON("_id" << 0);
+ OplogUpdateEntryArgs update1(&updateArgs1, nss1, uuid1);
+
+ CollectionUpdateArgs updateArgs2;
+ updateArgs2.stmtIds = {1};
+ updateArgs2.updatedDoc = BSON("_id" << 1 << "data"
+ << "y");
+ updateArgs2.update = BSON("$set" << BSON("data"
+ << "y"));
+ updateArgs2.criteria = BSON("_id" << 1);
+ OplogUpdateEntryArgs update2(&updateArgs2, nss2, uuid2);
+
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+ opObserver().onUpdate(opCtx(), update1);
+ opObserver().onUpdate(opCtx(), update2);
+ auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
+ opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0);
+ auto oplogEntry = getSingleOplogEntry(opCtx());
+ checkCommonFields(oplogEntry);
+ auto o = oplogEntry.getObjectField("o");
+ auto oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "u"
+ << "tid" << nss1.tenantId().get() << "ns"
+ << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("$set" << BSON("data"
+ << "x"))
+ << "o2" << BSON("_id" << 0))
+ << BSON("op"
+ << "u"
+ << "tid" << nss2.tenantId().get() << "ns"
+ << nss2.toString() << "ui" << uuid2 << "o"
+ << BSON("$set" << BSON("data"
+ << "y"))
+ << "o2" << BSON("_id" << 1))));
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT_FALSE(oplogEntry.hasField("prepare"));
+ ASSERT_FALSE(oplogEntry.getBoolField("prepare"));
+}
+
+TEST_F(OpObserverTransactionTest, TransactionalDeleteTest) {
+ const NamespaceString nss1(boost::none, "testDB", "testColl");
+ const NamespaceString nss2(boost::none, "testDB2", "testColl2");
+ auto uuid1 = UUID::gen();
+ auto uuid2 = UUID::gen();
+
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "delete");
+
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+ opObserver().aboutToDelete(opCtx(),
+ nss1,
+ uuid1,
+ BSON("_id" << 0 << "data"
+ << "x"));
+ opObserver().onDelete(opCtx(), nss1, uuid1, 0, {});
+ opObserver().aboutToDelete(opCtx(),
+ nss2,
+ uuid2,
+ BSON("_id" << 1 << "data"
+ << "y"));
+ opObserver().onDelete(opCtx(), nss2, uuid2, 0, {});
+ auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
+ opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0);
+ auto oplogEntry = getSingleOplogEntry(opCtx());
+ checkCommonFields(oplogEntry);
+ auto o = oplogEntry.getObjectField("o");
+ auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "d"
+ << "ns" << nss1.toString() << "ui" << uuid1
+ << "o" << BSON("_id" << 0))
+ << BSON("op"
+ << "d"
+ << "ns" << nss2.toString() << "ui"
+ << uuid2 << "o" << BSON("_id" << 1))));
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT_FALSE(oplogEntry.hasField("prepare"));
+ ASSERT_FALSE(oplogEntry.getBoolField("prepare"));
+}
+
+TEST_F(OpObserverTransactionTest, TransactionalDeleteTestIncludesTenantId) {
+ RAIIServerParameterControllerForTest featureFlagController("featureFlagRequireTenantID", true);
+ const NamespaceString nss1(TenantId(OID::gen()), "testDB", "testColl");
+ const NamespaceString nss2(TenantId(OID::gen()), "testDB2", "testColl2");
+ auto uuid1 = UUID::gen();
+ auto uuid2 = UUID::gen();
+
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "delete");
+
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+ opObserver().aboutToDelete(opCtx(),
+ nss1,
+ uuid1,
+ BSON("_id" << 0 << "data"
+ << "x"));
+ opObserver().onDelete(opCtx(), nss1, uuid1, 0, {});
+ opObserver().aboutToDelete(opCtx(),
+ nss2,
+ uuid2,
+ BSON("_id" << 1 << "data"
+ << "y"));
+ opObserver().onDelete(opCtx(), nss2, uuid2, 0, {});
+ auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
+ opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0);
+ auto oplogEntry = getSingleOplogEntry(opCtx());
+ checkCommonFields(oplogEntry);
+ auto o = oplogEntry.getObjectField("o");
+ auto oExpected = BSON("applyOps" << BSON_ARRAY(
+ BSON("op"
+ << "d"
+ << "tid" << nss1.tenantId().get() << "ns" << nss1.toString()
+ << "ui" << uuid1 << "o" << BSON("_id" << 0))
+ << BSON("op"
+ << "d"
+ << "tid" << nss2.tenantId().get() << "ns" << nss2.toString()
+ << "ui" << uuid2 << "o" << BSON("_id" << 1))));
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT_FALSE(oplogEntry.hasField("prepare"));
+ ASSERT_FALSE(oplogEntry.getBoolField("prepare"));
+}
+
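+// Test fixture that forces each applyOps oplog entry to pack at most one operation (by setting
+// gMaxNumberOfTransactionOperationsInSingleOplogEntry to 1), so multi-statement transactions
+// replicate as a chain of oplog entries.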
+class OpObserverMultiEntryTransactionTest : public OpObserverTransactionTest {
+ void setUp() override {
+ _prevPackingLimit = gMaxNumberOfTransactionOperationsInSingleOplogEntry;
+ gMaxNumberOfTransactionOperationsInSingleOplogEntry = 1;
+ OpObserverTransactionTest::setUp();
+ }
+
+ void tearDown() override {
+ OpObserverTransactionTest::tearDown();
+ gMaxNumberOfTransactionOperationsInSingleOplogEntry = _prevPackingLimit;
+ }
+
+private:
+ int _prevPackingLimit;
+};
+
+TEST_F(OpObserverMultiEntryTransactionTest, TransactionSingleStatementTest) {
+ const NamespaceString nss(boost::none, "testDB", "testColl");
+ auto uuid = UUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+ std::vector<InsertStatement> inserts;
+ inserts.emplace_back(0, BSON("_id" << 0 << "a" << std::string(BSONObjMaxUserSize, 'a')));
+
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection autoColl1(opCtx(), nss, MODE_IX);
+ opObserver().onInserts(opCtx(), nss, uuid, inserts.begin(), inserts.end(), false);
+ auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
+ opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0);
+ auto oplogEntryObj = getNOplogEntries(opCtx(), 1)[0];
+ checkSessionAndTransactionFields(oplogEntryObj);
+ auto oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj));
+ ASSERT(!oplogEntry.shouldPrepare());
+ ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(repl::OpTime(), *oplogEntry.getPrevWriteOpTimeInTransaction());
+
+ // The implicit commit oplog entry.
+ auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON(
+ "op"
+ << "i"
+ << "ns" << nss.toString() << "ui" << uuid << "o"
+ << BSON("_id" << 0 << "a" << std::string(BSONObjMaxUserSize, 'a'))
+ << "o2" << BSON("_id" << 0))));
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntry.getObject());
+}
+
+/**
+ * Test fixture for testing OpObserver behavior specific to retryable findAndModify.
+ */
+class OpObserverRetryableFindAndModifyTest : public OpObserverTxnParticipantTest {
+public:
+ void tearDown() override {
+ OpObserverTxnParticipantTest::tearDown();
+ }
+
+protected:
+ void testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage() {
+ NamespaceString nss = {boost::none, "test", "coll"};
+ const auto uuid = UUID::gen();
+
+ CollectionUpdateArgs updateArgs;
+ updateArgs.stmtIds = {0};
+ updateArgs.updatedDoc = BSON("_id" << 0 << "data"
+ << "x");
+ updateArgs.update = BSON("$set" << BSON("data"
+ << "x"));
+ updateArgs.criteria = BSON("_id" << 0);
+ updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PostImage;
+ OplogUpdateEntryArgs update(&updateArgs, nss, uuid);
+ update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection;
+
+ AutoGetDb autoDb(opCtx(), nss.db(), MODE_X);
+ opObserver().onUpdate(opCtx(), update);
+ commit();
+
+ // Asserts that only a single oplog entry was created. In essence, we did not create any
+ // no-op image entries in the oplog.
+ const auto oplogEntry = assertGetSingleOplogEntry();
+ ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName));
+ ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName));
+ ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName));
+ ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName),
+ "postImage"_sd);
+
+ finish();
+ }
+
+ void testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage() {
+ NamespaceString nss = {boost::none, "test", "coll"};
+ const auto uuid = UUID::gen();
+
+ CollectionUpdateArgs updateArgs;
+ updateArgs.stmtIds = {0};
+ updateArgs.preImageDoc = BSON("_id" << 0 << "data"
+ << "y");
+ updateArgs.update = BSON("$set" << BSON("data"
+ << "x"));
+ updateArgs.criteria = BSON("_id" << 0);
+ updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PreImage;
+ OplogUpdateEntryArgs update(&updateArgs, nss, uuid);
+ update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection;
+
+ AutoGetDb autoDb(opCtx(), nss.db(), MODE_X);
+ opObserver().onUpdate(opCtx(), update);
+ commit();
+
+ // Asserts that only a single oplog entry was created. In essence, we did not create any
+ // no-op image entries in the oplog.
+ const auto oplogEntry = assertGetSingleOplogEntry();
+ ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName));
+ ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName));
+ ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName));
+ ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName),
+ "preImage"_sd);
+
+ finish();
+ }
+
+ void testRetryableFindAndModifyDeleteHasNeedsRetryImage() {
+ NamespaceString nss = {boost::none, "test", "coll"};
+ const auto uuid = UUID::gen();
+
+ AutoGetDb autoDb(opCtx(), nss.db(), MODE_X);
+ const auto deletedDoc = BSON("_id" << 0 << "data"
+ << "x");
+ opObserver().aboutToDelete(opCtx(), nss, uuid, deletedDoc);
+ OplogDeleteEntryArgs args;
+ args.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection;
+ args.deletedDoc = &deletedDoc;
+ opObserver().onDelete(opCtx(), nss, uuid, 0, args);
+ commit();
+
+ // Asserts that only a single oplog entry was created. In essence, we did not create any
+ // no-op image entries in the oplog.
+ const auto oplogEntry = assertGetSingleOplogEntry();
+ ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName));
+ ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName));
+ ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName));
+ ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName),
+ "preImage"_sd);
+
+ finish();
+ }
+
+ virtual void commit() = 0;
+
+ virtual void finish() {}
+
+ virtual BSONObj assertGetSingleOplogEntry() = 0;
+};
+
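+// Runs the retryable findAndModify cases as plain retryable writes, outside of any
+// multi-document transaction.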
+class OpObserverRetryableFindAndModifyOutsideTransactionTest
+ : public OpObserverRetryableFindAndModifyTest {
+public:
+ void setUp() override {
+ OpObserverTxnParticipantTest::setUp();
+ OpObserverTxnParticipantTest::setUpRetryableWrite();
+ }
+
+protected:
+ void commit() final{};
+
+ BSONObj assertGetSingleOplogEntry() final {
+ return getSingleOplogEntry(opCtx());
+ }
+};
+
+TEST_F(OpObserverRetryableFindAndModifyOutsideTransactionTest,
+ RetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage) {
+ WriteUnitOfWork wuow{opCtx()};
+ testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage();
+}
+
+TEST_F(OpObserverRetryableFindAndModifyOutsideTransactionTest,
+ RetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage) {
+ WriteUnitOfWork wuow{opCtx()};
+ testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage();
+}
+
+TEST_F(OpObserverRetryableFindAndModifyOutsideTransactionTest,
+ RetryableFindAndModifyDeleteHasNeedsRetryImage) {
+ WriteUnitOfWork wuow{opCtx()};
+ testRetryableFindAndModifyDeleteHasNeedsRetryImage();
+}
+
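+// Runs the retryable findAndModify cases inside an unprepared retryable internal transaction;
+// the single write is expected to surface as the inner entry of one applyOps oplog entry.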
+class OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransactionTest
+ : public OpObserverRetryableFindAndModifyTest {
+public:
+ void setUp() override {
+ OpObserverTxnParticipantTest::setUp();
+ OpObserverTxnParticipantTest::setUpRetryableInternalTransaction();
+ }
+
+protected:
+ void commit() final {
+ commitUnpreparedTransaction<OpObserverImpl>(opCtx(), opObserver());
+ };
+
+ BSONObj assertGetSingleOplogEntry() final {
+ return getInnerEntryFromSingleApplyOpsOplogEntry(opCtx());
+ }
+};
+
+TEST_F(OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransactionTest,
+ RetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage) {
+ WriteUnitOfWork wuow{opCtx()};
+ testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage();
+}
+
+TEST_F(OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransactionTest,
+ RetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage) {
+ WriteUnitOfWork wuow{opCtx()};
+ testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage();
+}
+
+TEST_F(OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransactionTest,
+ RetryableFindAndModifyDeleteHasNeedsRetryImage) {
+ WriteUnitOfWork wuow{opCtx()};
+ testRetryableFindAndModifyDeleteHasNeedsRetryImage();
+}
+
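+// Runs the retryable findAndModify cases inside a prepared retryable internal transaction. The
+// commit() override reserves oplog slots for the prepare entry and any pre/post images and then
+// prepares the transaction; finish() unstashes the transaction resources so it can be aborted.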
+class OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransactionTest
+ : public OpObserverRetryableFindAndModifyTest {
+public:
+ void setUp() override {
+ OpObserverTxnParticipantTest::setUp();
+ OpObserverTxnParticipantTest::setUpRetryableInternalTransaction();
+ }
+
+protected:
+ void commit() final {
+ const auto reservedOplogSlots = reserveOpTimesInSideTransaction(
+ opCtx(), 1 + txnParticipant().getNumberOfPrePostImagesToWriteForTest());
+ invariant(reservedOplogSlots.size() >= 1);
+ const auto prepareSlot = reservedOplogSlots.back();
+ txnParticipant().transitionToPreparedforTest(opCtx(), prepareSlot);
+ prepareTransaction(reservedOplogSlots,
+ prepareSlot,
+ txnParticipant().getNumberOfPrePostImagesToWriteForTest());
+ TransactionParticipant::get(opCtx()).stashTransactionResources(opCtx());
+ };
+
+ void finish() final {
+ TransactionParticipant::get(opCtx()).unstashTransactionResources(opCtx(),
+ "abortTransaction");
+ }
+
+ BSONObj assertGetSingleOplogEntry() final {
+ return getInnerEntryFromSingleApplyOpsOplogEntry(opCtx());
+ }
+};
+
+TEST_F(OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransactionTest,
+ RetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage) {
+ TransactionParticipant::get(opCtx()).unstashTransactionResources(opCtx(), "update");
+ testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage();
+}
+
+TEST_F(OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransactionTest,
+ RetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage) {
+ TransactionParticipant::get(opCtx()).unstashTransactionResources(opCtx(), "update");
+ testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage();
+}
+
+TEST_F(OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransactionTest,
+ RetryableFindAndModifyDeleteHasNeedsRetryImage) {
+ TransactionParticipant::get(opCtx()).unstashTransactionResources(opCtx(), "delete");
+ testRetryableFindAndModifyDeleteHasNeedsRetryImage();
+}
+
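+// Returns the parsed oplog entry in 'oplogs' whose timestamp equals 'ts', or boost::none if no
+// such entry exists.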
+boost::optional<OplogEntry> findByTimestamp(const std::vector<BSONObj>& oplogs, Timestamp ts) {
+ for (auto& oplog : oplogs) {
+ const auto& entry = assertGet(OplogEntry::parse(oplog));
+ if (entry.getTimestamp() == ts) {
+ return entry;
+ }
+ }
+ return boost::none;
+}
+
+using StoreDocOption = CollectionUpdateArgs::StoreDocOption;
+
+namespace {
+const auto kNonFaM = StoreDocOption::None;
+const auto kFaMPre = StoreDocOption::PreImage;
+const auto kFaMPost = StoreDocOption::PostImage;
+
+const bool kRecordPreImages = true;
+const bool kDoNotRecordPreImages = false;
+
+const bool kChangeStreamImagesEnabled = true;
+const bool kChangeStreamImagesDisabled = false;
+
+const auto kNotRetryable = RetryableFindAndModifyLocation::kNone;
+const auto kRecordInOplog = RetryableFindAndModifyLocation::kOplog;
+const auto kRecordInSideCollection = RetryableFindAndModifyLocation::kSideCollection;
+
+const std::vector<bool> kInMultiDocumentTransactionCases{false, true};
+} // namespace
+
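+// Describes one OnUpdateOutputsTest scenario: the requested findAndModify image type, the
+// collection-level pre-image settings, where retryable findAndModify images are recorded, and the
+// number of oplog entries the update is expected to produce.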
+struct UpdateTestCase {
+ StoreDocOption imageType;
+ bool alwaysRecordPreImages;
+ bool changeStreamImagesEnabled;
+ RetryableFindAndModifyLocation retryableOptions;
+
+ int numOutputOplogs;
+
+ bool isFindAndModify() const {
+ return imageType != StoreDocOption::None;
+ }
+
+ bool isRetryable() const {
+ return retryableOptions != kNotRetryable;
+ }
+
+ std::string getImageTypeStr() const {
+ switch (imageType) {
+ case StoreDocOption::None:
+ return "None";
+ case StoreDocOption::PreImage:
+ return "PreImage";
+ case StoreDocOption::PostImage:
+ return "PostImage";
+ }
+ MONGO_UNREACHABLE;
+ }
+
+ std::string getRetryableFindAndModifyLocationStr() const {
+ switch (retryableOptions) {
+ case kNotRetryable:
+ return "Not retryable";
+ case kRecordInOplog:
+ return "Images in oplog";
+ case kRecordInSideCollection:
+ return "Images in side collection";
+ }
+ MONGO_UNREACHABLE;
+ }
+};
+
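+// Drives OpObserverImpl::onUpdate through the scenarios in '_cases' and verifies the resulting
+// oplog entries, side-collection image entries, and change stream pre-images.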
+class OnUpdateOutputsTest : public OpObserverTest {
+protected:
+ void logTestCase(const UpdateTestCase& testCase) {
+ LOGV2(5739902,
+ "UpdateTestCase",
+ "ImageType"_attr = testCase.getImageTypeStr(),
+ "PreImageRecording"_attr = testCase.alwaysRecordPreImages,
+ "ChangeStreamPreAndPostImagesEnabled"_attr = testCase.changeStreamImagesEnabled,
+ "RetryableFindAndModifyLocation"_attr =
+ testCase.getRetryableFindAndModifyLocationStr(),
+ "ExpectedOplogEntries"_attr = testCase.numOutputOplogs);
+ }
+
+ void initializeOplogUpdateEntryArgs(OperationContext* opCtx,
+ const UpdateTestCase& testCase,
+ OplogUpdateEntryArgs* update) {
+ update->updateArgs->preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages;
+ update->updateArgs->changeStreamPreAndPostImagesEnabledForCollection =
+ testCase.changeStreamImagesEnabled;
+
+ switch (testCase.retryableOptions) {
+ case kNotRetryable:
+ update->updateArgs->stmtIds = {kUninitializedStmtId};
+ break;
+ case kRecordInOplog:
+ update->retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kOplog;
+ update->updateArgs->stmtIds = {1};
+ break;
+ case kRecordInSideCollection:
+ update->retryableFindAndModifyLocation =
+ RetryableFindAndModifyLocation::kSideCollection;
+ update->updateArgs->stmtIds = {1};
+ if (testCase.retryableOptions == kRecordInSideCollection) {
+ // 'getNextOpTimes' requires us to be inside a WUOW when reserving oplog slots.
+ WriteUnitOfWork wuow(opCtx);
+ auto reservedSlots = repl::getNextOpTimes(opCtx, 3);
+ update->updateArgs->oplogSlots = reservedSlots;
+ }
+ break;
+ }
+ update->updateArgs->preImageDoc = boost::none;
+ if (testCase.imageType == StoreDocOption::PreImage || testCase.alwaysRecordPreImages ||
+ testCase.changeStreamImagesEnabled) {
+ update->updateArgs->preImageDoc = BSON("_id" << 0 << "preImage" << true);
+ }
+ update->updateArgs->updatedDoc = BSON("_id" << 0 << "postImage" << true);
+ update->updateArgs->update =
+ BSON("$set" << BSON("postImage" << true) << "$unset" << BSON("preImage" << 1));
+ update->updateArgs->criteria = BSON("_id" << 0);
+ update->updateArgs->storeDocOption = testCase.imageType;
+ }
+
+ void checkPreImageInOplogIfNeeded(
+ const UpdateTestCase& testCase,
+ const OplogUpdateEntryArgs& update,
+ const std::vector<BSONObj>& oplogs,
+ const OplogEntry& updateOplogEntry,
+ const boost::optional<OplogEntry>& applyOpsOplogEntry = boost::none) {
+ const bool checkPreImageInOplog = testCase.alwaysRecordPreImages ||
+ (testCase.imageType == StoreDocOption::PreImage &&
+ testCase.retryableOptions == kRecordInOplog);
+ if (checkPreImageInOplog) {
+ ASSERT(updateOplogEntry.getPreImageOpTime());
+ if (applyOpsOplogEntry) {
+ ASSERT_FALSE(applyOpsOplogEntry->getPreImageOpTime());
+ }
+
+ const Timestamp preImageOpTime = updateOplogEntry.getPreImageOpTime()->getTimestamp();
+ ASSERT_FALSE(preImageOpTime.isNull());
+ OplogEntry preImage = *findByTimestamp(oplogs, preImageOpTime);
+ ASSERT_BSONOBJ_EQ(update.updateArgs->preImageDoc.get(), preImage.getObject());
+ if (updateOplogEntry.getSessionId()) {
+ ASSERT_EQ(*updateOplogEntry.getSessionId(), *preImage.getSessionId());
+ }
+ if (updateOplogEntry.getTxnNumber()) {
+ ASSERT_EQ(*updateOplogEntry.getTxnNumber(), *preImage.getTxnNumber());
+ }
+ if (!updateOplogEntry.getStatementIds().empty()) {
+ const auto& updateOplogStmtIds = updateOplogEntry.getStatementIds();
+ const auto& preImageOplogStmtIds = preImage.getStatementIds();
+ ASSERT_EQ(updateOplogStmtIds.size(), preImageOplogStmtIds.size());
+ for (size_t i = 0; i < updateOplogStmtIds.size(); i++) {
+ ASSERT_EQ(updateOplogStmtIds[i], preImageOplogStmtIds[i]);
+ }
+ }
+ } else {
+ ASSERT_FALSE(updateOplogEntry.getPreImageOpTime());
+ }
+ }
+
+ void checkPostImageInOplogIfNeeded(
+ const UpdateTestCase& testCase,
+ const OplogUpdateEntryArgs& update,
+ const std::vector<BSONObj>& oplogs,
+ const OplogEntry& updateOplogEntry,
+ const boost::optional<OplogEntry>& applyOpsOplogEntry = boost::none) {
+ const bool checkPostImageInOplog = testCase.imageType == StoreDocOption::PostImage &&
+ testCase.retryableOptions == kRecordInOplog;
+ if (checkPostImageInOplog) {
+ ASSERT(updateOplogEntry.getPostImageOpTime());
+ if (applyOpsOplogEntry) {
+ ASSERT_FALSE(applyOpsOplogEntry->getPostImageOpTime());
+ }
+
+ const Timestamp postImageOpTime = updateOplogEntry.getPostImageOpTime()->getTimestamp();
+ ASSERT_FALSE(postImageOpTime.isNull());
+ OplogEntry postImage = *findByTimestamp(oplogs, postImageOpTime);
+ ASSERT_BSONOBJ_EQ(update.updateArgs->updatedDoc, postImage.getObject());
+ if (updateOplogEntry.getSessionId()) {
+ ASSERT_EQ(*updateOplogEntry.getSessionId(), *postImage.getSessionId());
+ }
+ if (updateOplogEntry.getTxnNumber()) {
+ ASSERT_EQ(*updateOplogEntry.getTxnNumber(), *postImage.getTxnNumber());
+ }
+ if (!updateOplogEntry.getStatementIds().empty()) {
+ const auto& updateOplogStmtIds = updateOplogEntry.getStatementIds();
+ const auto& postImageOplogStmtIds = postImage.getStatementIds();
+ ASSERT_EQ(updateOplogStmtIds.size(), postImageOplogStmtIds.size());
+ for (size_t i = 0; i < updateOplogStmtIds.size(); i++) {
+ ASSERT_EQ(updateOplogStmtIds[i], postImageOplogStmtIds[i]);
+ }
+ }
+ } else {
+ ASSERT_FALSE(updateOplogEntry.getPostImageOpTime());
+ }
+ }
+
+ void checkSideCollectionIfNeeded(
+ OperationContext* opCtx,
+ const UpdateTestCase& testCase,
+ const OplogUpdateEntryArgs& update,
+ const std::vector<BSONObj>& oplogs,
+ const OplogEntry& updateOplogEntry,
+ const boost::optional<OplogEntry>& applyOpsOplogEntry = boost::none) {
+ bool checkSideCollection =
+ testCase.isFindAndModify() && testCase.retryableOptions == kRecordInSideCollection;
+ if (checkSideCollection && testCase.alwaysRecordPreImages &&
+ testCase.imageType == StoreDocOption::PreImage) {
+ // When `alwaysRecordPreImages` is enabled for a collection, we always store an
+ // image in the oplog. To avoid unnecessary writes, we won't also store an image
+ // in the side collection.
+ checkSideCollection = false;
+ }
+ if (checkSideCollection) {
+ repl::ImageEntry imageEntry =
+ getImageEntryFromSideCollection(opCtx, *updateOplogEntry.getSessionId());
+ const BSONObj& expectedImage = testCase.imageType == StoreDocOption::PreImage
+ ? update.updateArgs->preImageDoc.get()
+ : update.updateArgs->updatedDoc;
+ ASSERT_BSONOBJ_EQ(expectedImage, imageEntry.getImage());
+ ASSERT(imageEntry.getImageKind() == updateOplogEntry.getNeedsRetryImage());
+ if (applyOpsOplogEntry) {
+ ASSERT_FALSE(applyOpsOplogEntry->getNeedsRetryImage());
+ }
+ if (testCase.imageType == StoreDocOption::PreImage) {
+ ASSERT(imageEntry.getImageKind() == repl::RetryImageEnum::kPreImage);
+ } else {
+ ASSERT(imageEntry.getImageKind() == repl::RetryImageEnum::kPostImage);
+ }
+
+ // If 'updateOplogEntry' has opTime T, opTime T-1 must be reserved for a potential forged
+ // noop oplog entry for the pre/post image written to the side collection.
+ const Timestamp forgeNoopTimestamp = updateOplogEntry.getTimestamp() - 1;
+ ASSERT_FALSE(findByTimestamp(oplogs, forgeNoopTimestamp));
+ } else {
+ ASSERT_FALSE(updateOplogEntry.getNeedsRetryImage());
+ if (updateOplogEntry.getSessionId()) {
+ ASSERT_FALSE(
+ didWriteImageEntryToSideCollection(opCtx, *updateOplogEntry.getSessionId()));
+ } else {
+ // Session id is missing only for non-retryable option.
+ ASSERT(testCase.retryableOptions == kNotRetryable);
+ }
+ }
+ }
+
+ void checkChangeStreamImagesIfNeeded(OperationContext* opCtx,
+ const UpdateTestCase& testCase,
+ const OplogUpdateEntryArgs& update,
+ const OplogEntry& updateOplogEntry) {
+ if (testCase.changeStreamImagesEnabled) {
+ BSONObj container;
+ ChangeStreamPreImageId preImageId(
+ _uuid, updateOplogEntry.getOpTime().getTimestamp(), 0);
+ ChangeStreamPreImage preImage = getChangeStreamPreImage(opCtx, preImageId, &container);
+ const BSONObj& expectedImage = update.updateArgs->preImageDoc.get();
+ ASSERT_BSONOBJ_EQ(expectedImage, preImage.getPreImage());
+ ASSERT_EQ(updateOplogEntry.getWallClockTime(), preImage.getOperationTime());
+ }
+ }
+
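+ // Each entry is {imageType, alwaysRecordPreImages, changeStreamImagesEnabled, retryableOptions,
+ // numOutputOplogs}.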
+ std::vector<UpdateTestCase> _cases = {
+ // Regular updates.
+ {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1},
+ {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1},
+ {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 1},
+ {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1},
+ {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2},
+ {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
+ {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2},
+ // FindAndModify asking for a preImage.
+ {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1},
+ {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
+ {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1},
+ {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1},
+ {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2},
+ {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1},
+ {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2},
+ {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
+ {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2},
+ // FindAndModify asking for a postImage.
+ {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1},
+ {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
+ {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1},
+ {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1},
+ {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2},
+ {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1},
+ {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2},
+ {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 3},
+ {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}};
+
+ const NamespaceString _nss{boost::none, "test", "coll"};
+ const UUID _uuid = UUID::gen();
+};
+
+TEST_F(OnUpdateOutputsTest, TestNonTransactionFundamentalOnUpdateOutputs) {
+ // Create a registry that only registers the OpObserverImpl. It can be challenging to call
+ // methods on the Impl directly because some code paths expect a `ReservedTimes` object to be
+ // instantiated. Due to strong encapsulation, we use the registry, which manages the
+ // `ReservedTimes` on our behalf.
+ OpObserverRegistry opObserver;
+ opObserver.addObserver(std::make_unique<OpObserverImpl>());
+
+ for (std::size_t testIdx = 0; testIdx < _cases.size(); ++testIdx) {
+ const auto& testCase = _cases[testIdx];
+ logTestCase(testCase);
+
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+
+ // Phase 1: Clearing any state and setting up fixtures/the update call.
+ resetOplogAndTransactions(opCtx);
+
+ std::unique_ptr<MongoDOperationContextSession> contextSession;
+ if (testCase.isRetryable()) {
+ beginRetryableWriteWithTxnNumber(opCtx, testIdx, contextSession);
+ }
+
+ // Phase 2: Call the code we're testing.
+ CollectionUpdateArgs updateArgs;
+ OplogUpdateEntryArgs updateEntryArgs(&updateArgs, _nss, _uuid);
+ initializeOplogUpdateEntryArgs(opCtx, testCase, &updateEntryArgs);
+
+ WriteUnitOfWork wuow(opCtx);
+ AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX);
+ opObserver.onUpdate(opCtx, updateEntryArgs);
+ wuow.commit();
+
+ // Phase 3: Analyze the results:
+ // This `getNOplogEntries` also asserts that all oplogs are retrieved.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs);
+ // Entries are returned in ascending timestamp order.
+ auto updateOplogEntry = assertGet(OplogEntry::parse(oplogs.back()));
+ checkPreImageInOplogIfNeeded(testCase, updateEntryArgs, oplogs, updateOplogEntry);
+ checkPostImageInOplogIfNeeded(testCase, updateEntryArgs, oplogs, updateOplogEntry);
+ checkSideCollectionIfNeeded(opCtx, testCase, updateEntryArgs, oplogs, updateOplogEntry);
+ checkChangeStreamImagesIfNeeded(opCtx, testCase, updateEntryArgs, updateOplogEntry);
+ }
+}
+
+TEST_F(OnUpdateOutputsTest, TestFundamentalTransactionOnUpdateOutputs) {
+ // Create a registry that only registers the OpObserverImpl. It can be challenging to call
+ // methods on the Impl directly because some code paths expect a `ReservedTimes` object to be
+ // instantiated. Due to strong encapsulation, we use the registry, which manages the
+ // `ReservedTimes` on our behalf.
+ OpObserverRegistry opObserver;
+ opObserver.addObserver(std::make_unique<OpObserverImpl>());
+
+ for (std::size_t testIdx = 0; testIdx < _cases.size(); ++testIdx) {
+ const auto& testCase = _cases[testIdx];
+ logTestCase(testCase);
+
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+
+ // Phase 1: Clearing any state and setting up fixtures/the update call.
+ resetOplogAndTransactions(opCtx);
+
+ std::unique_ptr<MongoDOperationContextSession> contextSession;
+ if (testCase.isRetryable()) {
+ beginRetryableInternalTransactionWithTxnNumber(opCtx, testIdx, contextSession);
+ } else {
+ beginNonRetryableTransactionWithTxnNumber(opCtx, testIdx, contextSession);
+ }
+
+ // Phase 2: Call the code we're testing.
+ CollectionUpdateArgs updateArgs;
+ OplogUpdateEntryArgs updateEntryArgs(&updateArgs, _nss, _uuid);
+ initializeOplogUpdateEntryArgs(opCtx, testCase, &updateEntryArgs);
+
+ WriteUnitOfWork wuow(opCtx);
+ AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX);
+ opObserver.onUpdate(opCtx, updateEntryArgs);
+ commitUnpreparedTransaction<OpObserverRegistry>(opCtx, opObserver);
+ wuow.commit();
+
+ // Phase 3: Analyze the results:
+ // This `getNOplogEntries` also asserts that all oplogs are retrieved.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs);
+ // Entries are returned in ascending timestamp order.
+ auto applyOpsOplogEntry = assertGet(OplogEntry::parse(oplogs.back()));
+ auto updateOplogEntry = getInnerEntryFromApplyOpsOplogEntry(applyOpsOplogEntry);
+ checkPreImageInOplogIfNeeded(
+ testCase, updateEntryArgs, oplogs, updateOplogEntry, applyOpsOplogEntry);
+ checkPostImageInOplogIfNeeded(
+ testCase, updateEntryArgs, oplogs, updateOplogEntry, applyOpsOplogEntry);
+ checkSideCollectionIfNeeded(
+ opCtx, testCase, updateEntryArgs, oplogs, updateOplogEntry, applyOpsOplogEntry);
+ checkChangeStreamImagesIfNeeded(opCtx, testCase, updateEntryArgs, updateOplogEntry);
+ }
+}
+
+TEST_F(OnUpdateOutputsTest,
+ RetryableInternalTransactionUpdateWithPreImageRecordingEnabledOnShardServerThrows) {
+ // Create a registry that only registers the OpObserverImpl. It can be challenging to call
+ // methods on the Impl directly because some code paths expect a `ReservedTimes` object to be
+ // instantiated. Due to strong encapsulation, we use the registry, which manages the
+ // `ReservedTimes` on our behalf.
+ OpObserverRegistry opObserver;
+ opObserver.addObserver(std::make_unique<OpObserverImpl>());
+
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+
+ resetOplogAndTransactions(opCtx);
+
+ std::unique_ptr<MongoDOperationContextSession> contextSession;
+ beginRetryableInternalTransactionWithTxnNumber(opCtx, 0, contextSession);
+
+ CollectionUpdateArgs updateArgs;
+ updateArgs.preImageRecordingEnabledForCollection = true;
+ updateArgs.preImageDoc = BSON("_id" << 0 << "preImage" << true);
+ updateArgs.updatedDoc = BSON("_id" << 0 << "postImage" << true);
+ updateArgs.update =
+ BSON("$set" << BSON("postImage" << true) << "$unset" << BSON("preImage" << 1));
+ updateArgs.criteria = BSON("_id" << 0);
+ OplogUpdateEntryArgs updateEntryArgs(&updateArgs, _nss, _uuid);
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+ ON_BLOCK_EXIT([] { serverGlobalParams.clusterRole = ClusterRole::None; });
+
+ WriteUnitOfWork wuow(opCtx);
+ AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX);
+ ASSERT_THROWS_CODE(opObserver.onUpdate(opCtx, updateEntryArgs), DBException, 6462400);
+}
+
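+// Describes one TestFundamentalOnInsertsOutputs scenario: whether the insert is a retryable write
+// and how many documents are inserted.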
+struct InsertTestCase {
+ bool isRetryableWrite;
+ int numDocsToInsert;
+};
+
+TEST_F(OpObserverTest, TestFundamentalOnInsertsOutputs) {
+ // Create a registry that only registers the OpObserverImpl. It can be challenging to call
+ // methods on the Impl directly because some code paths expect a `ReservedTimes` object to be
+ // instantiated. Due to strong encapsulation, we use the registry, which manages the
+ // `ReservedTimes` on our behalf.
+ OpObserverRegistry opObserver;
+ opObserver.addObserver(std::make_unique<OpObserverImpl>());
+
+ NamespaceString nss(boost::none, "test", "coll");
+ UUID uuid = UUID::gen();
+
+ const bool isRetryableWrite = true;
+ const bool isNotRetryableWrite = false;
+ const int oneDoc = 1;
+ const int threeDocs = 3;
+
+ std::vector<InsertTestCase> cases{{isNotRetryableWrite, oneDoc},
+ {isNotRetryableWrite, threeDocs},
+ {isRetryableWrite, oneDoc},
+ {isRetryableWrite, threeDocs}};
+
+ for (std::size_t testIdx = 0; testIdx < cases.size(); ++testIdx) {
+ const auto& testCase = cases[testIdx];
+ LOGV2(5739904,
+ "InsertTestCase",
+ "Retryable"_attr = testCase.isRetryableWrite,
+ "NumDocsToInsert"_attr = testCase.numDocsToInsert);
+
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+ // Phase 1: Clearing any state and setting up fixtures/the update call.
+ resetOplogAndTransactions(opCtx);
+
+ std::vector<InsertStatement> toInsert;
+ for (int stmtIdx = 0; stmtIdx < testCase.numDocsToInsert; ++stmtIdx) {
+ StmtId stmtId = testCase.isRetryableWrite ? StmtId(stmtIdx) : kUninitializedStmtId;
+ toInsert.emplace_back(stmtId, BSON("_id" << stmtIdx));
+ }
+
+ std::unique_ptr<MongoDOperationContextSession> contextSession;
+ if (testCase.isRetryableWrite) {
+ beginRetryableWriteWithTxnNumber(opCtx, testIdx, contextSession);
+ }
+
+ // Phase 2: Call the code we're testing.
+ WriteUnitOfWork wuow(opCtx);
+ AutoGetCollection locks(opCtx, nss, LockMode::MODE_IX);
+ const bool fromMigrate = false;
+ opObserver.onInserts(opCtx, nss, uuid, toInsert.begin(), toInsert.end(), fromMigrate);
+ wuow.commit();
+
+ // Phase 3: Analyze the results:
+ // ----
+ // This `getNOplogEntries` also asserts that all oplogs are retrieved.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, toInsert.size());
+ // Entries are returned in ascending timestamp order.
+ for (std::size_t opIdx = 0; opIdx < oplogs.size(); ++opIdx) {
+ const repl::OplogEntry& entry = assertGet(repl::OplogEntry::parse(oplogs[opIdx]));
+
+ ASSERT_BSONOBJ_EQ(entry.getObject(), BSON("_id" << static_cast<int>(opIdx)));
+ if (!testCase.isRetryableWrite) {
+ ASSERT_FALSE(entry.getSessionId());
+ ASSERT_FALSE(entry.getTxnNumber());
+ ASSERT_EQ(0, entry.getStatementIds().size());
+ continue;
+ }
+
+ // Only for retryable writes:
+ ASSERT_EQ(opCtx->getLogicalSessionId().get(), entry.getSessionId().get());
+ ASSERT_EQ(opCtx->getTxnNumber().get(), entry.getTxnNumber().get());
+ ASSERT_EQ(1, entry.getStatementIds().size());
+ ASSERT_EQ(StmtId(opIdx), entry.getStatementIds()[0]);
+ // When we insert multiple documents in retryable writes, each insert will "link" back
+ // to the previous insert. This code verifies that C["prevOpTime"] -> B and
+ // B["prevOpTime"] -> A.
+ Timestamp expectedPrevWriteOpTime = Timestamp(0, 0);
+ if (opIdx > 0) {
+ expectedPrevWriteOpTime =
+ oplogs[opIdx - 1][repl::OplogEntryBase::kTimestampFieldName].timestamp();
+ }
+ ASSERT_EQ(expectedPrevWriteOpTime,
+ entry.getPrevWriteOpTimeInTransaction().get().getTimestamp());
+ }
+
+ if (testCase.isRetryableWrite) {
+ // Also assert for retryable writes that the `config.transactions` entry's
+ // `lastWriteOpTime` and `txnNum` reflects the latest oplog entry.
+ SessionTxnRecord transactionRecord = getTxnRecord(opCtx, *opCtx->getLogicalSessionId());
+ ASSERT_EQ(oplogs.back()[repl::OplogEntryBase::kTimestampFieldName].timestamp(),
+ transactionRecord.getLastWriteOpTime().getTimestamp());
+ ASSERT_EQ(oplogs.back()[repl::OplogEntryBase::kTxnNumberFieldName].Long(),
+ transactionRecord.getTxnNum());
+ }
+ }
+}
+
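+// Describes one onDelete scenario: the collection-level pre-image settings, where retryable
+// findAndModify images are recorded, and the number of oplog entries the delete is expected to
+// produce.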
+struct DeleteTestCase {
+ bool alwaysRecordPreImages;
+ bool changeStreamImagesEnabled;
+ RetryableFindAndModifyLocation retryableOptions;
+
+ int numOutputOplogs;
+
+ bool isRetryable() const {
+ return retryableOptions != kNotRetryable;
+ }
+
+ std::string getRetryableFindAndModifyLocationStr() const {
+ switch (retryableOptions) {
+ case kNotRetryable:
+ return "Not retryable";
+ case kRecordInOplog:
+ return "Images in oplog";
+ case kRecordInSideCollection:
+ return "Images in side collection";
+ }
+ MONGO_UNREACHABLE;
+ }
+};
+
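+// Test fixture for WriteUnitOfWorks constructed with groupOplogEntries=true, which replicate all
+// writes in the unit of work as a single applyOps oplog entry.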
+class BatchedWriteOutputsTest : public OpObserverTest {
+public:
+ void setUp() override {
+ OpObserverTest::setUp();
+
+ auto opObserverRegistry = std::make_unique<OpObserverRegistry>();
+ opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>());
+ getServiceContext()->setOpObserver(std::move(opObserverRegistry));
+ }
+
+protected:
+ // The maximum number of documents that can be deleted in a single batch. Assumes the _id is of
+ // integer type.
+ static const int maxDocsInBatch = 203669;
+ const NamespaceString _nss{boost::none, "test", "coll"};
+ const NamespaceString _nssWithTid{TenantId(OID::gen()), "test", "coll"};
+ const UUID _uuid = UUID::gen();
+};
+
+DEATH_TEST_REGEX_F(BatchedWriteOutputsTest,
+ TestCannotGroupDDLOperation,
+ "Invariant failure.*getOpType.*repl::OpTypeEnum::kDelete.*kInsert.*kUpdate") {
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+ WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */);
+
+ auto& bwc = BatchedWriteContext::get(opCtx);
+ bwc.addBatchedOperation(
+ opCtx,
+ repl::MutableOplogEntry::makeCreateCommand(
+ NamespaceString(boost::none, "other", "coll"), CollectionOptions(), BSON("v" << 2)));
+}
+
+DEATH_TEST_REGEX_F(BatchedWriteOutputsTest,
+ TestDoesNotSupportPreImagesInCollection,
+ "Invariant "
+ "failure.*getChangeStreamPreImageRecordingMode.*repl::ReplOperation::"
+ "ChangeStreamPreImageRecordingMode::kOff") {
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+ WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */);
+
+ auto& bwc = BatchedWriteContext::get(opCtx);
+ auto entry = repl::MutableOplogEntry::makeDeleteOperation(_nss, _uuid, BSON("_id" << 0));
+ entry.setChangeStreamPreImageRecordingMode(
+ repl::ReplOperation::ChangeStreamPreImageRecordingMode::kPreImagesCollection);
+ bwc.addBatchedOperation(opCtx, entry);
+}
+
+DEATH_TEST_REGEX_F(BatchedWriteOutputsTest,
+ TestDoesNotSupportPreImagesInOplog,
+ "Invariant "
+ "failure.*getChangeStreamPreImageRecordingMode.*repl::ReplOperation::"
+ "ChangeStreamPreImageRecordingMode::kOff") {
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+ WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */);
+
+ auto& bwc = BatchedWriteContext::get(opCtx);
+ auto entry = repl::MutableOplogEntry::makeDeleteOperation(_nss, _uuid, BSON("_id" << 0));
+ entry.setChangeStreamPreImageRecordingMode(
+ repl::ReplOperation::ChangeStreamPreImageRecordingMode::kOplog);
+ bwc.addBatchedOperation(opCtx, entry);
+}
+
+DEATH_TEST_REGEX_F(BatchedWriteOutputsTest,
+ TestDoesNotSupportMultiDocTxn,
+ "Invariant failure.*!opCtx->inMultiDocumentTransaction()") {
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+ opCtx->setInMultiDocumentTransaction();
+ WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */);
+
+ auto& bwc = BatchedWriteContext::get(opCtx);
+ auto entry = repl::MutableOplogEntry::makeDeleteOperation(_nss, _uuid, BSON("_id" << 0));
+ bwc.addBatchedOperation(opCtx, entry);
+}
+
+DEATH_TEST_REGEX_F(BatchedWriteOutputsTest,
+ TestDoesNotSupportRetryableWrites,
+ "Invariant failure.*!opCtx->getTxnNumber()") {
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+ opCtx->setLogicalSessionId(LogicalSessionId(makeLogicalSessionIdForTest()));
+ opCtx->setTxnNumber(TxnNumber{1});
+ WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */);
+
+ auto& bwc = BatchedWriteContext::get(opCtx);
+ auto entry = repl::MutableOplogEntry::makeDeleteOperation(_nss, _uuid, BSON("_id" << 0));
+ bwc.addBatchedOperation(opCtx, entry);
+}
+
+// Verifies that a WriteUnitOfWork with groupOplogEntries=true replicates its writes as a single
+// applyOps. Tests WUOWs batching a range of 1 to 5 deletes (inclusive).
+TEST_F(BatchedWriteOutputsTest, TestApplyOpsGrouping) {
+ const auto nDocsToDelete = 5;
+ const BSONObj docsToDelete[nDocsToDelete] = {
+ BSON("_id" << 0),
+ BSON("_id" << 1),
+ BSON("_id" << 2),
+ BSON("_id" << 3),
+ BSON("_id" << 4),
+ };
+
+ // Setup.
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+ reset(opCtx, NamespaceString::kRsOplogNamespace);
+
+ // Run the test with WUOW's grouping 1 to 5 deletions.
+ for (size_t docsToBeBatched = 1; docsToBeBatched <= nDocsToDelete; docsToBeBatched++) {
+
+ // Start a WUOW with groupOplogEntries=true. Verify that this initialises the
+ // BatchedWriteContext.
+ auto& bwc = BatchedWriteContext::get(opCtx);
+ ASSERT(!bwc.writesAreBatched());
+ WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */);
+ ASSERT(bwc.writesAreBatched());
+
+ AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX);
+
+ for (size_t doc = 0; doc < docsToBeBatched; doc++) {
+ // This test does not call `OpObserver::aboutToDelete`. That method has the side effect
+ // of setting the `documentKey` on the delete for sharding purposes.
+ // `OpObserverImpl::onDelete` asserts its existence.
+ repl::documentKeyDecoration(opCtx).emplace(docsToDelete[doc]["_id"].wrap(),
+ boost::none);
+ const OplogDeleteEntryArgs args;
+ opCtx->getServiceContext()->getOpObserver()->onDelete(
+ opCtx, _nss, _uuid, kUninitializedStmtId, args);
+ }
+
+ wuow.commit();
+
+ // Retrieve the oplog entries. We expect 'docsToBeBatched' entries in total because previous
+ // iterations of this loop exercised the smaller batch sizes.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, docsToBeBatched);
+ // Entries in ascending timestamp order, so fetch the last one at the back of the vector.
+ auto lastOplogEntry = oplogs.back();
+ auto lastOplogEntryParsed = assertGet(OplogEntry::parse(oplogs.back()));
+
+ // The batch consists of an applyOps, whose array contains all deletes issued within the
+ // WUOW.
+ ASSERT(lastOplogEntryParsed.getCommandType() == OplogEntry::CommandType::kApplyOps);
+ std::vector<repl::OplogEntry> innerEntries;
+ repl::ApplyOps::extractOperationsTo(
+ lastOplogEntryParsed, lastOplogEntryParsed.getEntry().toBSON(), &innerEntries);
+ ASSERT_EQ(innerEntries.size(), docsToBeBatched);
+
+ for (size_t opIdx = 0; opIdx < docsToBeBatched; opIdx++) {
+ const auto innerEntry = innerEntries[opIdx];
+ ASSERT(innerEntry.getCommandType() == OplogEntry::CommandType::kNotCommand);
+ ASSERT(innerEntry.getOpType() == repl::OpTypeEnum::kDelete);
+ ASSERT(innerEntry.getNss() == _nss);
+ ASSERT(0 == innerEntry.getObject().woCompare(docsToDelete[opIdx]));
+ }
+ }
+}
+
+// Verifies that a WriteUnitOfWork with groupOplogEntries=true consisting of an insert, a delete,
+// and an update replicates as a single applyOps.
+TEST_F(BatchedWriteOutputsTest, TestApplyOpsInsertDeleteUpdate) {
+ // Setup.
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+ reset(opCtx, NamespaceString::kRsOplogNamespace);
+
+ // Start a WUOW with groupOplogEntries=true. Verify that this initialises the
+ // BatchedWriteContext.
+ auto& bwc = BatchedWriteContext::get(opCtx);
+ ASSERT(!bwc.writesAreBatched());
+ WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */);
+ ASSERT(bwc.writesAreBatched());
+
+ AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX);
+
+ // (0) Insert
+ {
+ std::vector<InsertStatement> insert;
+ insert.emplace_back(BSON("_id" << 0 << "data"
+ << "x"));
+ opCtx->getServiceContext()->getOpObserver()->onInserts(
+ opCtx, _nss, _uuid, insert.begin(), insert.end(), false);
+ }
+ // (1) Delete
+ {
+ repl::documentKeyDecoration(opCtx).emplace(BSON("_id" << 1), boost::none);
+ const OplogDeleteEntryArgs args;
+ opCtx->getServiceContext()->getOpObserver()->onDelete(
+ opCtx, _nss, _uuid, kUninitializedStmtId, args);
+ }
+ // (2) Update
+ {
+ CollectionUpdateArgs collUpdateArgs;
+ collUpdateArgs.update = BSON("fieldToUpdate"
+ << "valueToUpdate");
+ collUpdateArgs.criteria = BSON("_id" << 2);
+ auto args = OplogUpdateEntryArgs(&collUpdateArgs, _nss, _uuid);
+ opCtx->getServiceContext()->getOpObserver()->onUpdate(opCtx, args);
+ }
+
+ // And commit the WUOW
+ wuow.commit();
+
+ // Retrieve the oplog entries. Implicitly asserts that there's one and only one oplog entry.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, 1);
+ auto lastOplogEntry = oplogs.back();
+ auto lastOplogEntryParsed = assertGet(OplogEntry::parse(oplogs.back()));
+
+ // The batch consists of an applyOps, whose array contains the three writes issued within the
+ // WUOW.
+ ASSERT(lastOplogEntryParsed.getCommandType() == OplogEntry::CommandType::kApplyOps);
+ std::vector<repl::OplogEntry> innerEntries;
+ repl::ApplyOps::extractOperationsTo(
+ lastOplogEntryParsed, lastOplogEntryParsed.getEntry().toBSON(), &innerEntries);
+ ASSERT_EQ(innerEntries.size(), 3);
+
+ {
+ const auto innerEntry = innerEntries[0];
+ ASSERT(innerEntry.getCommandType() == OplogEntry::CommandType::kNotCommand);
+ ASSERT(innerEntry.getOpType() == repl::OpTypeEnum::kInsert);
+ ASSERT(innerEntry.getNss() == _nss);
+ ASSERT(0 ==
+ innerEntry.getObject().woCompare(BSON("_id" << 0 << "data"
+ << "x")));
+ }
+ {
+ const auto innerEntry = innerEntries[1];
+ ASSERT(innerEntry.getCommandType() == OplogEntry::CommandType::kNotCommand);
+ ASSERT(innerEntry.getOpType() == repl::OpTypeEnum::kDelete);
+ ASSERT(innerEntry.getNss() == _nss);
+ ASSERT(0 == innerEntry.getObject().woCompare(BSON("_id" << 1)));
+ }
+ {
+ const auto innerEntry = innerEntries[2];
+ ASSERT(innerEntry.getCommandType() == OplogEntry::CommandType::kNotCommand);
+ ASSERT(innerEntry.getOpType() == repl::OpTypeEnum::kUpdate);
+ ASSERT(innerEntry.getNss() == _nss);
+ ASSERT(0 ==
+ innerEntry.getObject().woCompare(BSON("fieldToUpdate"
+ << "valueToUpdate")));
+ }
+}
+
+// Repeats the same test as above, but asserts that the tenantId is included when available.
+TEST_F(BatchedWriteOutputsTest, TestApplyOpsInsertDeleteUpdateIncludesTenantId) {
+ gMultitenancySupport = true;
+ RAIIServerParameterControllerForTest featureFlagController("featureFlagRequireTenantID", true);
+ // Setup.
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+ reset(opCtx, NamespaceString::kRsOplogNamespace);
+
+ // Start a WUOW with groupOplogEntries=true. Verify that this initialises the
+ // BatchedWriteContext.
+ auto& bwc = BatchedWriteContext::get(opCtx);
+ ASSERT(!bwc.writesAreBatched());
+ WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */);
+ ASSERT(bwc.writesAreBatched());
+
+ AutoGetCollection locks(opCtx, _nssWithTid, LockMode::MODE_IX);
+
+ // (0) Insert
+ {
+ std::vector<InsertStatement> insert;
+ insert.emplace_back(BSON("_id" << 0 << "data"
+ << "x"));
+ opCtx->getServiceContext()->getOpObserver()->onInserts(
+ opCtx, _nssWithTid, _uuid, insert.begin(), insert.end(), false);
+ }
+ // (1) Delete
+ {
+ repl::documentKeyDecoration(opCtx).emplace(BSON("_id" << 1), boost::none);
+ const OplogDeleteEntryArgs args;
+ opCtx->getServiceContext()->getOpObserver()->onDelete(
+ opCtx, _nssWithTid, _uuid, kUninitializedStmtId, args);
+ }
+ // (2) Update
+ {
+ CollectionUpdateArgs collUpdateArgs;
+ collUpdateArgs.update = BSON("fieldToUpdate"
+ << "valueToUpdate");
+ collUpdateArgs.criteria = BSON("_id" << 2);
+ auto args = OplogUpdateEntryArgs(&collUpdateArgs, _nssWithTid, _uuid);
+ opCtx->getServiceContext()->getOpObserver()->onUpdate(opCtx, args);
+ }
+
+ // And commit the WUOW
+ wuow.commit();
+
+ // Retrieve the oplog entries. Implicitly asserts that there's one and only one oplog entry.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, 1);
+ auto lastOplogEntry = oplogs.back();
+ auto lastOplogEntryParsed = assertGet(OplogEntry::parse(oplogs.back()));
+
+ // The batch consists of an applyOps, whose array contains the three writes issued within the
+ // WUOW.
+ ASSERT(lastOplogEntryParsed.getCommandType() == OplogEntry::CommandType::kApplyOps);
+ std::vector<repl::OplogEntry> innerEntries;
+ repl::ApplyOps::extractOperationsTo(
+ lastOplogEntryParsed, lastOplogEntryParsed.getEntry().toBSON(), &innerEntries);
+ ASSERT_EQ(innerEntries.size(), 3);
+
+ {
+ const auto innerEntry = innerEntries[0];
+ ASSERT(innerEntry.getCommandType() == OplogEntry::CommandType::kNotCommand);
+ ASSERT(innerEntry.getOpType() == repl::OpTypeEnum::kInsert);
+ // TODO SERVER-67155 Check that (innerEntry.getNss() == _nssWithTid) and uncomment the
+ // 2 lines below once the OplogEntry deserializer passes "tid" to the NamespaceString
+ // constructor
+ ASSERT(innerEntry.getNss() == NamespaceString(boost::none, _nssWithTid.ns()));
+ // ASSERT(innerEntry.getNss().tenantId().has_value());
+ // ASSERT(*innerEntry.getNss().tenantId() == *_nssWithTid.tenantId());
+
+ ASSERT(innerEntry.getTid().has_value());
+ ASSERT(*innerEntry.getTid() == *_nssWithTid.tenantId());
+ ASSERT(0 ==
+ innerEntry.getObject().woCompare(BSON("_id" << 0 << "data"
+ << "x")));
+ }
+
+ {
+ const auto innerEntry = innerEntries[1];
+ ASSERT(innerEntry.getCommandType() == OplogEntry::CommandType::kNotCommand);
+ ASSERT(innerEntry.getOpType() == repl::OpTypeEnum::kDelete);
+ // TODO SERVER-67155 Check that (innerEntry.getNss() == _nssWithTid) and uncomment the
+ // 2 lines below once the OplogEntry deserializer passes "tid" to the NamespaceString
+ // constructor
+ ASSERT(innerEntry.getNss() == NamespaceString(boost::none, _nssWithTid.ns()));
+ // ASSERT(innerEntry.getNss().tenantId().has_value());
+ // ASSERT(*innerEntry.getNss().tenantId() == *_nssWithTid.tenantId());
+
+ ASSERT(innerEntry.getTid().has_value());
+ ASSERT(*innerEntry.getTid() == *_nssWithTid.tenantId());
+ ASSERT(0 == innerEntry.getObject().woCompare(BSON("_id" << 1)));
+ }
+
+ {
+ const auto innerEntry = innerEntries[2];
+ ASSERT(innerEntry.getCommandType() == OplogEntry::CommandType::kNotCommand);
+ ASSERT(innerEntry.getOpType() == repl::OpTypeEnum::kUpdate);
+ // TODO SERVER-67155 Check that (innerEntry.getNss() == _nssWithTid) and uncomment the
+ // 2 lines below once the OplogEntry deserializer passes "tid" to the NamespaceString
+ // constructor
+ ASSERT(innerEntry.getNss() == NamespaceString(boost::none, _nssWithTid.ns()));
+ // ASSERT(innerEntry.getNss().tenantId().has_value());
+ // ASSERT(*innerEntry.getNss().tenantId() == *_nssWithTid.tenantId());
+
+ ASSERT(innerEntry.getTid().has_value());
+ ASSERT(*innerEntry.getTid() == *_nssWithTid.tenantId());
+ ASSERT(0 ==
+ innerEntry.getObject().woCompare(BSON("fieldToUpdate"
+ << "valueToUpdate")));
+ }
+}
+
+// Verifies an empty WUOW doesn't generate an oplog entry.
+TEST_F(BatchedWriteOutputsTest, testEmptyWUOW) {
+ // Setup.
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+ reset(opCtx, NamespaceString::kRsOplogNamespace);
+
+ // Start and commit an empty WUOW.
+ WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */);
+ wuow.commit();
+
+ // The getNOplogEntries call below asserts that the oplog is empty.
+ getNOplogEntries(opCtx, 0);
+}
+
+// Verifies that a large WUOW which stays within the 16MB oplog entry limit succeeds.
+TEST_F(BatchedWriteOutputsTest, testWUOWLarge) {
+ // Setup.
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+ reset(opCtx, NamespaceString::kRsOplogNamespace);
+
+ AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX);
+
+ WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */);
+
+ // Delete BatchedWriteOutputsTest::maxDocsInBatch documents in a single batch, which is the
+ // maximum number of docs that can be batched while keeping the resulting applyOps entry
+ // within the 16MB limit.
+ for (int docId = 0; docId < BatchedWriteOutputsTest::maxDocsInBatch; docId++) {
+ // This test does not call `OpObserver::aboutToDelete`. That method has the side effect
+ // of setting `documentKey` on the delete for sharding purposes, and
+ // `OpObserverImpl::onDelete` asserts its existence.
+ repl::documentKeyDecoration(opCtx).emplace(BSON("_id" << docId), boost::none);
+ const OplogDeleteEntryArgs args;
+ opCtx->getServiceContext()->getOpObserver()->onDelete(
+ opCtx, _nss, _uuid, kUninitializedStmtId, args);
+ }
+ wuow.commit();
+
+ // Retrieve the oplog entries, implicitly asserting that there's exactly one entry in the whole
+ // oplog.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, 1);
+ auto lastOplogEntry = oplogs.back();
+ auto lastOplogEntryParsed = assertGet(OplogEntry::parse(oplogs.back()));
+
+ // The batch consists of an applyOps, whose array contains all deletes issued within the
+ // WUOW.
+ ASSERT(lastOplogEntryParsed.getCommandType() == OplogEntry::CommandType::kApplyOps);
+ std::vector<repl::OplogEntry> innerEntries;
+ repl::ApplyOps::extractOperationsTo(
+ lastOplogEntryParsed, lastOplogEntryParsed.getEntry().toBSON(), &innerEntries);
+ ASSERT(innerEntries.size() == BatchedWriteOutputsTest::maxDocsInBatch);
+ for (int opIdx = 0; opIdx < BatchedWriteOutputsTest::maxDocsInBatch; opIdx++) {
+ BSONObj o = BSON("_id" << opIdx);
+ const auto innerEntry = innerEntries[opIdx];
+ ASSERT(innerEntry.getCommandType() == OplogEntry::CommandType::kNotCommand);
+ ASSERT(innerEntry.getOpType() == repl::OpTypeEnum::kDelete);
+ ASSERT(innerEntry.getNss() == _nss);
+ ASSERT(0 == innerEntry.getObject().woCompare(o));
+ }
+}
+
+// Verifies that a WUOW that would result in an oplog entry >16MB fails with TransactionTooLarge.
+TEST_F(BatchedWriteOutputsTest, testWUOWTooLarge) {
+ // Setup.
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+ reset(opCtx, NamespaceString::kRsOplogNamespace);
+
+ AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX);
+
+ WriteUnitOfWork wuow(opCtx, true /* groupOplogEntries */);
+
+ // Attempt to delete more than BatchedWriteOutputsTest::maxDocsInBatch documents in a single
+ // batch, which fails because the resulting applyOps entry would exceed 16MB.
+ for (int docId = 0; docId < BatchedWriteOutputsTest::maxDocsInBatch + 1; docId++) {
+ // This test does not call `OpObserver::aboutToDelete`. That method has the side effect
+ // of setting `documentKey` on the delete for sharding purposes, and
+ // `OpObserverImpl::onDelete` asserts its existence.
+ repl::documentKeyDecoration(opCtx).emplace(BSON("_id" << docId), boost::none);
+ const OplogDeleteEntryArgs args;
+ opCtx->getServiceContext()->getOpObserver()->onDelete(
+ opCtx, _nss, _uuid, kUninitializedStmtId, args);
+ }
+
+ ASSERT_THROWS_CODE(wuow.commit(), DBException, ErrorCodes::Error::TransactionTooLarge);
+
+ // The getNOplogEntries call below asserts that the oplog is empty.
+ getNOplogEntries(opCtx, 0);
+}
+
+class AtomicApplyOpsOutputsTest : public OpObserverTest {
+protected:
+ const NamespaceString _nss{boost::none, "test", "coll"};
+ const UUID _uuid = UUID::gen();
+};
+
+TEST_F(AtomicApplyOpsOutputsTest, InsertInNestedApplyOpsReturnsSuccess) {
+ auto opCtxRaii = cc().makeOperationContext();
+ auto opCtx = opCtxRaii.get();
+
+ reset(opCtx, _nss, _uuid);
+ resetOplogAndTransactions(opCtx);
+
+ auto opObserverRegistry = std::make_unique<OpObserverRegistry>();
+ opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>());
+ opCtx->getServiceContext()->setOpObserver(std::move(opObserverRegistry));
+
+ auto mode = repl::OplogApplication::Mode::kApplyOpsCmd;
+ // Make sure the apply ops command object contains the correct UUID information.
+ CollectionOptions options;
+ options.uuid = _uuid;
+ BSONObjBuilder resultBuilder;
+
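+ // Build an applyOps command whose single operation is itself an applyOps wrapping the
+ // insert; applying it should produce a single top-level applyOps oplog entry containing
+ // the insert.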
+ auto innerCmdObj = BSON("op"
+ << "i"
+ << "ns" << _nss.ns() << "o"
+ << BSON("_id"
+ << "a")
+ << "ui" << options.uuid.get());
+ auto innerApplyOpsObj = BSON("op"
+ << "c"
+ << "ns" << _nss.getCommandNS().ns() << "o"
+ << BSON("applyOps" << BSON_ARRAY(innerCmdObj)));
+ auto cmdObj = BSON("applyOps" << BSON_ARRAY(innerApplyOpsObj));
+
+ ASSERT_OK(repl::applyOps(opCtx, _nss.db().toString(), cmdObj, mode, &resultBuilder));
+
+ // Retrieve the oplog entries, implicitly asserting that there's exactly one entry in the whole
+ // oplog.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, 1);
+ auto lastOplogEntry = oplogs.back();
+ auto lastOplogEntryParsed = assertGet(OplogEntry::parse(oplogs.back()));
+
+ // The oplog entry is an applyOps containing the insert.
+ ASSERT(lastOplogEntryParsed.getCommandType() == OplogEntry::CommandType::kApplyOps);
+ std::vector<repl::OplogEntry> innerEntries;
+ repl::ApplyOps::extractOperationsTo(
+ lastOplogEntryParsed, lastOplogEntryParsed.getEntry().toBSON(), &innerEntries);
+ ASSERT(innerEntries.size() == 1);
+ const auto innerEntry = innerEntries[0];
+ ASSERT(innerEntry.getCommandType() == OplogEntry::CommandType::kNotCommand);
+ ASSERT(innerEntry.getOpType() == repl::OpTypeEnum::kInsert);
+ ASSERT(innerEntry.getNss() == _nss);
+ ASSERT(0 ==
+ innerEntry.getObject().woCompare(BSON("_id"
+ << "a")));
+}
+
+TEST_F(AtomicApplyOpsOutputsTest, AtomicApplyOpsWithNoOpsReturnsSuccess) {
+ auto opCtxRaii = cc().makeOperationContext();
+ auto opCtx = opCtxRaii.get();
+ reset(opCtx, _nss, _uuid);
+ resetOplogAndTransactions(opCtx);
+
+ auto opObserverRegistry = std::make_unique<OpObserverRegistry>();
+ opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>());
+ opCtx->getServiceContext()->setOpObserver(std::move(opObserverRegistry));
+
+ auto mode = repl::OplogApplication::Mode::kApplyOpsCmd;
+ BSONObjBuilder resultBuilder;
+ auto cmdObj = BSON("applyOps" << BSONArray());
+ ASSERT_OK(repl::applyOps(opCtx, _nss.db().toString(), cmdObj, mode, &resultBuilder));
+
+ // Retrieve the oplog entries, implicitly asserting that the oplog is empty.
+ getNOplogEntries(opCtx, 0);
+}
+
+TEST_F(AtomicApplyOpsOutputsTest, AtomicApplyOpsInsertWithUuidIntoCollectionWithUuid) {
+ auto opCtxRaii = cc().makeOperationContext();
+ auto opCtx = opCtxRaii.get();
+ reset(opCtx, _nss, _uuid);
+ resetOplogAndTransactions(opCtx);
+
+ auto opObserverRegistry = std::make_unique<OpObserverRegistry>();
+ opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>());
+ opCtx->getServiceContext()->setOpObserver(std::move(opObserverRegistry));
+
+ auto mode = repl::OplogApplication::Mode::kApplyOpsCmd;
+
+ auto const insertOp = BSON("op"
+ << "i"
+ << "ns" << _nss.ns() << "o" << BSON("_id" << 0) << "ui" << _uuid);
+ auto const cmdObj = BSON("applyOps" << BSON_ARRAY(insertOp));
+
+ BSONObjBuilder resultBuilder;
+ ASSERT_OK(repl::applyOps(opCtx, _nss.db().toString(), cmdObj, mode, &resultBuilder));
+
+ // Retrieve the oplog entries, implicitly asserting that there's exactly one entry in the whole
+ // oplog.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, 1);
+ auto lastOplogEntry = oplogs.back();
+ auto lastOplogEntryParsed = assertGet(OplogEntry::parse(oplogs.back()));
+
+ // The oplog entry is an applyOps containing the insert.
+ ASSERT(lastOplogEntryParsed.getCommandType() == OplogEntry::CommandType::kApplyOps);
+ std::vector<repl::OplogEntry> innerEntries;
+ repl::ApplyOps::extractOperationsTo(
+ lastOplogEntryParsed, lastOplogEntryParsed.getEntry().toBSON(), &innerEntries);
+ ASSERT(innerEntries.size() == 1);
+ const auto innerEntry = innerEntries[0];
+ ASSERT(innerEntry.getCommandType() == OplogEntry::CommandType::kNotCommand);
+ ASSERT(innerEntry.getOpType() == repl::OpTypeEnum::kInsert);
+ ASSERT(innerEntry.getNss() == _nss);
+ ASSERT(0 == innerEntry.getObject().woCompare(BSON("_id" << 0)));
+}
+
+TEST_F(AtomicApplyOpsOutputsTest, AtomicApplyOpsInsertWithoutUuidIntoCollectionWithUuid) {
+ auto opCtxRaii = cc().makeOperationContext();
+ auto opCtx = opCtxRaii.get();
+ reset(opCtx, _nss, _uuid);
+ resetOplogAndTransactions(opCtx);
+
+ auto opObserverRegistry = std::make_unique<OpObserverRegistry>();
+ opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>());
+ opCtx->getServiceContext()->setOpObserver(std::move(opObserverRegistry));
+
+ auto mode = repl::OplogApplication::Mode::kApplyOpsCmd;
+
+ auto const insertOp = BSON("op"
+ << "i"
+ << "ns" << _nss.ns() << "o" << BSON("_id" << 0) /* no UUID */);
+ auto const cmdObj = BSON("applyOps" << BSON_ARRAY(insertOp));
+
+ BSONObjBuilder resultBuilder;
+ ASSERT_OK(repl::applyOps(opCtx, _nss.db().toString(), cmdObj, mode, &resultBuilder));
+
+ // Retrieve the oplog entries, implicitly asserting that there's exactly one entry in the whole
+ // oplog.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, 1);
+ auto lastOplogEntry = oplogs.back();
+ auto lastOplogEntryParsed = assertGet(OplogEntry::parse(oplogs.back()));
+
+ // The oplog entry is an applyOps containing the insert.
+ ASSERT(lastOplogEntryParsed.getCommandType() == OplogEntry::CommandType::kApplyOps);
+ std::vector<repl::OplogEntry> innerEntries;
+ repl::ApplyOps::extractOperationsTo(
+ lastOplogEntryParsed, lastOplogEntryParsed.getEntry().toBSON(), &innerEntries);
+ ASSERT(innerEntries.size() == 1);
+ const auto innerEntry = innerEntries[0];
+ ASSERT(innerEntry.getCommandType() == OplogEntry::CommandType::kNotCommand);
+ ASSERT(innerEntry.getOpType() == repl::OpTypeEnum::kInsert);
+ ASSERT(innerEntry.getNss() == _nss);
+ ASSERT(0 == innerEntry.getObject().woCompare(BSON("_id" << 0)));
+}
+
+class OnDeleteOutputsTest : public OpObserverTest {
+
+protected:
+ void logTestCase(const DeleteTestCase& testCase) {
+ LOGV2(5739905,
+ "DeleteTestCase",
+ "PreImageRecording"_attr = testCase.alwaysRecordPreImages,
+ "ChangeStreamPreAndPostImagesEnabled"_attr = testCase.changeStreamImagesEnabled,
+ "RetryableFindAndModifyLocation"_attr =
+ testCase.getRetryableFindAndModifyLocationStr(),
+ "ExpectedOplogEntries"_attr = testCase.numOutputOplogs);
+ }
+
+ void initializeOplogDeleteEntryArgs(OperationContext* opCtx,
+ const DeleteTestCase& testCase,
+ OplogDeleteEntryArgs* deleteArgs) {
+ deleteArgs->preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages;
+ deleteArgs->changeStreamPreAndPostImagesEnabledForCollection =
+ testCase.changeStreamImagesEnabled;
+
+ switch (testCase.retryableOptions) {
+ case kNotRetryable:
+ deleteArgs->retryableFindAndModifyLocation = kNotRetryable;
+ break;
+ case kRecordInOplog:
+ deleteArgs->retryableFindAndModifyLocation = kRecordInOplog;
+ break;
+ case kRecordInSideCollection:
+ deleteArgs->retryableFindAndModifyLocation = kRecordInSideCollection;
+ break;
+ }
+ if (testCase.isRetryable() || testCase.alwaysRecordPreImages ||
+ testCase.changeStreamImagesEnabled) {
+ deleteArgs->deletedDoc = &_deletedDoc;
+ }
+ }
+
+ void checkPreImageInOplogIfNeeded(
+ const DeleteTestCase& testCase,
+ const OplogDeleteEntryArgs& deleteArgs,
+ const std::vector<BSONObj>& oplogs,
+ const OplogEntry& deleteOplogEntry,
+ const boost::optional<OplogEntry> applyOpsOplogEntry = boost::none) {
+ const bool checkPreImageInOplog = deleteArgs.preImageRecordingEnabledForCollection ||
+ deleteArgs.retryableFindAndModifyLocation == kRecordInOplog;
+ if (checkPreImageInOplog) {
+ ASSERT(deleteOplogEntry.getPreImageOpTime());
+ if (applyOpsOplogEntry) {
+ ASSERT_FALSE(applyOpsOplogEntry->getPreImageOpTime());
+ }
+
+ const Timestamp preImageOpTime = deleteOplogEntry.getPreImageOpTime()->getTimestamp();
+ ASSERT_FALSE(preImageOpTime.isNull());
+ OplogEntry preImage = *findByTimestamp(oplogs, preImageOpTime);
+ ASSERT_BSONOBJ_EQ(_deletedDoc, preImage.getObject());
+ if (deleteOplogEntry.getSessionId()) {
+ ASSERT_EQ(*deleteOplogEntry.getSessionId(), *preImage.getSessionId());
+ }
+ if (deleteOplogEntry.getTxnNumber()) {
+ ASSERT_EQ(*deleteOplogEntry.getTxnNumber(), *preImage.getTxnNumber());
+ }
+ if (!deleteOplogEntry.getStatementIds().empty()) {
+ const auto& deleteOplogStmtIds = deleteOplogEntry.getStatementIds();
+ const auto& preImageOplogStmtIds = preImage.getStatementIds();
+ ASSERT_EQ(deleteOplogStmtIds.size(), preImageOplogStmtIds.size());
+ for (size_t i = 0; i < deleteOplogStmtIds.size(); i++) {
+ ASSERT_EQ(deleteOplogStmtIds[i], preImageOplogStmtIds[i]);
+ }
+ }
+ } else {
+ ASSERT_FALSE(deleteOplogEntry.getPreImageOpTime());
+ }
+ }
+
+ void checkSideCollectionIfNeeded(
+ OperationContext* opCtx,
+ const DeleteTestCase& testCase,
+ const OplogDeleteEntryArgs& deleteArgs,
+ const std::vector<BSONObj>& oplogs,
+ const OplogEntry& deleteOplogEntry,
+ const boost::optional<OplogEntry> applyOpsOplogEntry = boost::none) {
+ bool didWriteInSideCollection =
+ deleteArgs.retryableFindAndModifyLocation == kRecordInSideCollection &&
+ !deleteArgs.preImageRecordingEnabledForCollection;
+ if (didWriteInSideCollection) {
+ repl::ImageEntry imageEntry =
+ getImageEntryFromSideCollection(opCtx, *deleteOplogEntry.getSessionId());
+ ASSERT(imageEntry.getImageKind() == deleteOplogEntry.getNeedsRetryImage());
+ if (applyOpsOplogEntry) {
+ ASSERT_FALSE(applyOpsOplogEntry->getNeedsRetryImage());
+ }
+ ASSERT(imageEntry.getImageKind() == repl::RetryImageEnum::kPreImage);
+ ASSERT_BSONOBJ_EQ(_deletedDoc, imageEntry.getImage());
+
+ // If 'deleteOplogEntry' has opTime T, then opTime T-1 is reserved for a potential forged
+ // noop oplog entry carrying the preImage written to the side collection; no entry should
+ // actually have been written at that timestamp.
+ const Timestamp forgeNoopTimestamp = deleteOplogEntry.getTimestamp() - 1;
+ ASSERT_FALSE(findByTimestamp(oplogs, forgeNoopTimestamp));
+ } else {
+ ASSERT_FALSE(deleteOplogEntry.getNeedsRetryImage());
+ if (deleteOplogEntry.getSessionId()) {
+ ASSERT_FALSE(
+ didWriteImageEntryToSideCollection(opCtx, *deleteOplogEntry.getSessionId()));
+ } else {
+ // The session id is only missing in the non-retryable case.
+ ASSERT(testCase.retryableOptions == kNotRetryable);
+ }
+ }
+ }
+
+ void checkChangeStreamImagesIfNeeded(OperationContext* opCtx,
+ const DeleteTestCase& testCase,
+ const OplogDeleteEntryArgs& deleteArgs,
+ const OplogEntry& deleteOplogEntry) {
+ const Timestamp preImageOpTime = deleteOplogEntry.getOpTime().getTimestamp();
+ ChangeStreamPreImageId preImageId(_uuid, preImageOpTime, 0);
+ if (deleteArgs.changeStreamPreAndPostImagesEnabledForCollection) {
+ BSONObj container;
+ ChangeStreamPreImage preImage = getChangeStreamPreImage(opCtx, preImageId, &container);
+ ASSERT_BSONOBJ_EQ(_deletedDoc, preImage.getPreImage());
+ ASSERT_EQ(deleteOplogEntry.getWallClockTime(), preImage.getOperationTime());
+ } else {
+ ASSERT_FALSE(didWriteDeletedDocToPreImagesCollection(opCtx, preImageId));
+ }
+ }
+
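+ // Each case expects one oplog entry for the delete itself, plus an extra noop entry whenever
+ // the pre-image is recorded in the oplog (always-record pre-images, or retryable
+ // findAndModify with the image recorded in the oplog).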
+ std::vector<DeleteTestCase> _cases{
+ {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1},
+ {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
+ {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1},
+ {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1},
+ {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2},
+ {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1},
+ {kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2},
+ {kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
+ {kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}};
+
+ const NamespaceString _nss{boost::none, "test", "coll"};
+ const UUID _uuid = UUID::gen();
+ const BSONObj _deletedDoc = BSON("_id" << 0 << "valuePriorToDelete"
+ << "marvelous");
+};
+
+TEST_F(OnDeleteOutputsTest, TestNonTransactionFundamentalOnDeleteOutputs) {
+ // Create a registry that only registers the Impl. Calling methods on the Impl directly can
+ // be challenging because it hits code paths that expect a `ReservedTimes` to be
+ // instantiated. Due to strong encapsulation, we use the registry, which manages the
+ // `ReservedTimes` on our behalf.
+ OpObserverRegistry opObserver;
+ opObserver.addObserver(std::make_unique<OpObserverImpl>());
+
+ for (std::size_t testIdx = 0; testIdx < _cases.size(); ++testIdx) {
+ const auto& testCase = _cases[testIdx];
+ logTestCase(testCase);
+
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+
+ // Phase 1: Clear any prior state and set up the fixtures and arguments for the delete call.
+ resetOplogAndTransactions(opCtx);
+
+ std::unique_ptr<MongoDOperationContextSession> contextSession;
+ if (testCase.isRetryable()) {
+ beginRetryableWriteWithTxnNumber(opCtx, testIdx, contextSession);
+ }
+
+ // Phase 2: Call the code we're testing.
+ OplogDeleteEntryArgs deleteEntryArgs;
+ initializeOplogDeleteEntryArgs(opCtx, testCase, &deleteEntryArgs);
+
+ WriteUnitOfWork wuow(opCtx);
+ AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX);
+ // This test does not call `OpObserver::aboutToDelete`. That method has the side effect
+ // of setting `documentKey` on the delete for sharding purposes, and
+ // `OpObserverImpl::onDelete` asserts its existence.
+ repl::documentKeyDecoration(opCtx).emplace(_deletedDoc["_id"].wrap(), boost::none);
+ opObserver.onDelete(
+ opCtx, _nss, _uuid, testCase.isRetryable() ? 1 : kUninitializedStmtId, deleteEntryArgs);
+ wuow.commit();
+
+ // Phase 3: Analyze the results:
+ // This `getNOplogEntries` also asserts that all oplogs are retrieved.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs);
+ // Entries are returned in ascending timestamp order.
+ auto deleteOplogEntry = assertGet(OplogEntry::parse(oplogs.back()));
+ checkPreImageInOplogIfNeeded(testCase, deleteEntryArgs, oplogs, deleteOplogEntry);
+ checkSideCollectionIfNeeded(opCtx, testCase, deleteEntryArgs, oplogs, deleteOplogEntry);
+ checkChangeStreamImagesIfNeeded(opCtx, testCase, deleteEntryArgs, deleteOplogEntry);
+ }
+}
+
+TEST_F(OnDeleteOutputsTest, TestTransactionFundamentalOnDeleteOutputs) {
+ // Create a registry that only registers the Impl. Calling methods on the Impl directly can
+ // be challenging because it hits code paths that expect a `ReservedTimes` to be
+ // instantiated. Due to strong encapsulation, we use the registry, which manages the
+ // `ReservedTimes` on our behalf.
+ OpObserverRegistry opObserver;
+ opObserver.addObserver(std::make_unique<OpObserverImpl>());
+
+ for (std::size_t testIdx = 0; testIdx < _cases.size(); ++testIdx) {
+ const auto& testCase = _cases[testIdx];
+ logTestCase(testCase);
+
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+
+ // Phase 1: Clear any prior state and set up the fixtures and arguments for the delete call.
+ resetOplogAndTransactions(opCtx);
+
+ std::unique_ptr<MongoDOperationContextSession> contextSession;
+ if (testCase.isRetryable()) {
+ beginRetryableInternalTransactionWithTxnNumber(opCtx, testIdx, contextSession);
+ } else {
+ beginNonRetryableTransactionWithTxnNumber(opCtx, testIdx, contextSession);
+ }
+
+ // Phase 2: Call the code we're testing.
+ OplogDeleteEntryArgs deleteEntryArgs;
+ initializeOplogDeleteEntryArgs(opCtx, testCase, &deleteEntryArgs);
+ const auto stmtId = testCase.isRetryable() ? 1 : kUninitializedStmtId;
+
+ WriteUnitOfWork wuow(opCtx);
+ AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX);
+ // This test does not call `OpObserver::aboutToDelete`. That method has the side effect
+ // of setting `documentKey` on the delete for sharding purposes, and
+ // `OpObserverImpl::onDelete` asserts its existence.
+ repl::documentKeyDecoration(opCtx).emplace(_deletedDoc["_id"].wrap(), boost::none);
+ opObserver.onDelete(opCtx, _nss, _uuid, stmtId, deleteEntryArgs);
+ commitUnpreparedTransaction<OpObserverRegistry>(opCtx, opObserver);
+ wuow.commit();
+
+ // Phase 3: Analyze the results:
+ // This `getNOplogEntries` also asserts that all oplogs are retrieved.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs);
+ // Entries are returned in ascending timestamp order.
+ auto applyOpsOplogEntry = assertGet(OplogEntry::parse(oplogs.back()));
+ auto deleteOplogEntry = getInnerEntryFromApplyOpsOplogEntry(applyOpsOplogEntry);
+ checkPreImageInOplogIfNeeded(
+ testCase, deleteEntryArgs, oplogs, deleteOplogEntry, applyOpsOplogEntry);
+ checkSideCollectionIfNeeded(
+ opCtx, testCase, deleteEntryArgs, oplogs, deleteOplogEntry, applyOpsOplogEntry);
+ checkChangeStreamImagesIfNeeded(opCtx, testCase, deleteEntryArgs, deleteOplogEntry);
+ }
+}
+
+TEST_F(OnDeleteOutputsTest,
+ RetryableInternalTransactionDeleteWithPreImageRecordingEnabledOnShardServerThrows) {
+ // Create a registry that only registers the Impl. Calling methods on the Impl directly can
+ // be challenging because it hits code paths that expect a `ReservedTimes` to be
+ // instantiated. Due to strong encapsulation, we use the registry, which manages the
+ // `ReservedTimes` on our behalf.
+ OpObserverRegistry opObserver;
+ opObserver.addObserver(std::make_unique<OpObserverImpl>());
+
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+
+ resetOplogAndTransactions(opCtx);
+
+ std::unique_ptr<MongoDOperationContextSession> contextSession;
+ beginRetryableInternalTransactionWithTxnNumber(opCtx, 0, contextSession);
+
+ OplogDeleteEntryArgs deleteEntryArgs;
+ deleteEntryArgs.preImageRecordingEnabledForCollection = true;
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+ ON_BLOCK_EXIT([] { serverGlobalParams.clusterRole = ClusterRole::None; });
+
+ WriteUnitOfWork wuow(opCtx);
+ AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX);
+ // This test does not call `OpObserver::aboutToDelete`. That method has the side effect
+ // of setting `documentKey` on the delete for sharding purposes, and
+ // `OpObserverImpl::onDelete` asserts its existence.
+ repl::documentKeyDecoration(opCtx).emplace(_deletedDoc["_id"].wrap(), boost::none);
+ ASSERT_THROWS_CODE(opObserver.onDelete(opCtx, _nss, _uuid, 1 /* stmtId */, deleteEntryArgs),
+ DBException,
+ 6462401);
+}
+
+TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertTest) {
+ const NamespaceString nss1(boost::none, "testDB", "testColl");
+ const NamespaceString nss2(boost::none, "testDB2", "testColl2");
+ auto uuid1 = UUID::gen();
+ auto uuid2 = UUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+ std::vector<InsertStatement> inserts1;
+ inserts1.emplace_back(0, BSON("_id" << 0));
+ inserts1.emplace_back(1, BSON("_id" << 1));
+ std::vector<InsertStatement> inserts2;
+ inserts2.emplace_back(0, BSON("_id" << 2));
+ inserts2.emplace_back(1, BSON("_id" << 3));
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+ opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
+ opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false);
+ auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
+ opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0);
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 4);
+ std::vector<OplogEntry> oplogEntries;
+ mongo::repl::OpTime expectedPrevWriteOpTime;
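+ // Each entry in the transaction's oplog chain must link back to the previous entry via
+ // 'prevOpTime', starting from a null optime for the first entry.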
+ for (const auto& oplogEntryObj : oplogEntryObjs) {
+ checkSessionAndTransactionFields(oplogEntryObj);
+ oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
+ const auto& oplogEntry = oplogEntries.back();
+ ASSERT(!oplogEntry.shouldPrepare());
+ ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp());
+ expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()};
+ }
+ auto oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("_id" << 0) << "o2" << BSON("_id" << 0)))
+ << "partialTxn" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
+
+ oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("_id" << 1) << "o2" << BSON("_id" << 1)))
+ << "partialTxn" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject());
+
+ oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << nss2.toString() << "ui" << uuid2 << "o"
+ << BSON("_id" << 2) << "o2" << BSON("_id" << 2)))
+ << "partialTxn" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject());
+
+ // This should be the implicit commit oplog entry, indicated by the absence of the 'partialTxn'
+ // field.
+ oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << nss2.toString() << "ui" << uuid2 << "o"
+ << BSON("_id" << 3) << "o2" << BSON("_id" << 3)))
+ << "count" << 4);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[3].getObject());
+}
+
+TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) {
+ const NamespaceString nss1(boost::none, "testDB", "testColl");
+ const NamespaceString nss2(boost::none, "testDB2", "testColl2");
+ auto uuid1 = UUID::gen();
+ auto uuid2 = UUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "update");
+
+ CollectionUpdateArgs updateArgs1;
+ updateArgs1.stmtIds = {0};
+ updateArgs1.updatedDoc = BSON("_id" << 0 << "data"
+ << "x");
+ updateArgs1.update = BSON("$set" << BSON("data"
+ << "x"));
+ updateArgs1.criteria = BSON("_id" << 0);
+ OplogUpdateEntryArgs update1(&updateArgs1, nss1, uuid1);
+
+ CollectionUpdateArgs updateArgs2;
+ updateArgs2.stmtIds = {1};
+ updateArgs2.updatedDoc = BSON("_id" << 1 << "data"
+ << "y");
+ updateArgs2.update = BSON("$set" << BSON("data"
+ << "y"));
+ updateArgs2.criteria = BSON("_id" << 1);
+ OplogUpdateEntryArgs update2(&updateArgs2, nss2, uuid2);
+
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+ opObserver().onUpdate(opCtx(), update1);
+ opObserver().onUpdate(opCtx(), update2);
+ auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
+ opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0);
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
+ std::vector<OplogEntry> oplogEntries;
+ mongo::repl::OpTime expectedPrevWriteOpTime;
+ for (const auto& oplogEntryObj : oplogEntryObjs) {
+ checkSessionAndTransactionFields(oplogEntryObj);
+ oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
+ const auto& oplogEntry = oplogEntries.back();
+ ASSERT(!oplogEntry.shouldPrepare());
+ ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp());
+ expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()};
+ }
+
+ auto oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "u"
+ << "ns" << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("$set" << BSON("data"
+ << "x"))
+ << "o2" << BSON("_id" << 0)))
+ << "partialTxn" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
+
+ // This should be the implicit commit oplog entry, indicated by the absence of the 'partialTxn'
+ // field.
+ oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "u"
+ << "ns" << nss2.toString() << "ui" << uuid2 << "o"
+ << BSON("$set" << BSON("data"
+ << "y"))
+ << "o2" << BSON("_id" << 1)))
+ << "count" << 2);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject());
+}
+
+TEST_F(OpObserverMultiEntryTransactionTest, TransactionPreImageTest) {
+ const NamespaceString nss1(boost::none, "testDB", "testColl");
+ auto uuid1 = UUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "txntest");
+
+ CollectionUpdateArgs updateArgs1;
+ const auto updateSpec = BSON("$set" << BSON("data"
+ << "x"));
+ const auto updatePreImage = BSON("_id" << 0 << "data"
+ << "y");
+ const auto updatePostImage = BSON("_id" << 0 << "data"
+ << "x");
+ const auto updateFilter = BSON("_id" << 0);
+
+ updateArgs1.stmtIds = {0};
+ updateArgs1.updatedDoc = updatePostImage;
+ updateArgs1.update = updateSpec;
+ updateArgs1.preImageDoc = updatePreImage;
+ updateArgs1.preImageRecordingEnabledForCollection = true;
+ updateArgs1.criteria = updateFilter;
+ OplogUpdateEntryArgs update1(&updateArgs1, nss1, uuid1);
+
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ opObserver().onUpdate(opCtx(), update1);
+
+ const auto deletedDoc = BSON("_id" << 1 << "data"
+ << "z");
+ OplogDeleteEntryArgs args;
+ args.deletedDoc = &deletedDoc;
+ args.preImageRecordingEnabledForCollection = true;
+ opObserver().aboutToDelete(opCtx(), nss1, uuid1, deletedDoc);
+ opObserver().onDelete(opCtx(), nss1, uuid1, 0, args);
+
+ auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
+ opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 2);
+
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 4);
+ std::vector<OplogEntry> oplogEntries;
+ mongo::repl::OpTime expectedPrevWriteOpTime;
+ for (const auto& oplogEntryObj : oplogEntryObjs) {
+ oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
+ const auto& oplogEntry = oplogEntries.back();
+ if (oplogEntry.getOpType() == repl::OpTypeEnum::kNoop) {
+ continue;
+ }
+ checkSessionAndTransactionFields(oplogEntryObj);
+ ASSERT(!oplogEntry.shouldPrepare());
+ ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp());
+ expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()};
+ }
+
+ ASSERT(oplogEntries[0].getOpType() == repl::OpTypeEnum::kNoop);
+ ASSERT_BSONOBJ_EQ(updatePreImage, oplogEntries[0].getObject());
+ ASSERT(oplogEntries[1].getOpType() == repl::OpTypeEnum::kNoop);
+ ASSERT_BSONOBJ_EQ(deletedDoc, oplogEntries[1].getObject());
+ ASSERT_BSONOBJ_EQ(BSON("applyOps"
+ << BSON_ARRAY(BSON("op"
+ << "u"
+ << "ns" << nss1.toString() << "ui" << uuid1 << "o"
+ << updateSpec << "o2" << BSON("_id" << 0)
+ << "preImageOpTime" << oplogEntries[0].getOpTime()))
+ << "partialTxn" << true),
+ oplogEntries[2].getObject());
+ ASSERT_BSONOBJ_EQ(BSON("applyOps"
+ << BSON_ARRAY(BSON("op"
+ << "d"
+ << "ns" << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("_id" << 1) << "preImageOpTime"
+ << oplogEntries[1].getOpTime()))
+ << "count" << 2),
+ oplogEntries[3].getObject());
+}
+
+TEST_F(OpObserverMultiEntryTransactionTest, PreparedTransactionPreImageTest) {
+ const NamespaceString nss1(boost::none, "testDB", "testColl");
+ auto uuid1 = UUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "txntest");
+
+ CollectionUpdateArgs updateArgs1;
+ const auto updateSpec = BSON("$set" << BSON("data"
+ << "x"));
+ const auto updatePreImage = BSON("_id" << 0 << "data"
+ << "y");
+ const auto updatePostImage = BSON("_id" << 0 << "data"
+ << "x");
+ const auto updateFilter = BSON("_id" << 0);
+
+ updateArgs1.stmtIds = {0};
+ updateArgs1.updatedDoc = updatePostImage;
+ updateArgs1.update = updateSpec;
+ updateArgs1.preImageDoc = updatePreImage;
+ updateArgs1.preImageRecordingEnabledForCollection = true;
+ updateArgs1.criteria = updateFilter;
+ OplogUpdateEntryArgs update1(&updateArgs1, nss1, uuid1);
+
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ opObserver().onUpdate(opCtx(), update1);
+
+ const auto deletedDoc = BSON("_id" << 1 << "data"
+ << "z");
+ OplogDeleteEntryArgs args;
+ args.deletedDoc = &deletedDoc;
+ args.preImageRecordingEnabledForCollection = true;
+ opObserver().aboutToDelete(opCtx(), nss1, uuid1, deletedDoc);
+ opObserver().onDelete(opCtx(), nss1, uuid1, 0, args);
+
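+ // Reserve four oplog slots: two for the pre-image noop entries and two for the applyOps
+ // entries; the last reserved slot becomes the prepare optime.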
+ auto reservedSlots = reserveOpTimesInSideTransaction(opCtx(), 4);
+ auto prepareOpTime = reservedSlots.back();
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+ prepareTransaction(reservedSlots, prepareOpTime, 2);
+
+ txnParticipant.stashTransactionResources(opCtx());
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 4);
+ std::vector<OplogEntry> oplogEntries;
+ mongo::repl::OpTime expectedPrevWriteOpTime;
+ for (const auto& oplogEntryObj : oplogEntryObjs) {
+ oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
+ const auto& oplogEntry = oplogEntries.back();
+ if (oplogEntry.getOpType() == repl::OpTypeEnum::kNoop) {
+ continue;
+ }
+ checkSessionAndTransactionFields(oplogEntryObj);
+ ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp());
+ expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()};
+ }
+
+ ASSERT(oplogEntries[0].getOpType() == repl::OpTypeEnum::kNoop);
+ ASSERT_BSONOBJ_EQ(updatePreImage, oplogEntries[0].getObject());
+ ASSERT(oplogEntries[1].getOpType() == repl::OpTypeEnum::kNoop);
+ ASSERT_BSONOBJ_EQ(deletedDoc, oplogEntries[1].getObject());
+ ASSERT_BSONOBJ_EQ(BSON("applyOps"
+ << BSON_ARRAY(BSON("op"
+ << "u"
+ << "ns" << nss1.toString() << "ui" << uuid1 << "o"
+ << updateSpec << "o2" << BSON("_id" << 0)
+ << "preImageOpTime" << oplogEntries[0].getOpTime()))
+ << "partialTxn" << true),
+ oplogEntries[2].getObject());
+ ASSERT_BSONOBJ_EQ(BSON("applyOps"
+ << BSON_ARRAY(BSON("op"
+ << "d"
+ << "ns" << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("_id" << 1) << "preImageOpTime"
+ << oplogEntries[1].getOpTime()))
+ << "prepare" << true << "count" << 2),
+ oplogEntries[3].getObject());
+
+ txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
+}
+
+TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeleteTest) {
+ const NamespaceString nss1(boost::none, "testDB", "testColl");
+ const NamespaceString nss2(boost::none, "testDB2", "testColl2");
+ auto uuid1 = UUID::gen();
+ auto uuid2 = UUID::gen();
+
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "delete");
+
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+ opObserver().aboutToDelete(opCtx(),
+ nss1,
+ uuid1,
+ BSON("_id" << 0 << "data"
+ << "x"));
+ opObserver().onDelete(opCtx(), nss1, uuid1, 0, {});
+ opObserver().aboutToDelete(opCtx(),
+ nss2,
+ uuid2,
+ BSON("_id" << 1 << "data"
+ << "y"));
+ opObserver().onDelete(opCtx(), nss2, uuid2, 0, {});
+ auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
+ opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0);
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
+ std::vector<OplogEntry> oplogEntries;
+ mongo::repl::OpTime expectedPrevWriteOpTime;
+ for (const auto& oplogEntryObj : oplogEntryObjs) {
+ checkSessionAndTransactionFields(oplogEntryObj);
+ oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
+ const auto& oplogEntry = oplogEntries.back();
+ ASSERT(!oplogEntry.shouldPrepare());
+ ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp());
+ expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()};
+ }
+
+ auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "d"
+ << "ns" << nss1.toString() << "ui" << uuid1
+ << "o" << BSON("_id" << 0)))
+ << "partialTxn" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
+
+ // This should be the implicit commit oplog entry, indicated by the absence of the 'partialTxn'
+ // field.
+ oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "d"
+ << "ns" << nss2.toString() << "ui" << uuid2
+ << "o" << BSON("_id" << 1)))
+ << "count" << 2);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject());
+}
+
+TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) {
+ const NamespaceString nss1(boost::none, "testDB", "testColl");
+ const NamespaceString nss2(boost::none, "testDB2", "testColl2");
+ auto uuid1 = UUID::gen();
+ auto uuid2 = UUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+
+ std::vector<InsertStatement> inserts1;
+ inserts1.emplace_back(0, BSON("_id" << 0));
+ inserts1.emplace_back(1, BSON("_id" << 1));
+ std::vector<InsertStatement> inserts2;
+ inserts2.emplace_back(0, BSON("_id" << 2));
+ inserts2.emplace_back(1, BSON("_id" << 3));
+
+ opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
+ opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false);
+
+ auto reservedSlots = reserveOpTimesInSideTransaction(opCtx(), 4);
+ auto prepareOpTime = reservedSlots.back();
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+ prepareTransaction(reservedSlots, prepareOpTime);
+
+ ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
+ ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
+
+ txnParticipant.stashTransactionResources(opCtx());
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 4);
+ std::vector<OplogEntry> oplogEntries;
+ mongo::repl::OpTime expectedPrevWriteOpTime;
+ for (const auto& oplogEntryObj : oplogEntryObjs) {
+ checkSessionAndTransactionFields(oplogEntryObj);
+ oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
+ const auto& oplogEntry = oplogEntries.back();
+ ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp());
+ expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()};
+ }
+
+ auto oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("_id" << 0) << "o2" << BSON("_id" << 0)))
+ << "partialTxn" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
+
+ oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("_id" << 1) << "o2" << BSON("_id" << 1)))
+ << "partialTxn" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject());
+
+ oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << nss2.toString() << "ui" << uuid2 << "o"
+ << BSON("_id" << 2) << "o2" << BSON("_id" << 2)))
+ << "partialTxn" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[2].getObject());
+
+ oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << nss2.toString() << "ui" << uuid2 << "o"
+ << BSON("_id" << 3) << "o2" << BSON("_id" << 3)))
+ << "prepare" << true << "count" << 4);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[3].getObject());
+
+ assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
+ txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
+}
+
+TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) {
+ const NamespaceString nss1(boost::none, "testDB", "testColl");
+ const NamespaceString nss2(boost::none, "testDB2", "testColl2");
+ auto uuid1 = UUID::gen();
+ auto uuid2 = UUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "update");
+
+ CollectionUpdateArgs updateArgs1;
+ updateArgs1.stmtIds = {0};
+ updateArgs1.updatedDoc = BSON("_id" << 0 << "data"
+ << "x");
+ updateArgs1.update = BSON("$set" << BSON("data"
+ << "x"));
+ updateArgs1.criteria = BSON("_id" << 0);
+ OplogUpdateEntryArgs update1(&updateArgs1, nss1, uuid1);
+
+ CollectionUpdateArgs updateArgs2;
+ updateArgs2.stmtIds = {1};
+ updateArgs2.updatedDoc = BSON("_id" << 1 << "data"
+ << "y");
+ updateArgs2.update = BSON("$set" << BSON("data"
+ << "y"));
+ updateArgs2.criteria = BSON("_id" << 1);
+ OplogUpdateEntryArgs update2(&updateArgs2, nss2, uuid2);
+
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+ opObserver().onUpdate(opCtx(), update1);
+ opObserver().onUpdate(opCtx(), update2);
+
+ auto reservedSlots = reserveOpTimesInSideTransaction(opCtx(), 2);
+ auto prepareOpTime = reservedSlots.back();
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+ prepareTransaction(reservedSlots, prepareOpTime);
+
+ ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
+ ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
+
+ txnParticipant.stashTransactionResources(opCtx());
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
+ std::vector<OplogEntry> oplogEntries;
+ mongo::repl::OpTime expectedPrevWriteOpTime;
+ for (const auto& oplogEntryObj : oplogEntryObjs) {
+ checkSessionAndTransactionFields(oplogEntryObj);
+ oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
+ const auto& oplogEntry = oplogEntries.back();
+ ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp());
+ expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()};
+ }
+
+ auto oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "u"
+ << "ns" << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("$set" << BSON("data"
+ << "x"))
+ << "o2" << BSON("_id" << 0)))
+ << "partialTxn" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
+
+ oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "u"
+ << "ns" << nss2.toString() << "ui" << uuid2 << "o"
+ << BSON("$set" << BSON("data"
+ << "y"))
+ << "o2" << BSON("_id" << 1)))
+ << "prepare" << true << "count" << 2);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject());
+
+ assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
+ txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
+}
+
+TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) {
+ const NamespaceString nss1(boost::none, "testDB", "testColl");
+ const NamespaceString nss2(boost::none, "testDB2", "testColl2");
+ auto uuid1 = UUID::gen();
+ auto uuid2 = UUID::gen();
+
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "delete");
+
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+ opObserver().aboutToDelete(opCtx(),
+ nss1,
+ uuid1,
+ BSON("_id" << 0 << "data"
+ << "x"));
+ opObserver().onDelete(opCtx(), nss1, uuid1, 0, {});
+ opObserver().aboutToDelete(opCtx(),
+ nss2,
+ uuid2,
+ BSON("_id" << 1 << "data"
+ << "y"));
+ opObserver().onDelete(opCtx(), nss2, uuid2, 0, {});
+
+ auto reservedSlots = reserveOpTimesInSideTransaction(opCtx(), 2);
+ auto prepareOpTime = reservedSlots.back();
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+ prepareTransaction(reservedSlots, prepareOpTime);
+
+ ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
+ ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
+
+ txnParticipant.stashTransactionResources(opCtx());
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
+ std::vector<OplogEntry> oplogEntries;
+ mongo::repl::OpTime expectedPrevWriteOpTime;
+ for (const auto& oplogEntryObj : oplogEntryObjs) {
+ checkSessionAndTransactionFields(oplogEntryObj);
+ oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
+ const auto& oplogEntry = oplogEntries.back();
+ ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp());
+ expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()};
+ }
+
+ auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "d"
+ << "ns" << nss1.toString() << "ui" << uuid1
+ << "o" << BSON("_id" << 0)))
+ << "partialTxn" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
+
+ oExpected = BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "d"
+ << "ns" << nss2.toString() << "ui" << uuid2
+ << "o" << BSON("_id" << 1)))
+ << "prepare" << true << "count" << 2);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject());
+
+ assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
+ txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
+}
+
+TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) {
+ const NamespaceString nss1(boost::none, "testDB", "testColl");
+ auto uuid1 = UUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+
+ std::vector<InsertStatement> inserts1;
+ inserts1.emplace_back(0,
+ BSON("_id" << 0 << "data"
+ << "x"));
+ inserts1.emplace_back(1,
+ BSON("_id" << 1 << "data"
+ << "y"));
+
+ opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
+
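+ // Reserve two oplog slots: one for the applyOps entry carrying the inserts and one for the
+ // implicit prepare entry; the last reserved slot becomes the prepare optime.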
+ auto reservedSlots = reserveOpTimesInSideTransaction(opCtx(), 2);
+ auto prepareOpTime = reservedSlots.back();
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+ prepareTransaction(reservedSlots, prepareOpTime);
+
+ txnParticipant.stashTransactionResources(opCtx());
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
+
+ const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0]));
+ ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kCommand);
+ ASSERT_TRUE(insertEntry.getCommandType() == OplogEntry::CommandType::kApplyOps);
+
+ // This should be the implicit prepare entry.
+ const auto prepareEntry = assertGet(OplogEntry::parse(oplogEntryObjs[1]));
+ ASSERT_TRUE(prepareEntry.getOpType() == repl::OpTypeEnum::kCommand);
+ ASSERT_TRUE(prepareEntry.getCommandType() == OplogEntry::CommandType::kApplyOps);
+ ASSERT_EQ(prepareEntry.getObject()["prepare"].boolean(), true);
+
+ const auto startOpTime = insertEntry.getOpTime();
+
+ txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
+ const auto prepareTimestamp = prepareOpTime.getTimestamp();
+ ASSERT_EQ(prepareTimestamp, opCtx()->recoveryUnit()->getPrepareTimestamp());
+
+ // Reserve an oplog slot for the commit oplog entry.
+ OplogSlot commitSlot = reserveOpTimeInSideTransaction(opCtx());
+
+ ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
+ txnParticipant.stashTransactionResources(opCtx());
+ assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
+ assertTxnRecordStartOpTime(startOpTime);
+ txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
+
+ // Mimic committing the transaction.
+ opCtx()->setWriteUnitOfWork(nullptr);
+ opCtx()->lockState()->unsetMaxLockTimeout();
+
+ // commitTimestamp must be greater than the prepareTimestamp.
+ auto commitTimestamp = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1);
+
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ opObserver().onPreparedTransactionCommit(
+ opCtx(),
+ commitSlot,
+ commitTimestamp,
+ txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ }
+ oplogEntryObjs = getNOplogEntries(opCtx(), 3);
+ const auto commitOplogObj = oplogEntryObjs.back();
+ checkSessionAndTransactionFields(commitOplogObj);
+ auto commitEntry = assertGet(OplogEntry::parse(commitOplogObj));
+ auto o = commitEntry.getObject();
+ auto oExpected = BSON("commitTransaction" << 1 << "commitTimestamp" << commitTimestamp);
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT_TRUE(commitEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(*commitEntry.getPrevWriteOpTimeInTransaction(), prepareEntry.getOpTime());
+
+ assertTxnRecord(txnNum(), commitSlot, DurableTxnStateEnum::kCommitted);
+ // startTimestamp should no longer be set once the transaction has been committed.
+ assertTxnRecordStartOpTime(boost::none);
+}
+
+TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) {
+ const NamespaceString nss1(boost::none, "testDB", "testColl");
+ auto uuid1 = UUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+
+ std::vector<InsertStatement> inserts1;
+ inserts1.emplace_back(0,
+ BSON("_id" << 0 << "data"
+ << "x"));
+
+ opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
+
+ auto reservedSlots = reserveOpTimesInSideTransaction(opCtx(), 1);
+ auto prepareOpTime = reservedSlots.back();
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+ prepareTransaction(reservedSlots, prepareOpTime);
+
+ txnParticipant.stashTransactionResources(opCtx());
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 1);
+
+ const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0]));
+ ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kCommand);
+ ASSERT_TRUE(insertEntry.getCommandType() == OplogEntry::CommandType::kApplyOps);
+ const auto startOpTime = insertEntry.getOpTime();
+
+ const auto prepareTimestamp = prepareOpTime.getTimestamp();
+
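+ // With only a single reserved slot, the applyOps entry carrying the insert doubles as the
+ // implicit prepare entry.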
+ const auto prepareEntry = insertEntry;
+ ASSERT_EQ(prepareEntry.getObject()["prepare"].boolean(), true);
+
+ // Reserve an oplog slot for the abort oplog entry.
+ OplogSlot abortSlot = reserveOpTimeInSideTransaction(opCtx());
+
+ ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
+ assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
+ assertTxnRecordStartOpTime(startOpTime);
+ txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
+ ASSERT_EQ(prepareTimestamp, opCtx()->recoveryUnit()->getPrepareTimestamp());
+
+ // Mimic aborting the transaction by resetting the WUOW.
+ opCtx()->setWriteUnitOfWork(nullptr);
+ opCtx()->lockState()->unsetMaxLockTimeout();
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_IX);
+ opObserver().onTransactionAbort(opCtx(), abortSlot);
+ }
+ txnParticipant.transitionToAbortedWithPrepareforTest(opCtx());
+
+ txnParticipant.stashTransactionResources(opCtx());
+ oplogEntryObjs = getNOplogEntries(opCtx(), 2);
+ auto abortOplogObj = oplogEntryObjs.back();
+ checkSessionAndTransactionFields(abortOplogObj);
+ auto abortEntry = assertGet(OplogEntry::parse(abortOplogObj));
+ auto o = abortEntry.getObject();
+ auto oExpected = BSON("abortTransaction" << 1);
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT_TRUE(abortEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(*abortEntry.getPrevWriteOpTimeInTransaction(), prepareEntry.getOpTime());
+
+ assertTxnRecord(txnNum(), abortSlot, DurableTxnStateEnum::kAborted);
+ // startOpTime should no longer be set once a transaction has been aborted.
+ assertTxnRecordStartOpTime(boost::none);
+}
+
+TEST_F(OpObserverMultiEntryTransactionTest, UnpreparedTransactionPackingTest) {
+ gMaxNumberOfTransactionOperationsInSingleOplogEntry = std::numeric_limits<int>::max();
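+ // With no cap on the number of operations per applyOps entry, all four inserts below should
+ // be packed into a single oplog entry.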
+
+ const NamespaceString nss1(boost::none, "testDB", "testColl");
+ const NamespaceString nss2(boost::none, "testDB2", "testColl2");
+ auto uuid1 = UUID::gen();
+ auto uuid2 = UUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+ std::vector<InsertStatement> inserts1;
+ inserts1.emplace_back(0, BSON("_id" << 0));
+ inserts1.emplace_back(1, BSON("_id" << 1));
+ std::vector<InsertStatement> inserts2;
+ inserts2.emplace_back(0, BSON("_id" << 2));
+ inserts2.emplace_back(1, BSON("_id" << 3));
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+ opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
+ opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false);
+ auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
+ opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0);
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 1);
+ std::vector<OplogEntry> oplogEntries;
+ mongo::repl::OpTime expectedPrevWriteOpTime;
+ for (const auto& oplogEntryObj : oplogEntryObjs) {
+ checkSessionAndTransactionFields(oplogEntryObj);
+ oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
+ const auto& oplogEntry = oplogEntries.back();
+ ASSERT(!oplogEntry.shouldPrepare());
+ ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp());
+ expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()};
+ }
+ auto oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("_id" << 0) << "o2" << BSON("_id" << 0))
+ << BSON("op"
+ << "i"
+ << "ns" << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("_id" << 1) << "o2" << BSON("_id" << 1))
+ << BSON("op"
+ << "i"
+ << "ns" << nss2.toString() << "ui" << uuid2 << "o"
+ << BSON("_id" << 2) << "o2" << BSON("_id" << 2))
+ << BSON("op"
+ << "i"
+ << "ns" << nss2.toString() << "ui" << uuid2 << "o"
+ << BSON("_id" << 3) << "o2" << BSON("_id" << 3))));
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
+}
+
+TEST_F(OpObserverMultiEntryTransactionTest, PreparedTransactionPackingTest) {
+ gMaxNumberOfTransactionOperationsInSingleOplogEntry = std::numeric_limits<int>::max();
+
+ const NamespaceString nss1(boost::none, "testDB", "testColl");
+ const NamespaceString nss2(boost::none, "testDB2", "testColl2");
+ auto uuid1 = UUID::gen();
+ auto uuid2 = UUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+ std::vector<InsertStatement> inserts1;
+ inserts1.emplace_back(0, BSON("_id" << 0));
+ inserts1.emplace_back(1, BSON("_id" << 1));
+ std::vector<InsertStatement> inserts2;
+ inserts2.emplace_back(0, BSON("_id" << 2));
+ inserts2.emplace_back(1, BSON("_id" << 3));
+
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+ opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
+ opObserver().onInserts(opCtx(), nss2, uuid2, inserts2.begin(), inserts2.end(), false);
+
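+    // Reserve one oplog slot per operation; the transaction is prepared at the last reserved
+    // slot.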
+ auto reservedSlots = reserveOpTimesInSideTransaction(opCtx(), 4);
+ auto prepareOpTime = reservedSlots.back();
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+ prepareTransaction(reservedSlots, prepareOpTime);
+
+ txnParticipant.stashTransactionResources(opCtx());
+ auto oplogEntryObj = getSingleOplogEntry(opCtx());
+ std::vector<OplogEntry> oplogEntries;
+ mongo::repl::OpTime expectedPrevWriteOpTime;
+ checkSessionAndTransactionFields(oplogEntryObj);
+ oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
+ const auto& oplogEntry = oplogEntries.back();
+ ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp());
+ expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()};
+
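+    // The single packed applyOps entry contains all four inserts and carries the 'prepare' flag.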
+ auto oExpected =
+ BSON("applyOps" << BSON_ARRAY(BSON("op"
+ << "i"
+ << "ns" << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("_id" << 0) << "o2" << BSON("_id" << 0))
+ << BSON("op"
+ << "i"
+ << "ns" << nss1.toString() << "ui" << uuid1 << "o"
+ << BSON("_id" << 1) << "o2" << BSON("_id" << 1))
+ << BSON("op"
+ << "i"
+ << "ns" << nss2.toString() << "ui" << uuid2 << "o"
+ << BSON("_id" << 2) << "o2" << BSON("_id" << 2))
+ << BSON("op"
+ << "i"
+ << "ns" << nss2.toString() << "ui" << uuid2 << "o"
+ << BSON("_id" << 3) << "o2" << BSON("_id" << 3)))
+ << "prepare" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
+
+ txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction");
+}
+
+TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedPackingTest) {
+ gMaxNumberOfTransactionOperationsInSingleOplogEntry = std::numeric_limits<int>::max();
+ const NamespaceString nss1(boost::none, "testDB", "testColl");
+ auto uuid1 = UUID::gen();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+
+ std::vector<InsertStatement> inserts1;
+ inserts1.emplace_back(0,
+ BSON("_id" << 0 << "data"
+ << "x"));
+ inserts1.emplace_back(1,
+ BSON("_id" << 1 << "data"
+ << "y"));
+
+ opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
+
+ auto reservedSlots = reserveOpTimesInSideTransaction(opCtx(), 2);
+ auto prepareOpTime = reservedSlots.back();
+ txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
+ prepareTransaction(reservedSlots, prepareOpTime);
+
+ txnParticipant.stashTransactionResources(opCtx());
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 1);
+
+ // This should be the implicit prepare oplog entry.
+ const auto insertEntry = assertGet(OplogEntry::parse(oplogEntryObjs[0]));
+ ASSERT_TRUE(insertEntry.getOpType() == repl::OpTypeEnum::kCommand);
+ ASSERT_TRUE(insertEntry.getCommandType() == OplogEntry::CommandType::kApplyOps);
+ ASSERT_EQ(insertEntry.getObject()["prepare"].boolean(), true);
+
+    // If we are only going to write a single prepare oplog entry but have reserved multiple
+    // oplog slots, at T=1 and T=2 for example, then the 'prepare' oplog entry should be written
+    // at T=2, i.e. the last reserved slot. In that case, the 'startOpTime' of the transaction
+    // should also be set to T=2, not T=1. We verify that below.
+ const auto startOpTime = prepareOpTime;
+
+ const auto prepareTimestamp = prepareOpTime.getTimestamp();
+
+    // Reserve an oplog slot for the commit oplog entry.
+ OplogSlot commitSlot = reserveOpTimeInSideTransaction(opCtx());
+
+ ASSERT_EQ(prepareOpTime, txnParticipant.getLastWriteOpTime());
+ assertTxnRecord(txnNum(), prepareOpTime, DurableTxnStateEnum::kPrepared);
+ assertTxnRecordStartOpTime(startOpTime);
+ txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
+
+ // Mimic committing the transaction.
+ opCtx()->setWriteUnitOfWork(nullptr);
+ opCtx()->lockState()->unsetMaxLockTimeout();
+
+ // commitTimestamp must be greater than the prepareTimestamp.
+ auto commitTimestamp = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1);
+
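+    // Log the commit oplog entry at the reserved commit slot using the chosen commit timestamp.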
+ opObserver().onPreparedTransactionCommit(
+ opCtx(),
+ commitSlot,
+ commitTimestamp,
+ txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+
+ oplogEntryObjs = getNOplogEntries(opCtx(), 2);
+ const auto commitOplogObj = oplogEntryObjs.back();
+ checkSessionAndTransactionFields(commitOplogObj);
+ auto commitEntry = assertGet(OplogEntry::parse(commitOplogObj));
+ auto o = commitEntry.getObject();
+ auto oExpected = BSON("commitTransaction" << 1 << "commitTimestamp" << commitTimestamp);
+ ASSERT_BSONOBJ_EQ(oExpected, o);
+ ASSERT_TRUE(commitEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(*commitEntry.getPrevWriteOpTimeInTransaction(), insertEntry.getOpTime());
+
+ assertTxnRecord(txnNum(), commitSlot, DurableTxnStateEnum::kCommitted);
+    // startOpTime should no longer be set once the transaction has been committed.
+ assertTxnRecordStartOpTime(boost::none);
+}
+
+/**
+ * Test fixture with sessions and an extra-large oplog for testing large transactions.
+ */
+class OpObserverLargeTransactionTest : public OpObserverTransactionTest {
+private:
+ repl::ReplSettings createReplSettings() override {
+ repl::ReplSettings settings;
+ // We need an oplog comfortably large enough to hold an oplog entry that exceeds the BSON
+ // size limit. Otherwise we will get the wrong error code when trying to write one.
+ settings.setOplogSizeBytes(BSONObjMaxInternalSize + 2 * 1024 * 1024);
+ settings.setReplSetString("mySet/node1:12345");
+ return settings;
+ }
+};
+
+// Tests that a large transaction may be committed. This test creates a transaction with two
+// operations that together are just big enough to exceed the size limit, which should cause the
+// transaction to be split across two oplog entries.
+TEST_F(OpObserverLargeTransactionTest, LargeTransactionCreatesMultipleOplogEntries) {
+ const NamespaceString nss(boost::none, "testDB", "testColl");
+ auto uuid = UUID::gen();
+
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+
+    // This size is crafted so that two operations of this size fit in a single oplog entry on
+    // their own, but two operations plus the oplog entry overhead exceed the maximum size,
+    // forcing the transaction to be split across two oplog entries.
+ constexpr size_t kHalfTransactionSize = BSONObjMaxInternalSize / 2 - 175;
+ std::unique_ptr<uint8_t[]> halfTransactionData(new uint8_t[kHalfTransactionSize]());
+ auto operation1 = repl::DurableOplogEntry::makeInsertOperation(
+ nss,
+ uuid,
+ BSON("_id" << 0 << "data"
+ << BSONBinData(halfTransactionData.get(), kHalfTransactionSize, BinDataGeneral)),
+ BSON("_id" << 0));
+ auto operation2 = repl::DurableOplogEntry::makeInsertOperation(
+ nss,
+ uuid,
+ BSON("_id" << 0 << "data"
+ << BSONBinData(halfTransactionData.get(), kHalfTransactionSize, BinDataGeneral)),
+ BSON("_id" << 0));
+ txnParticipant.addTransactionOperation(opCtx(), operation1);
+ txnParticipant.addTransactionOperation(opCtx(), operation2);
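+    // Committing the unprepared transaction should split the two operations, which together
+    // exceed the single-entry size limit, across two applyOps oplog entries.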
+ auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
+ opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0);
+ auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
+ std::vector<OplogEntry> oplogEntries;
+ mongo::repl::OpTime expectedPrevWriteOpTime;
+ for (const auto& oplogEntryObj : oplogEntryObjs) {
+ checkSessionAndTransactionFields(oplogEntryObj);
+ oplogEntries.push_back(assertGet(OplogEntry::parse(oplogEntryObj)));
+ const auto& oplogEntry = oplogEntries.back();
+ ASSERT(!oplogEntry.shouldPrepare());
+ ASSERT_TRUE(oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_EQ(expectedPrevWriteOpTime, *oplogEntry.getPrevWriteOpTimeInTransaction());
+ ASSERT_LT(expectedPrevWriteOpTime.getTimestamp(), oplogEntry.getTimestamp());
+ expectedPrevWriteOpTime = repl::OpTime{oplogEntry.getTimestamp(), *oplogEntry.getTerm()};
+ }
+
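+    // The first applyOps entry is marked 'partialTxn'; the final entry carries the total
+    // operation count instead.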
+ auto oExpected = BSON("applyOps" << BSON_ARRAY(operation1.toBSON()) << "partialTxn" << true);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[0].getObject());
+
+ oExpected = BSON("applyOps" << BSON_ARRAY(operation2.toBSON()) << "count" << 2);
+ ASSERT_BSONOBJ_EQ(oExpected, oplogEntries[1].getObject());
+}
+
+TEST_F(OpObserverTest, OnRollbackInvalidatesDefaultRWConcernCache) {
+ auto& rwcDefaults = ReadWriteConcernDefaults::get(getServiceContext());
+ auto opCtx = getClient()->makeOperationContext();
+
+ // Put initial defaults in the cache.
+ {
+ RWConcernDefault origDefaults;
+ origDefaults.setUpdateOpTime(Timestamp(10, 20));
+ origDefaults.setUpdateWallClockTime(Date_t::fromMillisSinceEpoch(1234));
+ _lookupMock.setLookupCallReturnValue(std::move(origDefaults));
+ }
+ auto origCachedDefaults = rwcDefaults.getDefault(opCtx.get());
+ ASSERT_EQ(Timestamp(10, 20), *origCachedDefaults.getUpdateOpTime());
+ ASSERT_EQ(Date_t::fromMillisSinceEpoch(1234), *origCachedDefaults.getUpdateWallClockTime());
+
+ // Change the mock's defaults, but don't invalidate the cache yet. The cache should still return
+ // the original defaults.
+ {
+ RWConcernDefault newDefaults;
+ newDefaults.setUpdateOpTime(Timestamp(50, 20));
+ newDefaults.setUpdateWallClockTime(Date_t::fromMillisSinceEpoch(5678));
+ _lookupMock.setLookupCallReturnValue(std::move(newDefaults));
+
+ auto cachedDefaults = rwcDefaults.getDefault(opCtx.get());
+ ASSERT_EQ(Timestamp(10, 20), *cachedDefaults.getUpdateOpTime());
+ ASSERT_EQ(Date_t::fromMillisSinceEpoch(1234), *cachedDefaults.getUpdateWallClockTime());
+ }
+
+    // A rollback should invalidate the cache, so fetching the defaults should now return the
+    // latest values.
+ {
+ OpObserverImpl opObserver;
+ OpObserver::RollbackObserverInfo rbInfo;
+ opObserver.onReplicationRollback(opCtx.get(), rbInfo);
+ }
+ auto newCachedDefaults = rwcDefaults.getDefault(opCtx.get());
+ ASSERT_EQ(Timestamp(50, 20), *newCachedDefaults.getUpdateOpTime());
+ ASSERT_EQ(Date_t::fromMillisSinceEpoch(5678), *newCachedDefaults.getUpdateWallClockTime());
+}
+
+TEST_F(OpObserverTest, OnInsertChecksIfTenantMigrationIsBlockingWrites) {
+ auto opCtx = cc().makeOperationContext();
+ const std::string kTenantId = "tenantId";
+ const NamespaceString nss(boost::none, "tenantId_db", "testColl");
+ const auto uuid = UUID::gen();
+
+ // Add a tenant migration access blocker on donor for blocking writes.
+ auto donorMtab = std::make_shared<TenantMigrationDonorAccessBlocker>(
+ getServiceContext(),
+ uuid,
+ kTenantId,
+ MigrationProtocolEnum::kMultitenantMigrations,
+ "fakeConnString");
+ TenantMigrationAccessBlockerRegistry::get(getServiceContext()).add(kTenantId, donorMtab);
+ donorMtab->startBlockingWrites();
+
+ std::vector<InsertStatement> insert;
+ insert.emplace_back(BSON("_id" << 0 << "data"
+ << "x"));
+
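+    // With the donor blocking writes, an insert into a namespace belonging to the tenant should
+    // fail with TenantMigrationConflict.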
+ {
+ AutoGetCollection autoColl(opCtx.get(), nss, MODE_IX);
+ OpObserverImpl opObserver;
+ ASSERT_THROWS_CODE(
+ opObserver.onInserts(opCtx.get(), nss, uuid, insert.begin(), insert.end(), false),
+ DBException,
+ ErrorCodes::TenantMigrationConflict);
+ }
+ TenantMigrationAccessBlockerRegistry::get(getServiceContext()).shutDown();
+}
+
+TEST_F(OpObserverTransactionTest,
+ OnUnpreparedTransactionCommitChecksIfTenantMigrationIsBlockingWrites) {
+ const std::string kTenantId = "tenantId";
+ const auto uuid = UUID::gen();
+
+ // Add a tenant migration access blocker on donor for blocking writes.
+ auto donorMtab = std::make_shared<TenantMigrationDonorAccessBlocker>(
+ getServiceContext(),
+ uuid,
+ kTenantId,
+ MigrationProtocolEnum::kMultitenantMigrations,
+ "fakeConnString");
+ TenantMigrationAccessBlockerRegistry::get(getServiceContext()).add(kTenantId, donorMtab);
+
+ const NamespaceString nss(boost::none, "tenantId_db", "testColl");
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+
+ std::vector<InsertStatement> insert;
+ insert.emplace_back(0,
+ BSON("_id" << 0 << "data"
+ << "x"));
+
+ {
+ AutoGetCollection autoColl(opCtx(), nss, MODE_IX);
+ opObserver().onInserts(opCtx(), nss, uuid, insert.begin(), insert.end(), false);
+ }
+
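+    // Start blocking writes only after the inserts have been staged so that the conflict
+    // surfaces when the transaction commits.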
+ donorMtab->startBlockingWrites();
+
+ auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
+ ASSERT_THROWS_CODE(opObserver().onUnpreparedTransactionCommit(opCtx(), &txnOps, 0),
+ DBException,
+ ErrorCodes::TenantMigrationConflict);
+
+ TenantMigrationAccessBlockerRegistry::get(getServiceContext()).shutDown();
+}
+
+} // namespace
+} // namespace mongo