summaryrefslogtreecommitdiff
path: root/src/mongo/db/op_observer/op_observer_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/op_observer/op_observer_impl.cpp')
-rw-r--r--src/mongo/db/op_observer/op_observer_impl.cpp2315
1 files changed, 2315 insertions, 0 deletions
diff --git a/src/mongo/db/op_observer/op_observer_impl.cpp b/src/mongo/db/op_observer/op_observer_impl.cpp
new file mode 100644
index 00000000000..e89853e92b6
--- /dev/null
+++ b/src/mongo/db/op_observer/op_observer_impl.cpp
@@ -0,0 +1,2315 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/op_observer/op_observer_impl.h"
+
+#include <algorithm>
+#include <limits>
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/batched_write_context.h"
+#include "mongo/db/catalog/collection_options.h"
+#include "mongo/db/catalog/database.h"
+#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/catalog/document_validation.h"
+#include "mongo/db/catalog/import_collection_oplog_entry_gen.h"
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/commands/txn_cmds_gen.h"
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/concurrency/exception_util.h"
+#include "mongo/db/dbhelpers.h"
+#include "mongo/db/exec/write_stage_common.h"
+#include "mongo/db/index/index_descriptor.h"
+#include "mongo/db/internal_transactions_feature_flag_gen.h"
+#include "mongo/db/keys_collection_document_gen.h"
+#include "mongo/db/logical_time_validator.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/op_observer/op_observer_util.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/pipeline/change_stream_pre_image_helpers.h"
+#include "mongo/db/pipeline/change_stream_preimage_gen.h"
+#include "mongo/db/read_write_concern_defaults.h"
+#include "mongo/db/repl/image_collection_entry_gen.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_entry_gen.h"
+#include "mongo/db/repl/repl_server_parameters_gen.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
+#include "mongo/db/repl/tenant_migration_decoration.h"
+#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/sharding_write_router.h"
+#include "mongo/db/server_feature_flags_gen.h"
+#include "mongo/db/server_options.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/timeseries/bucket_catalog.h"
+#include "mongo/db/transaction_participant.h"
+#include "mongo/db/transaction_participant_gen.h"
+#include "mongo/db/views/durable_view_catalog.h"
+#include "mongo/logv2/log.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+#include "mongo/scripting/engine.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/fail_point.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
+
+
+namespace mongo {
+using repl::DurableOplogEntry;
+using repl::MutableOplogEntry;
+using ChangeStreamPreImageRecordingMode = repl::ReplOperation::ChangeStreamPreImageRecordingMode;
+
+const OperationContext::Decoration<boost::optional<ShardId>> destinedRecipientDecoration =
+ OperationContext::declareDecoration<boost::optional<ShardId>>();
+
+namespace {
+
+MONGO_FAIL_POINT_DEFINE(failCollectionUpdates);
+MONGO_FAIL_POINT_DEFINE(hangAndFailUnpreparedCommitAfterReservingOplogSlot);
+MONGO_FAIL_POINT_DEFINE(hangAfterLoggingApplyOpsForTransaction);
+
+constexpr auto kNumRecordsFieldName = "numRecords"_sd;
+constexpr auto kMsgFieldName = "msg"_sd;
+constexpr long long kInvalidNumRecords = -1LL;
+
+Date_t getWallClockTimeForOpLog(OperationContext* opCtx) {
+ auto const clockSource = opCtx->getServiceContext()->getFastClockSource();
+ return clockSource->now();
+}
+
+repl::OpTime logOperation(OperationContext* opCtx,
+ MutableOplogEntry* oplogEntry,
+ bool assignWallClockTime = true) {
+ if (assignWallClockTime) {
+ oplogEntry->setWallClockTime(getWallClockTimeForOpLog(opCtx));
+ }
+ auto& times = OpObserver::Times::get(opCtx).reservedOpTimes;
+ auto opTime = repl::logOp(opCtx, oplogEntry);
+ times.push_back(opTime);
+ return opTime;
+}
+
+/**
+ * Updates the session state with the last write timestamp and transaction for that session.
+ *
+ * In the case of writes with transaction/statement id, this method will be recursively entered a
+ * second time for the actual write to the transactions table. Since this write does not generate an
+ * oplog entry, the recursion will stop at this point.
+ */
+void onWriteOpCompleted(OperationContext* opCtx,
+ std::vector<StmtId> stmtIdsWritten,
+ SessionTxnRecord sessionTxnRecord) {
+ if (sessionTxnRecord.getLastWriteOpTime().isNull())
+ return;
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ if (!txnParticipant ||
+ (!stmtIdsWritten.empty() && stmtIdsWritten.front() == kUninitializedStmtId))
+ // If the first statement written in uninitialized, then all the statements are assumed to
+ // be uninitialized.
+ return;
+
+ // We add these here since they may not exist if we return early.
+ const auto lsid = *opCtx->getLogicalSessionId();
+ sessionTxnRecord.setSessionId(lsid);
+ if (isInternalSessionForRetryableWrite(lsid)) {
+ sessionTxnRecord.setParentSessionId(*getParentSessionId(lsid));
+ }
+ sessionTxnRecord.setTxnNum(*opCtx->getTxnNumber());
+ txnParticipant.onWriteOpCompletedOnPrimary(opCtx, std::move(stmtIdsWritten), sessionTxnRecord);
+}
+
+/**
+ * Given the collection count from Collection::numRecords(), create and return the object for the
+ * 'o2' field of a drop or rename oplog entry. If the collection count exceeds the upper limit of a
+ * BSON NumberLong (long long), we will add a count of -1 and append a message with the original
+ * collection count.
+ *
+ * Replication rollback uses this field to correct correction counts on drop-pending collections.
+ */
+BSONObj makeObject2ForDropOrRename(uint64_t numRecords) {
+ BSONObjBuilder obj2Builder;
+ if (numRecords > static_cast<uint64_t>(std::numeric_limits<long long>::max())) {
+ obj2Builder.appendNumber(kNumRecordsFieldName, kInvalidNumRecords);
+ std::string msg = str::stream() << "Collection count " << numRecords
+ << " is larger than the "
+ "maximum int64_t value. Setting numRecords to -1.";
+ obj2Builder.append(kMsgFieldName, msg);
+ } else {
+ obj2Builder.appendNumber(kNumRecordsFieldName, static_cast<long long>(numRecords));
+ }
+ auto obj = obj2Builder.obj();
+ return obj;
+}
+
+struct OpTimeBundle {
+ repl::OpTime writeOpTime;
+ repl::OpTime prePostImageOpTime;
+ Date_t wallClockTime;
+};
+
+struct ImageBundle {
+ repl::RetryImageEnum imageKind;
+ BSONObj imageDoc;
+ Timestamp timestamp;
+};
+
+/**
+ * Write oplog entry(ies) for the update operation.
+ */
+OpTimeBundle replLogUpdate(OperationContext* opCtx,
+ const OplogUpdateEntryArgs& args,
+ MutableOplogEntry* oplogEntry) {
+ // TODO SERVER-62114 Change to check for upgraded FCV rather than feature flag
+ if (gFeatureFlagRequireTenantID.isEnabled(serverGlobalParams.featureCompatibility))
+ oplogEntry->setTid(args.nss.tenantId());
+ oplogEntry->setNss(args.nss);
+ oplogEntry->setUuid(args.uuid);
+
+ repl::OplogLink oplogLink;
+ repl::appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, args.updateArgs->stmtIds);
+
+ OpTimeBundle opTimes;
+ // We never want to store pre- or post- images when we're migrating oplog entries from another
+ // replica set.
+ const auto& migrationRecipientInfo = repl::tenantMigrationRecipientInfo(opCtx);
+ const auto storePreImageInOplogForRetryableWrite =
+ (args.updateArgs->storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage &&
+ opCtx->getTxnNumber() && !oplogEntry->getNeedsRetryImage());
+ if ((storePreImageInOplogForRetryableWrite ||
+ args.updateArgs->preImageRecordingEnabledForCollection) &&
+ !migrationRecipientInfo) {
+ MutableOplogEntry noopEntry = *oplogEntry;
+ invariant(args.updateArgs->preImageDoc);
+ noopEntry.setOpType(repl::OpTypeEnum::kNoop);
+ noopEntry.setObject(*args.updateArgs->preImageDoc);
+ if (args.updateArgs->preImageRecordingEnabledForCollection &&
+ args.retryableFindAndModifyLocation ==
+ RetryableFindAndModifyLocation::kSideCollection) {
+ // We are writing a no-op pre-image oplog entry and storing a post-image into a side
+ // collection. In this case, we expect to have already reserved 3 oplog slots:
+ // TS - 2: Oplog slot for the current no-op preimage oplog entry
+ // TS - 1: Oplog slot for the forged no-op oplog entry that may eventually get used by
+ // tenant migrations or resharding.
+ // TS: Oplog slot for the actual update oplog entry.
+ const auto reservedOplogSlots = args.updateArgs->oplogSlots;
+ invariant(reservedOplogSlots.size() == 3);
+ noopEntry.setOpTime(repl::OpTime(reservedOplogSlots.front().getTimestamp(),
+ reservedOplogSlots.front().getTerm()));
+ }
+ oplogLink.preImageOpTime = logOperation(opCtx, &noopEntry);
+ if (storePreImageInOplogForRetryableWrite) {
+ opTimes.prePostImageOpTime = oplogLink.preImageOpTime;
+ }
+ }
+
+ // This case handles storing the post image for retryable findAndModify's.
+ if (args.updateArgs->storeDocOption == CollectionUpdateArgs::StoreDocOption::PostImage &&
+ opCtx->getTxnNumber() && !migrationRecipientInfo && !oplogEntry->getNeedsRetryImage()) {
+ MutableOplogEntry noopEntry = *oplogEntry;
+ noopEntry.setOpType(repl::OpTypeEnum::kNoop);
+ noopEntry.setObject(args.updateArgs->updatedDoc);
+ oplogLink.postImageOpTime = logOperation(opCtx, &noopEntry);
+ invariant(opTimes.prePostImageOpTime.isNull());
+ opTimes.prePostImageOpTime = oplogLink.postImageOpTime;
+ }
+
+ oplogEntry->setOpType(repl::OpTypeEnum::kUpdate);
+ oplogEntry->setObject(args.updateArgs->update);
+ oplogEntry->setObject2(args.updateArgs->criteria);
+ oplogEntry->setFromMigrateIfTrue(args.updateArgs->source == OperationSource::kFromMigrate);
+ // oplogLink could have been changed to include pre/postImageOpTime by the previous no-op write.
+ repl::appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, args.updateArgs->stmtIds);
+ if (!args.updateArgs->oplogSlots.empty()) {
+ oplogEntry->setOpTime(args.updateArgs->oplogSlots.back());
+ }
+ opTimes.writeOpTime = logOperation(opCtx, oplogEntry);
+ opTimes.wallClockTime = oplogEntry->getWallClockTime();
+ return opTimes;
+}
+
+/**
+ * Write oplog entry(ies) for the delete operation.
+ */
+OpTimeBundle replLogDelete(OperationContext* opCtx,
+ const NamespaceString& nss,
+ MutableOplogEntry* oplogEntry,
+ const boost::optional<UUID>& uuid,
+ StmtId stmtId,
+ bool fromMigrate,
+ const boost::optional<BSONObj>& deletedDoc) {
+ // TODO SERVER-62114 Change to check for upgraded FCV rather than feature flag
+ if (gFeatureFlagRequireTenantID.isEnabled(serverGlobalParams.featureCompatibility))
+ oplogEntry->setTid(nss.tenantId());
+ oplogEntry->setNss(nss);
+ oplogEntry->setUuid(uuid);
+ oplogEntry->setDestinedRecipient(destinedRecipientDecoration(opCtx));
+
+ repl::OplogLink oplogLink;
+ repl::appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, {stmtId});
+
+ OpTimeBundle opTimes;
+ // We never want to store pre-images when we're migrating oplog entries from another
+ // replica set.
+ const auto& migrationRecipientInfo = repl::tenantMigrationRecipientInfo(opCtx);
+ if (deletedDoc && !migrationRecipientInfo) {
+ MutableOplogEntry noopEntry = *oplogEntry;
+ noopEntry.setOpType(repl::OpTypeEnum::kNoop);
+ noopEntry.setObject(*deletedDoc);
+ auto noteOplog = logOperation(opCtx, &noopEntry);
+ opTimes.prePostImageOpTime = noteOplog;
+ oplogLink.preImageOpTime = noteOplog;
+ }
+
+ oplogEntry->setOpType(repl::OpTypeEnum::kDelete);
+ oplogEntry->setObject(repl::documentKeyDecoration(opCtx).get().getShardKeyAndId());
+ oplogEntry->setFromMigrateIfTrue(fromMigrate);
+ // oplogLink could have been changed to include preImageOpTime by the previous no-op write.
+ repl::appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, {stmtId});
+ opTimes.writeOpTime = logOperation(opCtx, oplogEntry);
+ opTimes.wallClockTime = oplogEntry->getWallClockTime();
+ return opTimes;
+}
+
+void writeToImageCollection(OperationContext* opCtx,
+ const LogicalSessionId& sessionId,
+ const Timestamp timestamp,
+ repl::RetryImageEnum imageKind,
+ const BSONObj& dataImage) {
+ repl::ImageEntry imageEntry;
+ imageEntry.set_id(sessionId);
+ imageEntry.setTxnNumber(opCtx->getTxnNumber().get());
+ imageEntry.setTs(timestamp);
+ imageEntry.setImageKind(imageKind);
+ imageEntry.setImage(dataImage);
+
+ DisableDocumentValidation documentValidationDisabler(
+ opCtx, DocumentValidationSettings::kDisableInternalValidation);
+
+ // In practice, this lock acquisition on kConfigImagesNamespace cannot block. The only time a
+ // stronger lock acquisition is taken on this namespace is during step up to create the
+ // collection.
+ AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState());
+ AutoGetCollection imageCollectionRaii(
+ opCtx, NamespaceString::kConfigImagesNamespace, LockMode::MODE_IX);
+ auto curOp = CurOp::get(opCtx);
+ const std::string existingNs = curOp->getNS();
+ UpdateResult res =
+ Helpers::upsert(opCtx, NamespaceString::kConfigImagesNamespace, imageEntry.toBSON());
+ {
+ stdx::lock_guard<Client> clientLock(*opCtx->getClient());
+ curOp->setNS_inlock(existingNs);
+ }
+
+ invariant(res.numDocsModified == 1 || !res.upsertedId.isEmpty());
+}
+
+bool shouldTimestampIndexBuildSinglePhase(OperationContext* opCtx, const NamespaceString& nss) {
+ // This function returns whether a timestamp for a catalog write when beginning an index build,
+ // or aborting an index build is necessary. There are four scenarios:
+
+ // 1. A timestamp is already set -- replication application sets a timestamp ahead of time.
+ // This could include the phase of initial sync where it applies oplog entries. Also,
+ // primaries performing an index build via `applyOps` may have a wrapping commit timestamp.
+ if (!opCtx->recoveryUnit()->getCommitTimestamp().isNull())
+ return false;
+
+ // 2. If the node is initial syncing, we do not set a timestamp.
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (replCoord->isReplEnabled() && replCoord->getMemberState().startup2())
+ return false;
+
+ // 3. If the index build is on the local database, do not timestamp.
+ if (nss.isLocal())
+ return false;
+
+ // 4. All other cases, we generate a timestamp by writing a no-op oplog entry. This is
+ // better than using a ghost timestamp. Writing an oplog entry ensures this node is
+ // primary.
+ return true;
+}
+
+} // namespace
+
+void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& uuid,
+ BSONObj indexDoc,
+ bool fromMigrate) {
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ const bool inMultiDocumentTransaction =
+ txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen();
+
+ if (inMultiDocumentTransaction) {
+ auto operation = MutableOplogEntry::makeCreateIndexesCommand(nss, uuid, indexDoc);
+ txnParticipant.addTransactionOperation(opCtx, operation);
+ } else {
+ BSONObjBuilder builder;
+ builder.append("createIndexes", nss.coll());
+ builder.appendElements(indexDoc);
+
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(nss.getCommandNS());
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setObject(builder.done());
+ oplogEntry.setFromMigrateIfTrue(fromMigrate);
+ logOperation(opCtx, &oplogEntry);
+ }
+}
+
+void OpObserverImpl::onStartIndexBuild(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& collUUID,
+ const UUID& indexBuildUUID,
+ const std::vector<BSONObj>& indexes,
+ bool fromMigrate) {
+ BSONObjBuilder oplogEntryBuilder;
+ oplogEntryBuilder.append("startIndexBuild", nss.coll());
+
+ indexBuildUUID.appendToBuilder(&oplogEntryBuilder, "indexBuildUUID");
+
+ BSONArrayBuilder indexesArr(oplogEntryBuilder.subarrayStart("indexes"));
+ for (auto indexDoc : indexes) {
+ indexesArr.append(indexDoc);
+ }
+ indexesArr.done();
+
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(nss.getCommandNS());
+ oplogEntry.setUuid(collUUID);
+ oplogEntry.setObject(oplogEntryBuilder.done());
+ oplogEntry.setFromMigrateIfTrue(fromMigrate);
+ logOperation(opCtx, &oplogEntry);
+}
+
+void OpObserverImpl::onStartIndexBuildSinglePhase(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ if (!shouldTimestampIndexBuildSinglePhase(opCtx, nss)) {
+ return;
+ }
+
+
+ onInternalOpMessage(
+ opCtx,
+ {},
+ boost::none,
+ BSON("msg" << std::string(str::stream() << "Creating indexes. Coll: " << nss)),
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none);
+}
+
+void OpObserverImpl::onAbortIndexBuildSinglePhase(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ if (!shouldTimestampIndexBuildSinglePhase(opCtx, nss)) {
+ return;
+ }
+
+ onInternalOpMessage(
+ opCtx,
+ {},
+ boost::none,
+ BSON("msg" << std::string(str::stream() << "Aborting indexes. Coll: " << nss)),
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none);
+}
+
+void OpObserverImpl::onCommitIndexBuild(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& collUUID,
+ const UUID& indexBuildUUID,
+ const std::vector<BSONObj>& indexes,
+ bool fromMigrate) {
+ BSONObjBuilder oplogEntryBuilder;
+ oplogEntryBuilder.append("commitIndexBuild", nss.coll());
+
+ indexBuildUUID.appendToBuilder(&oplogEntryBuilder, "indexBuildUUID");
+
+ BSONArrayBuilder indexesArr(oplogEntryBuilder.subarrayStart("indexes"));
+ for (auto indexDoc : indexes) {
+ indexesArr.append(indexDoc);
+ }
+ indexesArr.done();
+
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(nss.getCommandNS());
+ oplogEntry.setUuid(collUUID);
+ oplogEntry.setObject(oplogEntryBuilder.done());
+ oplogEntry.setFromMigrateIfTrue(fromMigrate);
+ logOperation(opCtx, &oplogEntry);
+}
+
+void OpObserverImpl::onAbortIndexBuild(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& collUUID,
+ const UUID& indexBuildUUID,
+ const std::vector<BSONObj>& indexes,
+ const Status& cause,
+ bool fromMigrate) {
+ BSONObjBuilder oplogEntryBuilder;
+ oplogEntryBuilder.append("abortIndexBuild", nss.coll());
+
+ indexBuildUUID.appendToBuilder(&oplogEntryBuilder, "indexBuildUUID");
+
+ BSONArrayBuilder indexesArr(oplogEntryBuilder.subarrayStart("indexes"));
+ for (auto indexDoc : indexes) {
+ indexesArr.append(indexDoc);
+ }
+ indexesArr.done();
+
+ BSONObjBuilder causeBuilder(oplogEntryBuilder.subobjStart("cause"));
+ // Some functions that extract a Status from a BSONObj, such as getStatusFromCommandResult(),
+ // expect the 'ok' field.
+ causeBuilder.appendBool("ok", 0);
+ cause.serializeErrorToBSON(&causeBuilder);
+ causeBuilder.done();
+
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(nss.getCommandNS());
+ oplogEntry.setUuid(collUUID);
+ oplogEntry.setObject(oplogEntryBuilder.done());
+ oplogEntry.setFromMigrateIfTrue(fromMigrate);
+ logOperation(opCtx, &oplogEntry);
+}
+
+void OpObserverImpl::onInserts(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& uuid,
+ std::vector<InsertStatement>::const_iterator first,
+ std::vector<InsertStatement>::const_iterator last,
+ bool fromMigrate) {
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ const bool inMultiDocumentTransaction =
+ txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen();
+
+ Date_t lastWriteDate;
+
+ std::vector<repl::OpTime> opTimeList;
+ repl::OpTime lastOpTime;
+
+ ShardingWriteRouter shardingWriteRouter(opCtx, nss, Grid::get(opCtx)->catalogCache());
+
+ auto& batchedWriteContext = BatchedWriteContext::get(opCtx);
+ const bool inBatchedWrite = batchedWriteContext.writesAreBatched();
+
+ if (inBatchedWrite) {
+ invariant(!fromMigrate);
+
+ write_stage_common::PreWriteFilter preWriteFilter(opCtx, nss);
+
+ for (auto iter = first; iter != last; iter++) {
+ const auto docKey = repl::getDocumentKey(opCtx, nss, iter->doc).getShardKeyAndId();
+ auto operation = MutableOplogEntry::makeInsertOperation(nss, uuid, iter->doc, docKey);
+ operation.setDestinedRecipient(
+ shardingWriteRouter.getReshardingDestinedRecipient(iter->doc));
+
+ if (!OperationShardingState::isComingFromRouter(opCtx) &&
+ preWriteFilter.computeAction(Document(iter->doc)) ==
+ write_stage_common::PreWriteFilter::Action::kWriteAsFromMigrate) {
+ LOGV2_DEBUG(6585800,
+ 3,
+ "Marking insert operation of orphan document with the 'fromMigrate' "
+ "flag to prevent a wrong change stream event",
+ "namespace"_attr = nss,
+ "document"_attr = iter->doc);
+
+ operation.setFromMigrate(true);
+ }
+
+ batchedWriteContext.addBatchedOperation(opCtx, operation);
+ }
+ } else if (inMultiDocumentTransaction) {
+ invariant(!fromMigrate);
+
+ // Do not add writes to the profile collection to the list of transaction operations, since
+ // these are done outside the transaction. There is no top-level WriteUnitOfWork when we are
+ // in a SideTransactionBlock.
+ if (!opCtx->getWriteUnitOfWork()) {
+ invariant(nss.isSystemDotProfile());
+ return;
+ }
+
+ const bool inRetryableInternalTransaction =
+ isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId());
+ write_stage_common::PreWriteFilter preWriteFilter(opCtx, nss);
+
+ for (auto iter = first; iter != last; iter++) {
+ const auto docKey = repl::getDocumentKey(opCtx, nss, iter->doc).getShardKeyAndId();
+ auto operation = MutableOplogEntry::makeInsertOperation(nss, uuid, iter->doc, docKey);
+ if (inRetryableInternalTransaction) {
+ operation.setInitializedStatementIds(iter->stmtIds);
+ }
+ operation.setDestinedRecipient(
+ shardingWriteRouter.getReshardingDestinedRecipient(iter->doc));
+
+ if (!OperationShardingState::isComingFromRouter(opCtx) &&
+ preWriteFilter.computeAction(Document(iter->doc)) ==
+ write_stage_common::PreWriteFilter::Action::kWriteAsFromMigrate) {
+ LOGV2_DEBUG(6585801,
+ 3,
+ "Marking insert operation of orphan document with the 'fromMigrate' "
+ "flag to prevent a wrong change stream event",
+ "namespace"_attr = nss,
+ "document"_attr = iter->doc);
+
+ operation.setFromMigrate(true);
+ }
+
+ txnParticipant.addTransactionOperation(opCtx, operation);
+ }
+ } else {
+ std::function<boost::optional<ShardId>(const BSONObj& doc)> getDestinedRecipientFn =
+ [&shardingWriteRouter](const BSONObj& doc) {
+ return shardingWriteRouter.getReshardingDestinedRecipient(doc);
+ };
+
+ MutableOplogEntry oplogEntryTemplate;
+ // TODO SERVER-62114 Change to check for upgraded FCV rather than feature flag
+ if (gFeatureFlagRequireTenantID.isEnabled(serverGlobalParams.featureCompatibility))
+ oplogEntryTemplate.setTid(nss.tenantId());
+ oplogEntryTemplate.setNss(nss);
+ oplogEntryTemplate.setUuid(uuid);
+ oplogEntryTemplate.setFromMigrateIfTrue(fromMigrate);
+ lastWriteDate = getWallClockTimeForOpLog(opCtx);
+ oplogEntryTemplate.setWallClockTime(lastWriteDate);
+
+ opTimeList =
+ repl::logInsertOps(opCtx, &oplogEntryTemplate, first, last, getDestinedRecipientFn);
+ if (!opTimeList.empty())
+ lastOpTime = opTimeList.back();
+
+ auto& times = OpObserver::Times::get(opCtx).reservedOpTimes;
+ using std::begin;
+ using std::end;
+ times.insert(end(times), begin(opTimeList), end(opTimeList));
+
+ std::vector<StmtId> stmtIdsWritten;
+ std::for_each(first, last, [&](const InsertStatement& stmt) {
+ stmtIdsWritten.insert(stmtIdsWritten.end(), stmt.stmtIds.begin(), stmt.stmtIds.end());
+ });
+
+ SessionTxnRecord sessionTxnRecord;
+ sessionTxnRecord.setLastWriteOpTime(lastOpTime);
+ sessionTxnRecord.setLastWriteDate(lastWriteDate);
+ onWriteOpCompleted(opCtx, stmtIdsWritten, sessionTxnRecord);
+ }
+
+ size_t index = 0;
+ for (auto it = first; it != last; it++, index++) {
+ auto opTime = opTimeList.empty() ? repl::OpTime() : opTimeList[index];
+ shardObserveInsertOp(opCtx,
+ nss,
+ it->doc,
+ opTime,
+ shardingWriteRouter,
+ fromMigrate,
+ inMultiDocumentTransaction);
+ }
+
+ if (nss.coll() == "system.js") {
+ Scope::storedFuncMod(opCtx);
+ } else if (nss.coll() == DurableViewCatalog::viewsCollectionName()) {
+ try {
+ for (auto it = first; it != last; it++) {
+ uassertStatusOK(DurableViewCatalog::onExternalInsert(opCtx, it->doc, nss));
+ }
+ } catch (const DBException&) {
+ // If a previous operation left the view catalog in an invalid state, our inserts can
+ // fail even if all the definitions are valid. Reloading may help us reset the state.
+ DurableViewCatalog::onExternalChange(opCtx, nss);
+ }
+ } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !lastOpTime.isNull()) {
+ for (auto it = first; it != last; it++) {
+ MongoDSessionCatalog::observeDirectWriteToConfigTransactions(opCtx, it->doc);
+ }
+ } else if (nss == NamespaceString::kConfigSettingsNamespace) {
+ for (auto it = first; it != last; it++) {
+ ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings(
+ opCtx, it->doc["_id"], it->doc);
+ }
+ } else if (nss == NamespaceString::kExternalKeysCollectionNamespace) {
+ for (auto it = first; it != last; it++) {
+ auto externalKey = ExternalKeysCollectionDocument::parse(
+ IDLParserErrorContext("externalKey"), it->doc);
+ opCtx->recoveryUnit()->onCommit(
+ [this, opCtx, externalKey = std::move(externalKey)](
+ boost::optional<Timestamp> unusedCommitTime) mutable {
+ auto validator = LogicalTimeValidator::get(opCtx);
+ if (validator) {
+ validator->cacheExternalKey(externalKey);
+ }
+ });
+ }
+ }
+}
+
+void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) {
+ failCollectionUpdates.executeIf(
+ [&](const BSONObj&) {
+ uasserted(40654,
+ str::stream() << "failCollectionUpdates failpoint enabled, namespace: "
+ << args.nss.ns() << ", update: " << args.updateArgs->update
+ << " on document with " << args.updateArgs->criteria);
+ },
+ [&](const BSONObj& data) {
+ // If the failpoint specifies no collection or matches the existing one, fail.
+ auto collElem = data["collectionNS"];
+ return !collElem || args.nss.ns() == collElem.String();
+ });
+
+ // Do not log a no-op operation; see SERVER-21738
+ if (args.updateArgs->update.isEmpty()) {
+ return;
+ }
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ const bool inMultiDocumentTransaction =
+ txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen();
+
+ ShardingWriteRouter shardingWriteRouter(opCtx, args.nss, Grid::get(opCtx)->catalogCache());
+
+ OpTimeBundle opTime;
+ auto& batchedWriteContext = BatchedWriteContext::get(opCtx);
+ const bool inBatchedWrite = batchedWriteContext.writesAreBatched();
+
+ if (inBatchedWrite) {
+ auto operation = MutableOplogEntry::makeUpdateOperation(
+ args.nss, args.uuid, args.updateArgs->update, args.updateArgs->criteria);
+ operation.setDestinedRecipient(
+ shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs->updatedDoc));
+ operation.setFromMigrateIfTrue(args.updateArgs->source == OperationSource::kFromMigrate);
+ batchedWriteContext.addBatchedOperation(opCtx, operation);
+ } else if (inMultiDocumentTransaction) {
+ const bool inRetryableInternalTransaction =
+ isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId());
+
+ auto operation = MutableOplogEntry::makeUpdateOperation(
+ args.nss, args.uuid, args.updateArgs->update, args.updateArgs->criteria);
+
+ if (inRetryableInternalTransaction) {
+ uassert(6462400,
+ str::stream() << "Found a retryable internal transaction on a sharded cluster "
+ << "executing an update against the collection '" << args.nss
+ << "' with the 'recordPreImages' option enabled",
+ !args.updateArgs->preImageRecordingEnabledForCollection ||
+ serverGlobalParams.clusterRole == ClusterRole::None);
+
+ operation.setInitializedStatementIds(args.updateArgs->stmtIds);
+ if (args.updateArgs->storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) {
+ invariant(args.updateArgs->preImageDoc);
+ operation.setPreImage(args.updateArgs->preImageDoc->getOwned());
+ operation.setPreImageRecordedForRetryableInternalTransaction();
+ if (args.retryableFindAndModifyLocation ==
+ RetryableFindAndModifyLocation::kSideCollection &&
+ !args.updateArgs->preImageRecordingEnabledForCollection) {
+ operation.setNeedsRetryImage(repl::RetryImageEnum::kPreImage);
+ }
+ }
+ if (args.updateArgs->storeDocOption ==
+ CollectionUpdateArgs::StoreDocOption::PostImage) {
+ invariant(!args.updateArgs->updatedDoc.isEmpty());
+ operation.setPostImage(args.updateArgs->updatedDoc.getOwned());
+ if (args.retryableFindAndModifyLocation ==
+ RetryableFindAndModifyLocation::kSideCollection) {
+ operation.setNeedsRetryImage(repl::RetryImageEnum::kPostImage);
+ }
+ }
+ }
+
+ if (args.updateArgs->preImageRecordingEnabledForCollection) {
+ invariant(args.updateArgs->preImageDoc);
+ tassert(
+ 5869402,
+ "Change stream pre-image recording to the oplog and to the pre-image collection "
+ "requested at the same time",
+ !args.updateArgs->changeStreamPreAndPostImagesEnabledForCollection);
+ operation.setPreImage(args.updateArgs->preImageDoc->getOwned());
+ operation.setChangeStreamPreImageRecordingMode(
+ ChangeStreamPreImageRecordingMode::kOplog);
+ }
+
+ if (args.updateArgs->changeStreamPreAndPostImagesEnabledForCollection) {
+ invariant(args.updateArgs->preImageDoc);
+ tassert(
+ 5869403,
+ "Change stream pre-image recording to the oplog and to the pre-image collection "
+ "requested at the same time",
+ !args.updateArgs->preImageRecordingEnabledForCollection);
+ operation.setPreImage(args.updateArgs->preImageDoc->getOwned());
+ operation.setChangeStreamPreImageRecordingMode(
+ ChangeStreamPreImageRecordingMode::kPreImagesCollection);
+ }
+ operation.setDestinedRecipient(
+ shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs->updatedDoc));
+ operation.setFromMigrateIfTrue(args.updateArgs->source == OperationSource::kFromMigrate);
+ txnParticipant.addTransactionOperation(opCtx, operation);
+ } else {
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setDestinedRecipient(
+ shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs->updatedDoc));
+
+ if (args.retryableFindAndModifyLocation ==
+ RetryableFindAndModifyLocation::kSideCollection) {
+ // If we've stored a preImage:
+ if (args.updateArgs->storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage &&
+ // And we're not writing to a noop entry anyways for
+ // `preImageRecordingEnabledForCollection`:
+ !args.updateArgs->preImageRecordingEnabledForCollection) {
+ oplogEntry.setNeedsRetryImage({repl::RetryImageEnum::kPreImage});
+ } else if (args.updateArgs->storeDocOption ==
+ CollectionUpdateArgs::StoreDocOption::PostImage) {
+ // Or if we're storing a postImage.
+ oplogEntry.setNeedsRetryImage({repl::RetryImageEnum::kPostImage});
+ }
+ }
+
+ opTime = replLogUpdate(opCtx, args, &oplogEntry);
+
+ if (oplogEntry.getNeedsRetryImage()) {
+ // If the oplog entry has `needsRetryImage`, copy the image into image collection.
+ const BSONObj& dataImage = [&]() {
+ if (oplogEntry.getNeedsRetryImage().get() == repl::RetryImageEnum::kPreImage) {
+ return args.updateArgs->preImageDoc.get();
+ } else {
+ return args.updateArgs->updatedDoc;
+ }
+ }();
+ writeToImageCollection(opCtx,
+ *opCtx->getLogicalSessionId(),
+ opTime.writeOpTime.getTimestamp(),
+ oplogEntry.getNeedsRetryImage().get(),
+ dataImage);
+ }
+
+ // Write a pre-image to the change streams pre-images collection when following conditions
+ // are met:
+ // 1. The collection has 'changeStreamPreAndPostImages' enabled.
+ // 2. The node wrote the oplog entry for the corresponding operation.
+ // 3. The request to write the pre-image does not come from chunk-migrate event, i.e. source
+ // of the request is not 'fromMigrate'. The 'fromMigrate' events are filtered out by
+ // change streams and storing them in pre-image collection is redundant.
+ // 4. a request to update is not on a temporary resharding collection. This update request
+ // does not result in change streams events. Recording pre-images from temporary
+ // resharing collection could result in incorrect pre-image getting recorded due to the
+ // temporary resharding collection not being consistent until writes are blocked (initial
+ // sync mode application).
+ if (args.updateArgs->changeStreamPreAndPostImagesEnabledForCollection &&
+ !opTime.writeOpTime.isNull() &&
+ args.updateArgs->source != OperationSource::kFromMigrate &&
+ !args.nss.isTemporaryReshardingCollection()) {
+ const auto& preImageDoc = args.updateArgs->preImageDoc;
+ tassert(5868600, "PreImage must be set", preImageDoc && !preImageDoc.get().isEmpty());
+
+ ChangeStreamPreImageId id(args.uuid, opTime.writeOpTime.getTimestamp(), 0);
+ ChangeStreamPreImage preImage(id, opTime.wallClockTime, preImageDoc.get());
+ writeToChangeStreamPreImagesCollection(opCtx, preImage);
+ }
+
+ SessionTxnRecord sessionTxnRecord;
+ sessionTxnRecord.setLastWriteOpTime(opTime.writeOpTime);
+ sessionTxnRecord.setLastWriteDate(opTime.wallClockTime);
+ onWriteOpCompleted(opCtx, args.updateArgs->stmtIds, sessionTxnRecord);
+ }
+
+ if (args.nss != NamespaceString::kSessionTransactionsTableNamespace) {
+ if (args.updateArgs->source != OperationSource::kFromMigrate) {
+ shardObserveUpdateOp(opCtx,
+ args.nss,
+ args.updateArgs->preImageDoc,
+ args.updateArgs->updatedDoc,
+ opTime.writeOpTime,
+ shardingWriteRouter,
+ opTime.prePostImageOpTime,
+ inMultiDocumentTransaction);
+ }
+ }
+
+ if (args.nss.coll() == "system.js") {
+ Scope::storedFuncMod(opCtx);
+ } else if (args.nss.coll() == DurableViewCatalog::viewsCollectionName()) {
+ DurableViewCatalog::onExternalChange(opCtx, args.nss);
+ } else if (args.nss == NamespaceString::kSessionTransactionsTableNamespace &&
+ !opTime.writeOpTime.isNull()) {
+ MongoDSessionCatalog::observeDirectWriteToConfigTransactions(opCtx,
+ args.updateArgs->updatedDoc);
+ } else if (args.nss == NamespaceString::kConfigSettingsNamespace) {
+ ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings(
+ opCtx, args.updateArgs->updatedDoc["_id"], args.updateArgs->updatedDoc);
+ } else if (args.nss.isTimeseriesBucketsCollection()) {
+ if (args.updateArgs->source != OperationSource::kTimeseriesInsert) {
+ auto& bucketCatalog = BucketCatalog::get(opCtx);
+ bucketCatalog.clear(args.updateArgs->updatedDoc["_id"].OID());
+ }
+ }
+}
+
+void OpObserverImpl::aboutToDelete(OperationContext* opCtx,
+ NamespaceString const& nss,
+ const UUID& uuid,
+ BSONObj const& doc) {
+ repl::documentKeyDecoration(opCtx).emplace(repl::getDocumentKey(opCtx, nss, doc));
+
+ ShardingWriteRouter shardingWriteRouter(opCtx, nss, Grid::get(opCtx)->catalogCache());
+
+ repl::DurableReplOperation op;
+ op.setDestinedRecipient(shardingWriteRouter.getReshardingDestinedRecipient(doc));
+ destinedRecipientDecoration(opCtx) = op.getDestinedRecipient();
+
+ shardObserveAboutToDelete(opCtx, nss, doc);
+
+ if (nss.isTimeseriesBucketsCollection()) {
+ auto& bucketCatalog = BucketCatalog::get(opCtx);
+ bucketCatalog.clear(doc["_id"].OID());
+ }
+}
+
+void OpObserverImpl::onDelete(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& uuid,
+ StmtId stmtId,
+ const OplogDeleteEntryArgs& args) {
+ auto optDocKey = repl::documentKeyDecoration(opCtx);
+ invariant(optDocKey, nss.ns());
+ auto& documentKey = optDocKey.get();
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ const bool inMultiDocumentTransaction =
+ txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen();
+
+ auto& batchedWriteContext = BatchedWriteContext::get(opCtx);
+ const bool inBatchedWrite = batchedWriteContext.writesAreBatched();
+
+ OpTimeBundle opTime;
+ if (inBatchedWrite) {
+ if (nss == NamespaceString::kSessionTransactionsTableNamespace) {
+ MongoDSessionCatalog::observeDirectWriteToConfigTransactions(opCtx,
+ documentKey.getId());
+ }
+ auto operation =
+ MutableOplogEntry::makeDeleteOperation(nss, uuid, documentKey.getShardKeyAndId());
+ operation.setDestinedRecipient(destinedRecipientDecoration(opCtx));
+ operation.setFromMigrateIfTrue(args.fromMigrate);
+ batchedWriteContext.addBatchedOperation(opCtx, operation);
+ } else if (inMultiDocumentTransaction) {
+ const bool inRetryableInternalTransaction =
+ isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId());
+
+ tassert(5868700,
+ "Attempted a retryable write within a non-retryable multi-document transaction",
+ inRetryableInternalTransaction ||
+ args.retryableFindAndModifyLocation == RetryableFindAndModifyLocation::kNone);
+
+ auto operation =
+ MutableOplogEntry::makeDeleteOperation(nss, uuid, documentKey.getShardKeyAndId());
+
+ if (inRetryableInternalTransaction) {
+ uassert(6462401,
+ str::stream() << "Found a retryable internal transaction on a sharded cluster "
+ << "executing an delete against the collection '" << nss
+ << "' with the 'recordPreImages' option enabled",
+ !args.preImageRecordingEnabledForCollection ||
+ serverGlobalParams.clusterRole == ClusterRole::None);
+
+ operation.setInitializedStatementIds({stmtId});
+ if (args.retryableFindAndModifyLocation != RetryableFindAndModifyLocation::kNone) {
+ tassert(6054000,
+ "Deleted document must be present for pre-image recording",
+ args.deletedDoc);
+ operation.setPreImage(args.deletedDoc->getOwned());
+ operation.setPreImageRecordedForRetryableInternalTransaction();
+ if (args.retryableFindAndModifyLocation ==
+ RetryableFindAndModifyLocation::kSideCollection &&
+ !args.preImageRecordingEnabledForCollection) {
+ operation.setNeedsRetryImage(repl::RetryImageEnum::kPreImage);
+ }
+ }
+ }
+
+ if (args.changeStreamPreAndPostImagesEnabledForCollection) {
+ tassert(5869400,
+ "Deleted document must be present for pre-image recording",
+ args.deletedDoc);
+ tassert(
+ 5869401,
+ "Change stream pre-image recording to the oplog and to the pre-image collection "
+ "requested at the same time",
+ !args.preImageRecordingEnabledForCollection);
+ operation.setPreImage(args.deletedDoc->getOwned());
+ operation.setChangeStreamPreImageRecordingMode(
+ ChangeStreamPreImageRecordingMode::kPreImagesCollection);
+ } else if (args.preImageRecordingEnabledForCollection) {
+ tassert(5868701,
+ "Deleted document must be present for pre-image recording",
+ args.deletedDoc);
+ operation.setPreImage(args.deletedDoc->getOwned());
+ operation.setChangeStreamPreImageRecordingMode(
+ ChangeStreamPreImageRecordingMode::kOplog);
+ }
+
+ operation.setDestinedRecipient(destinedRecipientDecoration(opCtx));
+ operation.setFromMigrateIfTrue(args.fromMigrate);
+ txnParticipant.addTransactionOperation(opCtx, operation);
+ } else {
+ MutableOplogEntry oplogEntry;
+ boost::optional<BSONObj> deletedDocForOplog = boost::none;
+
+ if (args.retryableFindAndModifyLocation == RetryableFindAndModifyLocation::kOplog ||
+ args.preImageRecordingEnabledForCollection) {
+ tassert(5868702,
+ "Deleted document must be present for pre-image recording",
+ args.deletedDoc);
+ deletedDocForOplog = {*(args.deletedDoc)};
+ } else if (args.retryableFindAndModifyLocation ==
+ RetryableFindAndModifyLocation::kSideCollection) {
+ tassert(5868703,
+ "Deleted document must be present for pre-image recording",
+ args.deletedDoc);
+ invariant(opCtx->getTxnNumber());
+
+ oplogEntry.setNeedsRetryImage({repl::RetryImageEnum::kPreImage});
+ if (!args.oplogSlots.empty()) {
+ oplogEntry.setOpTime(args.oplogSlots.back());
+ }
+ }
+ opTime = replLogDelete(
+ opCtx, nss, &oplogEntry, uuid, stmtId, args.fromMigrate, deletedDocForOplog);
+
+ if (oplogEntry.getNeedsRetryImage()) {
+ writeToImageCollection(opCtx,
+ *opCtx->getLogicalSessionId(),
+ opTime.writeOpTime.getTimestamp(),
+ repl::RetryImageEnum::kPreImage,
+ *(args.deletedDoc));
+ }
+
+ // Write a pre-image to the change streams pre-images collection when following conditions
+ // are met:
+ // 1. The collection has 'changeStreamPreAndPostImages' enabled.
+ // 2. The node wrote the oplog entry for the corresponding operation.
+ // 3. The request to write the pre-image does not come from chunk-migrate event, i.e. source
+ // of the request is not 'fromMigrate'. The 'fromMigrate' events are filtered out by
+ // change streams and storing them in pre-image collection is redundant.
+ // 4. a request to delete is not on a temporary resharding collection. This delete request
+ // does not result in change streams events. Recording pre-images from temporary
+ // resharing collection could result in incorrect pre-image getting recorded due to the
+ // temporary resharding collection not being consistent until writes are blocked (initial
+ // sync mode application).
+ if (args.changeStreamPreAndPostImagesEnabledForCollection && !opTime.writeOpTime.isNull() &&
+ !args.fromMigrate && !nss.isTemporaryReshardingCollection()) {
+ tassert(5868704, "Deleted document must be set", args.deletedDoc);
+
+ ChangeStreamPreImageId id(uuid, opTime.writeOpTime.getTimestamp(), 0);
+ ChangeStreamPreImage preImage(id, opTime.wallClockTime, *args.deletedDoc);
+ writeToChangeStreamPreImagesCollection(opCtx, preImage);
+ }
+
+ SessionTxnRecord sessionTxnRecord;
+ sessionTxnRecord.setLastWriteOpTime(opTime.writeOpTime);
+ sessionTxnRecord.setLastWriteDate(opTime.wallClockTime);
+ onWriteOpCompleted(opCtx, std::vector<StmtId>{stmtId}, sessionTxnRecord);
+ }
+
+ if (nss != NamespaceString::kSessionTransactionsTableNamespace) {
+ if (!args.fromMigrate) {
+ ShardingWriteRouter shardingWriteRouter(opCtx, nss, Grid::get(opCtx)->catalogCache());
+ shardObserveDeleteOp(opCtx,
+ nss,
+ documentKey.getShardKeyAndId(),
+ opTime.writeOpTime,
+ shardingWriteRouter,
+ opTime.prePostImageOpTime,
+ inMultiDocumentTransaction);
+ }
+ }
+
+ if (nss.coll() == "system.js") {
+ Scope::storedFuncMod(opCtx);
+ } else if (nss.coll() == DurableViewCatalog::viewsCollectionName()) {
+ DurableViewCatalog::onExternalChange(opCtx, nss);
+ } else if (nss == NamespaceString::kSessionTransactionsTableNamespace &&
+ !opTime.writeOpTime.isNull()) {
+ MongoDSessionCatalog::observeDirectWriteToConfigTransactions(opCtx, documentKey.getId());
+ } else if (nss == NamespaceString::kConfigSettingsNamespace) {
+ ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings(
+ opCtx, documentKey.getId().firstElement(), boost::none);
+ }
+}
+
+void OpObserverImpl::onInternalOpMessage(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const boost::optional<UUID>& uuid,
+ const BSONObj& msgObj,
+ const boost::optional<BSONObj> o2MsgObj,
+ const boost::optional<repl::OpTime> preImageOpTime,
+ const boost::optional<repl::OpTime> postImageOpTime,
+ const boost::optional<repl::OpTime> prevWriteOpTimeInTransaction,
+ const boost::optional<OplogSlot> slot) {
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
+ oplogEntry.setNss(nss);
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setObject(msgObj);
+ oplogEntry.setObject2(o2MsgObj);
+ oplogEntry.setPreImageOpTime(preImageOpTime);
+ oplogEntry.setPostImageOpTime(postImageOpTime);
+ oplogEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTimeInTransaction);
+ if (slot) {
+ oplogEntry.setOpTime(*slot);
+ }
+ logOperation(opCtx, &oplogEntry);
+}
+
+void OpObserverImpl::onCreateCollection(OperationContext* opCtx,
+ const CollectionPtr& coll,
+ const NamespaceString& collectionName,
+ const CollectionOptions& options,
+ const BSONObj& idIndex,
+ const OplogSlot& createOpTime,
+ bool fromMigrate) {
+ // do not replicate system.profile modifications
+ if (collectionName.isSystemDotProfile()) {
+ return;
+ }
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ const bool inMultiDocumentTransaction =
+ txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen();
+
+ if (inMultiDocumentTransaction) {
+ auto operation = MutableOplogEntry::makeCreateCommand(collectionName, options, idIndex);
+ txnParticipant.addTransactionOperation(opCtx, operation);
+ } else {
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(collectionName.getCommandNS());
+ oplogEntry.setUuid(options.uuid);
+ oplogEntry.setObject(
+ MutableOplogEntry::makeCreateCollCmdObj(collectionName, options, idIndex));
+ oplogEntry.setOpTime(createOpTime);
+ oplogEntry.setFromMigrateIfTrue(fromMigrate);
+ logOperation(opCtx, &oplogEntry);
+ }
+}
+
+void OpObserverImpl::onCollMod(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& uuid,
+ const BSONObj& collModCmd,
+ const CollectionOptions& oldCollOptions,
+ boost::optional<IndexCollModInfo> indexInfo) {
+
+ if (!nss.isSystemDotProfile()) {
+ // do not replicate system.profile modifications
+
+ // Create the 'o2' field object. We save the old collection metadata and TTL expiration.
+ BSONObjBuilder o2Builder;
+ o2Builder.append("collectionOptions_old", oldCollOptions.toBSON());
+ if (indexInfo) {
+ BSONObjBuilder oldIndexOptions;
+ if (indexInfo->oldExpireAfterSeconds) {
+ auto oldExpireAfterSeconds =
+ durationCount<Seconds>(indexInfo->oldExpireAfterSeconds.get());
+ oldIndexOptions.append("expireAfterSeconds", oldExpireAfterSeconds);
+ }
+ if (indexInfo->oldHidden) {
+ auto oldHidden = indexInfo->oldHidden.get();
+ oldIndexOptions.append("hidden", oldHidden);
+ }
+ if (indexInfo->oldPrepareUnique) {
+ auto oldPrepareUnique = indexInfo->oldPrepareUnique.get();
+ oldIndexOptions.append("prepareUnique", oldPrepareUnique);
+ }
+ o2Builder.append("indexOptions_old", oldIndexOptions.obj());
+ }
+
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(nss.getCommandNS());
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setObject(repl::makeCollModCmdObj(collModCmd, oldCollOptions, indexInfo));
+ oplogEntry.setObject2(o2Builder.done());
+ logOperation(opCtx, &oplogEntry);
+ }
+
+ // Make sure the UUID values in the Collection metadata, the Collection object, and the UUID
+ // catalog are all present and equal.
+ invariant(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_X));
+ auto databaseHolder = DatabaseHolder::get(opCtx);
+ auto db = databaseHolder->getDb(opCtx, nss.dbName());
+ // Some unit tests call the op observer on an unregistered Database.
+ if (!db) {
+ return;
+ }
+ const CollectionPtr& coll =
+ CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss);
+
+ invariant(coll->uuid() == uuid);
+}
+
+void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string& dbName) {
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss({dbName, "$cmd"});
+ oplogEntry.setObject(BSON("dropDatabase" << 1));
+ logOperation(opCtx, &oplogEntry);
+
+ uassert(
+ 50714, "dropping the admin database is not allowed.", dbName != NamespaceString::kAdminDb);
+
+ if (dbName == NamespaceString::kSessionTransactionsTableNamespace.db()) {
+ MongoDSessionCatalog::invalidateAllSessions(opCtx);
+ }
+
+ BucketCatalog::get(opCtx).clear(dbName);
+}
+
+repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
+ const NamespaceString& collectionName,
+ const UUID& uuid,
+ std::uint64_t numRecords,
+ CollectionDropType dropType) {
+ return onDropCollection(
+ opCtx, collectionName, uuid, numRecords, dropType, false /* markFromMigrate */);
+}
+
+repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
+ const NamespaceString& collectionName,
+ const UUID& uuid,
+ std::uint64_t numRecords,
+ const CollectionDropType dropType,
+ bool markFromMigrate) {
+ if (!collectionName.isSystemDotProfile()) {
+ // Do not replicate system.profile modifications.
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(collectionName.getCommandNS());
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setFromMigrateIfTrue(markFromMigrate);
+ oplogEntry.setObject(BSON("drop" << collectionName.coll()));
+ oplogEntry.setObject2(makeObject2ForDropOrRename(numRecords));
+ logOperation(opCtx, &oplogEntry);
+ }
+
+ uassert(50715,
+ "dropping the server configuration collection (admin.system.version) is not allowed.",
+ collectionName != NamespaceString::kServerConfigurationNamespace);
+
+ if (collectionName.coll() == DurableViewCatalog::viewsCollectionName()) {
+ DurableViewCatalog::onSystemViewsCollectionDrop(opCtx, collectionName);
+ } else if (collectionName == NamespaceString::kSessionTransactionsTableNamespace) {
+ // Disallow this drop if there are currently prepared transactions.
+ const auto sessionCatalog = SessionCatalog::get(opCtx);
+ SessionKiller::Matcher matcherAllSessions(
+ KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
+ bool noPreparedTxns = true;
+ sessionCatalog->scanSessions(matcherAllSessions, [&](const ObservableSession& session) {
+ auto txnParticipant = TransactionParticipant::get(session);
+ if (txnParticipant.transactionIsPrepared()) {
+ noPreparedTxns = false;
+ }
+ });
+ uassert(4852500,
+ "Unable to drop transactions table (config.transactions) while prepared "
+ "transactions are present.",
+ noPreparedTxns);
+
+ MongoDSessionCatalog::invalidateAllSessions(opCtx);
+ } else if (collectionName == NamespaceString::kConfigSettingsNamespace) {
+ ReadWriteConcernDefaults::get(opCtx).invalidate();
+ } else if (collectionName.isTimeseriesBucketsCollection()) {
+ BucketCatalog::get(opCtx).clear(collectionName.getTimeseriesViewNamespace());
+ }
+
+ return {};
+}
+
+void OpObserverImpl::onDropIndex(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& uuid,
+ const std::string& indexName,
+ const BSONObj& indexInfo) {
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(nss.getCommandNS());
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setObject(BSON("dropIndexes" << nss.coll() << "index" << indexName));
+ oplogEntry.setObject2(indexInfo);
+ logOperation(opCtx, &oplogEntry);
+}
+
+repl::OpTime OpObserverImpl::preRenameCollection(OperationContext* const opCtx,
+ const NamespaceString& fromCollection,
+ const NamespaceString& toCollection,
+ const UUID& uuid,
+ const boost::optional<UUID>& dropTargetUUID,
+ std::uint64_t numRecords,
+ bool stayTemp) {
+ return preRenameCollection(opCtx,
+ fromCollection,
+ toCollection,
+ uuid,
+ dropTargetUUID,
+ numRecords,
+ stayTemp,
+ false /* markFromMigrate */);
+}
+
+repl::OpTime OpObserverImpl::preRenameCollection(OperationContext* const opCtx,
+ const NamespaceString& fromCollection,
+ const NamespaceString& toCollection,
+ const UUID& uuid,
+ const boost::optional<UUID>& dropTargetUUID,
+ std::uint64_t numRecords,
+ bool stayTemp,
+ bool markFromMigrate) {
+ BSONObjBuilder builder;
+ builder.append("renameCollection", fromCollection.ns());
+ builder.append("to", toCollection.ns());
+ builder.append("stayTemp", stayTemp);
+ if (dropTargetUUID) {
+ dropTargetUUID->appendToBuilder(&builder, "dropTarget");
+ }
+
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(fromCollection.getCommandNS());
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setFromMigrateIfTrue(markFromMigrate);
+ oplogEntry.setObject(builder.done());
+ if (dropTargetUUID)
+ oplogEntry.setObject2(makeObject2ForDropOrRename(numRecords));
+ logOperation(opCtx, &oplogEntry);
+
+ return {};
+}
+
+void OpObserverImpl::postRenameCollection(OperationContext* const opCtx,
+ const NamespaceString& fromCollection,
+ const NamespaceString& toCollection,
+ const UUID& uuid,
+ const boost::optional<UUID>& dropTargetUUID,
+ bool stayTemp) {
+ if (fromCollection.isSystemDotViews())
+ DurableViewCatalog::onExternalChange(opCtx, fromCollection);
+ if (toCollection.isSystemDotViews())
+ DurableViewCatalog::onExternalChange(opCtx, toCollection);
+}
+
+void OpObserverImpl::onRenameCollection(OperationContext* const opCtx,
+ const NamespaceString& fromCollection,
+ const NamespaceString& toCollection,
+ const UUID& uuid,
+ const boost::optional<UUID>& dropTargetUUID,
+ std::uint64_t numRecords,
+ bool stayTemp) {
+ onRenameCollection(opCtx,
+ fromCollection,
+ toCollection,
+ uuid,
+ dropTargetUUID,
+ numRecords,
+ stayTemp,
+ false /* markFromMigrate */);
+}
+
+void OpObserverImpl::onRenameCollection(OperationContext* const opCtx,
+ const NamespaceString& fromCollection,
+ const NamespaceString& toCollection,
+ const UUID& uuid,
+ const boost::optional<UUID>& dropTargetUUID,
+ std::uint64_t numRecords,
+ bool stayTemp,
+ bool markFromMigrate) {
+ preRenameCollection(opCtx,
+ fromCollection,
+ toCollection,
+ uuid,
+ dropTargetUUID,
+ numRecords,
+ stayTemp,
+ markFromMigrate);
+ postRenameCollection(opCtx, fromCollection, toCollection, uuid, dropTargetUUID, stayTemp);
+}
+
+void OpObserverImpl::onImportCollection(OperationContext* opCtx,
+ const UUID& importUUID,
+ const NamespaceString& nss,
+ long long numRecords,
+ long long dataSize,
+ const BSONObj& catalogEntry,
+ const BSONObj& storageMetadata,
+ bool isDryRun) {
+ ImportCollectionOplogEntry importCollection(
+ nss, importUUID, numRecords, dataSize, catalogEntry, storageMetadata, isDryRun);
+
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(nss.getCommandNS());
+ oplogEntry.setObject(importCollection.toBSON());
+ logOperation(opCtx, &oplogEntry);
+}
+
+void OpObserverImpl::onApplyOps(OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& applyOpCmd) {
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss({dbName, "$cmd"});
+ oplogEntry.setObject(applyOpCmd);
+ logOperation(opCtx, &oplogEntry);
+}
+
+void OpObserverImpl::onEmptyCapped(OperationContext* opCtx,
+ const NamespaceString& collectionName,
+ const UUID& uuid) {
+ if (!collectionName.isSystemDotProfile()) {
+ // Do not replicate system.profile modifications
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(collectionName.getCommandNS());
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setObject(BSON("emptycapped" << collectionName.coll()));
+ logOperation(opCtx, &oplogEntry);
+ }
+}
+
+namespace {
+
+/**
+ * Writes pre-images for update/replace/delete operations packed into a single "applyOps" entry to
+ * the change stream pre-images collection if required. The operations are defined by sequence
+ * ['stmtBegin', 'stmtEnd'). 'applyOpsTimestamp' and 'operationTime' are the timestamp and the wall
+ * clock time, respectively, of the "applyOps" entry. A pre-image is recorded for an operation only
+ * if pre-images are enabled for the collection the operation is issued on.
+ */
+void writeChangeStreamPreImagesForApplyOpsEntries(
+ OperationContext* opCtx,
+ std::vector<repl::ReplOperation>::const_iterator stmtBegin,
+ std::vector<repl::ReplOperation>::const_iterator stmtEnd,
+ Timestamp applyOpsTimestamp,
+ Date_t operationTime) {
+ int64_t applyOpsIndex{0};
+ for (auto stmtIterator = stmtBegin; stmtIterator != stmtEnd; ++stmtIterator) {
+ auto& operation = *stmtIterator;
+ if (operation.isChangeStreamPreImageRecordedInPreImagesCollection() &&
+ !operation.getNss().isTemporaryReshardingCollection()) {
+ invariant(operation.getUuid());
+ invariant(!operation.getPreImage().isEmpty());
+ writeToChangeStreamPreImagesCollection(
+ opCtx,
+ ChangeStreamPreImage{
+ ChangeStreamPreImageId{*operation.getUuid(), applyOpsTimestamp, applyOpsIndex},
+ operationTime,
+ operation.getPreImage()});
+ }
+ ++applyOpsIndex;
+ }
+}
+
+/**
+ * Returns operations that can fit into an "applyOps" entry. The returned operations are
+ * serialized to BSON. The operations are given by range ['operationsBegin',
+ * 'operationsEnd').
+ * Multi-document transactions follow the following constraints for fitting the operations: (1) the
+ * resulting "applyOps" entry shouldn't exceed the 16MB limit, unless only one operation is
+ * allocated to it; (2) the number of operations is not larger than the maximum number of
+ * transaction statements allowed in one entry as defined by
+ * 'gMaxNumberOfTransactionOperationsInSingleOplogEntry'. Batched writes (WUOWs that pack writes
+ * into a single applyOps outside of a multi-doc transaction) are exempt from the constraints above.
+ * If the operations cannot be packed into a single applyOps that's within the BSON size limit
+ * (16MB), the batched write will fail with TransactionTooLarge.
+ */
+std::vector<BSONObj> packOperationsIntoApplyOps(
+ OperationContext* opCtx,
+ std::vector<repl::ReplOperation>::const_iterator operationsBegin,
+ std::vector<repl::ReplOperation>::const_iterator operationsEnd) {
+ // Conservative BSON array element overhead assuming maximum 6 digit array index.
+ constexpr size_t kBSONArrayElementOverhead{8};
+ tassert(6278503,
+ "gMaxNumberOfTransactionOperationsInSingleOplogEntry should be positive number",
+ gMaxNumberOfTransactionOperationsInSingleOplogEntry > 0);
+ std::vector<BSONObj> operations;
+ size_t totalOperationsSize{0};
+ for (auto operationIter = operationsBegin; operationIter != operationsEnd; ++operationIter) {
+ const auto& operation = *operationIter;
+
+ if (TransactionParticipant::get(opCtx)) {
+ // Stop packing when either number of transaction operations is reached, or when the
+ // next one would make the total size of operations larger than the maximum BSON Object
+ // User Size. We rely on the headroom between BSONObjMaxUserSize and
+ // BSONObjMaxInternalSize to cover the BSON overhead and the other "applyOps" entry
+ // fields. But if a single operation in the set exceeds BSONObjMaxUserSize, we still fit
+ // it, as a single max-length operation should be able to be packed into an "applyOps"
+ // entry.
+ if (operations.size() ==
+ static_cast<size_t>(gMaxNumberOfTransactionOperationsInSingleOplogEntry) ||
+ (operations.size() > 0 &&
+ (totalOperationsSize + DurableOplogEntry::getDurableReplOperationSize(operation) >
+ BSONObjMaxUserSize))) {
+ break;
+ }
+ } else {
+ // This a batched write, so we don't break the batch into multiple applyOps. It is the
+ // reponsibility of the caller to generate a batch that fits within a single applyOps.
+ // If the batch doesn't fit within an applyOps, we throw a TransactionTooLarge later
+ // on when serializing to BSON.
+ }
+ auto serializedOperation = operation.toBSON();
+ totalOperationsSize += static_cast<size_t>(serializedOperation.objsize());
+
+ // Add BSON array element overhead since operations will ultimately be packed into BSON
+ // array.
+ totalOperationsSize += kBSONArrayElementOverhead;
+
+ operations.emplace_back(std::move(serializedOperation));
+ }
+ return operations;
+}
+
+/**
+ * Returns oplog slots to be used for "applyOps" oplog entries, BSON serialized operations, their
+ * assignments to "applyOps" entries, and oplog slots to be used for writing pre- and post- image
+ * oplog entries for the transaction consisting of 'operations'. Allocates oplog slots from
+ * 'oplogSlots'. The 'numberOfPrePostImagesToWrite' is the number of CRUD operations that have a
+ * pre-image to write as a noop oplog entry. The 'prepare' indicates if the function is called when
+ * preparing a transaction.
+ */
+OpObserver::ApplyOpsOplogSlotAndOperationAssignment
+getApplyOpsOplogSlotAndOperationAssignmentForTransaction(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& oplogSlots,
+ size_t numberOfPrePostImagesToWrite,
+ bool prepare,
+ std::vector<repl::ReplOperation>& operations) {
+ if (operations.empty()) {
+ return {{}, {}, 0 /*numberOfOplogSlotsUsed*/};
+ }
+ tassert(6278504, "Insufficient number of oplogSlots", operations.size() <= oplogSlots.size());
+
+ std::vector<OplogSlot> prePostImageOplogEntryOplogSlots;
+ std::vector<OpObserver::ApplyOpsOplogSlotAndOperationAssignment::ApplyOpsEntry> applyOpsEntries;
+ const auto operationCount = operations.size();
+ auto oplogSlotIter = oplogSlots.begin();
+ auto getNextOplogSlot = [&]() {
+ tassert(6278505, "Unexpected end of oplog slot vector", oplogSlotIter != oplogSlots.end());
+ return *oplogSlotIter++;
+ };
+
+ auto isMigratingTenant = [&opCtx]() {
+ return static_cast<bool>(repl::tenantMigrationRecipientInfo(opCtx));
+ };
+
+ // We never want to store pre-images or post-images when we're migrating oplog entries from
+ // another replica set.
+ if (numberOfPrePostImagesToWrite > 0 && !isMigratingTenant()) {
+ for (size_t operationIdx = 0; operationIdx < operationCount; ++operationIdx) {
+ auto& statement = operations[operationIdx];
+ if (statement.isChangeStreamPreImageRecordedInOplog() ||
+ (statement.isPreImageRecordedForRetryableInternalTransaction() &&
+ statement.getNeedsRetryImage() != repl::RetryImageEnum::kPreImage)) {
+ tassert(6278506, "Expected a pre-image", !statement.getPreImage().isEmpty());
+ auto oplogSlot = getNextOplogSlot();
+ prePostImageOplogEntryOplogSlots.push_back(oplogSlot);
+ statement.setPreImageOpTime(oplogSlot);
+ }
+ if (!statement.getPostImage().isEmpty() &&
+ statement.getNeedsRetryImage() != repl::RetryImageEnum::kPostImage) {
+ auto oplogSlot = getNextOplogSlot();
+ prePostImageOplogEntryOplogSlots.push_back(oplogSlot);
+ statement.setPostImageOpTime(oplogSlot);
+ }
+ }
+ }
+
+ auto hasNeedsRetryImage = [](const repl::ReplOperation& operation) {
+ return static_cast<bool>(operation.getNeedsRetryImage());
+ };
+
+ // Assign operations to "applyOps" entries.
+ for (auto operationIt = operations.begin(); operationIt != operations.end();) {
+ auto applyOpsOperations = packOperationsIntoApplyOps(opCtx, operationIt, operations.end());
+ const auto opCountWithNeedsRetryImage =
+ std::count_if(operationIt, operationIt + applyOpsOperations.size(), hasNeedsRetryImage);
+ if (opCountWithNeedsRetryImage > 0) {
+ // Reserve a slot for a forged no-op entry.
+ getNextOplogSlot();
+ }
+ operationIt += applyOpsOperations.size();
+ applyOpsEntries.emplace_back(
+ OpObserver::ApplyOpsOplogSlotAndOperationAssignment::ApplyOpsEntry{
+ getNextOplogSlot(), std::move(applyOpsOperations)});
+ }
+
+ auto& batchedWriteContext = BatchedWriteContext::get(opCtx);
+ tassert(6501800,
+ "batched writes must generate a single applyOps entry",
+ !batchedWriteContext.writesAreBatched() || applyOpsEntries.size() == 1);
+
+ // In the special case of writing the implicit 'prepare' oplog entry, we use the last reserved
+ // oplog slot. This may mean we skipped over some reserved slots, but there's no harm in that.
+ if (prepare) {
+ applyOpsEntries.back().oplogSlot = oplogSlots.back();
+ }
+ return {std::move(prePostImageOplogEntryOplogSlots),
+ std::move(applyOpsEntries),
+ static_cast<size_t>(oplogSlotIter - oplogSlots.begin())};
+}
+
+/**
+ * Writes change stream pre-images for transaction 'operations'. The 'applyOpsOperationAssignment'
+ * contains a representation of "applyOps" entries to be written for the transaction. The
+ * 'operationTime' is wall clock time of the operations used for the pre-image documents.
+ */
+void writeChangeStreamPreImagesForTransaction(
+ OperationContext* opCtx,
+ const std::vector<repl::ReplOperation>& operations,
+ const OpObserver::ApplyOpsOplogSlotAndOperationAssignment& applyOpsOperationAssignment,
+ Date_t operationTime) {
+ // This function must be called from an outer WriteUnitOfWork in order to be rolled back upon
+ // reaching the exception.
+ invariant(opCtx->lockState()->inAWriteUnitOfWork());
+
+ auto applyOpsEntriesIt = applyOpsOperationAssignment.applyOpsEntries.begin();
+ for (auto operationIter = operations.begin(); operationIter != operations.end();) {
+ tassert(6278507,
+ "Unexpected end of applyOps entries vector",
+ applyOpsEntriesIt != applyOpsOperationAssignment.applyOpsEntries.end());
+ const auto& applyOpsEntry = *applyOpsEntriesIt++;
+ const auto operationSequenceEnd = operationIter + applyOpsEntry.operations.size();
+ writeChangeStreamPreImagesForApplyOpsEntries(opCtx,
+ operationIter,
+ operationSequenceEnd,
+ applyOpsEntry.oplogSlot.getTimestamp(),
+ operationTime);
+ operationIter = operationSequenceEnd;
+ }
+}
+
+// Accepts an empty BSON builder and appends the given transaction statements to an 'applyOps' array
+// field (and their corresponding statement ids to 'stmtIdsWritten'). The transaction statements are
+// represented as range ['stmtBegin', 'stmtEnd') and BSON serialized objects 'operations'. If any of
+// the statements has a pre-image or post-image that needs to be stored in the image collection,
+// stores it to 'imageToWrite'.
+void packTransactionStatementsForApplyOps(
+ BSONObjBuilder* applyOpsBuilder,
+ std::vector<StmtId>* stmtIdsWritten,
+ boost::optional<std::pair<repl::RetryImageEnum, BSONObj>>* imageToWrite,
+ std::vector<repl::ReplOperation>::iterator stmtBegin,
+ std::vector<repl::ReplOperation>::iterator stmtEnd,
+ const std::vector<BSONObj>& operations) {
+ tassert(6278508,
+ "Number of operations does not match the number of transaction statements",
+ operations.size() == static_cast<size_t>(stmtEnd - stmtBegin));
+ auto setImageToWrite = [&](const repl::ReplOperation& stmt) {
+ uassert(6054001,
+ str::stream() << NamespaceString::kConfigImagesNamespace
+ << " can only store the pre or post image of one "
+ "findAndModify operation for each "
+ "transaction",
+ !(*imageToWrite));
+ switch (*stmt.getNeedsRetryImage()) {
+ case repl::RetryImageEnum::kPreImage: {
+ invariant(!stmt.getPreImage().isEmpty());
+ *imageToWrite = std::make_pair(repl::RetryImageEnum::kPreImage, stmt.getPreImage());
+ break;
+ }
+ case repl::RetryImageEnum::kPostImage: {
+ invariant(!stmt.getPostImage().isEmpty());
+ *imageToWrite =
+ std::make_pair(repl::RetryImageEnum::kPostImage, stmt.getPostImage());
+ break;
+ }
+ default:
+ MONGO_UNREACHABLE;
+ }
+ };
+
+ std::vector<repl::ReplOperation>::iterator stmtIter;
+ auto operationsIter = operations.begin();
+ BSONArrayBuilder opsArray(applyOpsBuilder->subarrayStart("applyOps"_sd));
+ for (stmtIter = stmtBegin; stmtIter != stmtEnd; stmtIter++) {
+ const auto& stmt = *stmtIter;
+ opsArray.append(*operationsIter++);
+ const auto stmtIds = stmt.getStatementIds();
+ stmtIdsWritten->insert(stmtIdsWritten->end(), stmtIds.begin(), stmtIds.end());
+ if (stmt.getNeedsRetryImage()) {
+ setImageToWrite(stmt);
+ }
+ }
+ try {
+ // BSONArrayBuilder will throw a BSONObjectTooLarge exception if we exceeded the max BSON
+ // size.
+ opsArray.done();
+ } catch (const AssertionException& e) {
+ // Change the error code to TransactionTooLarge if it is BSONObjectTooLarge.
+ uassert(ErrorCodes::TransactionTooLarge,
+ e.reason(),
+ e.code() != ErrorCodes::BSONObjectTooLarge);
+ throw;
+ }
+}
+
+// Logs one applyOps entry on a prepared transaction, or an unprepared transaction's commit, or on
+// committing a WUOW that is not necessarily tied to a multi-document transaction. It may update the
+// transactions table on multi-document transactions. Assumes that the given BSON builder object
+// already has an 'applyOps' field appended pointing to the desired array of ops i.e. { "applyOps"
+// : [op1, op2, ...] }
+//
+// @param txnState the 'state' field of the transaction table entry update. @param startOpTime the
+// optime of the 'startOpTime' field of the transaction table entry update. If boost::none, no
+// 'startOpTime' field will be included in the new transaction table entry. Only meaningful if
+// 'updateTxnTable' is true. @param updateTxnTable determines whether the transactions table will
+// updated after the oplog entry is written.
+//
+// Returns the optime of the written oplog entry.
+OpTimeBundle logApplyOps(OperationContext* opCtx,
+ MutableOplogEntry* oplogEntry,
+ boost::optional<DurableTxnStateEnum> txnState,
+ boost::optional<repl::OpTime> startOpTime,
+ std::vector<StmtId> stmtIdsWritten,
+ const bool updateTxnTable) {
+ if (!stmtIdsWritten.empty()) {
+ invariant(isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId()));
+ }
+
+ const auto txnRetryCounter = opCtx->getTxnRetryCounter();
+
+ invariant(bool(txnRetryCounter) == bool(TransactionParticipant::get(opCtx)));
+
+ oplogEntry->setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry->setNss({"admin", "$cmd"});
+ // Batched writes (that is, WUOWs with 'groupOplogEntries') are not associated with a txnNumber,
+ // so do not emit an lsid either.
+ oplogEntry->setSessionId(opCtx->getTxnNumber() ? opCtx->getLogicalSessionId() : boost::none);
+ oplogEntry->setTxnNumber(opCtx->getTxnNumber());
+ if (txnRetryCounter && !isDefaultTxnRetryCounter(*txnRetryCounter)) {
+ oplogEntry->getOperationSessionInfo().setTxnRetryCounter(*txnRetryCounter);
+ }
+
+ try {
+ OpTimeBundle times;
+ times.writeOpTime = logOperation(opCtx, oplogEntry, false /*assignWallClockTime*/);
+ times.wallClockTime = oplogEntry->getWallClockTime();
+ if (updateTxnTable) {
+ SessionTxnRecord sessionTxnRecord;
+ sessionTxnRecord.setLastWriteOpTime(times.writeOpTime);
+ sessionTxnRecord.setLastWriteDate(times.wallClockTime);
+ sessionTxnRecord.setState(txnState);
+ sessionTxnRecord.setStartOpTime(startOpTime);
+ if (txnRetryCounter && !isDefaultTxnRetryCounter(*txnRetryCounter)) {
+ sessionTxnRecord.setTxnRetryCounter(*txnRetryCounter);
+ }
+ onWriteOpCompleted(opCtx, std::move(stmtIdsWritten), sessionTxnRecord);
+ }
+ return times;
+ } catch (const AssertionException& e) {
+ // Change the error code to TransactionTooLarge if it is BSONObjectTooLarge.
+ uassert(ErrorCodes::TransactionTooLarge,
+ e.reason(),
+ e.code() != ErrorCodes::BSONObjectTooLarge);
+ throw;
+ }
+ MONGO_UNREACHABLE;
+}
+
+// Logs applyOps oplog entries for preparing a transaction, committing an unprepared
+// transaction, or committing a WUOW that is not necessarily related to a multi-document
+// transaction. This includes the in-progress 'partialTxn' oplog entries followed by the implicit
+// prepare or commit entry. If the 'prepare' argument is true, it will log entries for a prepared
+// transaction. Otherwise, it logs entries for an unprepared transaction. The total number of oplog
+// entries written will be <= the size of the given 'stmts' vector, and will depend on how many
+// transaction statements are given, the data size of each statement, and the
+// 'maxNumberOfTransactionOperationsInSingleOplogEntry' server parameter.
+//
+// This function expects that the size of 'oplogSlots' be at least as big as the size of 'stmts' in
+// the worst case, where each operation requires an applyOps entry of its own. If there are more
+// oplog slots than applyOps operations are written, the number of oplog slots corresponding to the
+// number of applyOps written will be used. It also expects that the vector of given statements is
+// non-empty.
+//
+// The 'applyOpsOperationAssignment' contains BSON serialized transaction statements, their
+// assigment to "applyOps" oplog entries, and oplog slots to be used for writing pre- and post-
+// image oplog entries for a transaction.
+//
+// In the case of writing entries for a prepared transaction, the last oplog entry (i.e. the
+// implicit prepare) will always be written using the last oplog slot given, even if this means
+// skipping over some reserved slots.
+//
+// The number of oplog entries written is returned.
+int logOplogEntries(
+ OperationContext* opCtx,
+ std::vector<repl::ReplOperation>* stmts,
+ const std::vector<OplogSlot>& oplogSlots,
+ const OpObserver::ApplyOpsOplogSlotAndOperationAssignment& applyOpsOperationAssignment,
+ boost::optional<ImageBundle>* prePostImageToWriteToImageCollection,
+ size_t numberOfPrePostImagesToWrite,
+ bool prepare,
+ Date_t wallClockTime) {
+ invariant(!stmts->empty());
+
+ // Storage transaction commit is the last place inside a transaction that can throw an
+ // exception. In order to safely allow exceptions to be thrown at that point, this function must
+ // be called from an outer WriteUnitOfWork in order to be rolled back upon reaching the
+ // exception.
+ invariant(opCtx->lockState()->inAWriteUnitOfWork());
+
+ const auto txnParticipant = TransactionParticipant::get(opCtx);
+ OpTimeBundle prevWriteOpTime;
+
+ // Writes to the oplog only require a Global intent lock. Guaranteed by
+ // OplogSlotReserver.
+ invariant(opCtx->lockState()->isWriteLocked());
+
+ if (txnParticipant) {
+ prevWriteOpTime.writeOpTime = txnParticipant.getLastWriteOpTime();
+ }
+ auto currPrePostImageOplogEntryOplogSlot =
+ applyOpsOperationAssignment.prePostImageOplogEntryOplogSlots.begin();
+
+ // We never want to store pre-images or post-images when we're migrating oplog entries from
+ // another replica set.
+ const auto& migrationRecipientInfo = repl::tenantMigrationRecipientInfo(opCtx);
+
+ auto logPrePostImageNoopEntry = [&](const repl::ReplOperation& statement,
+ const BSONObj& imageDoc) {
+ auto slot = *currPrePostImageOplogEntryOplogSlot;
+ ++currPrePostImageOplogEntryOplogSlot;
+
+ MutableOplogEntry imageEntry;
+ imageEntry.setSessionId(*opCtx->getLogicalSessionId());
+ imageEntry.setTxnNumber(*opCtx->getTxnNumber());
+ imageEntry.setStatementIds(statement.getStatementIds());
+ imageEntry.setOpType(repl::OpTypeEnum::kNoop);
+ imageEntry.setObject(imageDoc);
+ imageEntry.setNss(statement.getNss());
+ imageEntry.setUuid(statement.getUuid());
+ imageEntry.setOpTime(slot);
+ imageEntry.setDestinedRecipient(statement.getDestinedRecipient());
+
+ logOperation(opCtx, &imageEntry);
+ };
+
+ if (numberOfPrePostImagesToWrite > 0 && !migrationRecipientInfo) {
+ for (auto& statement : *stmts) {
+ if (statement.isChangeStreamPreImageRecordedInOplog() ||
+ (statement.isPreImageRecordedForRetryableInternalTransaction() &&
+ statement.getNeedsRetryImage() != repl::RetryImageEnum::kPreImage)) {
+ invariant(!statement.getPreImage().isEmpty());
+
+ // Note that 'needsRetryImage' stores the image kind that needs to stored in the
+ // image collection. Therefore, when 'needsRetryImage' is equal to kPreImage, the
+ // pre-image will be written to the image collection (after all the applyOps oplog
+ // entries are written).
+ logPrePostImageNoopEntry(statement, statement.getPreImage());
+ }
+ if (!statement.getPostImage().isEmpty() &&
+ statement.getNeedsRetryImage() != repl::RetryImageEnum::kPostImage) {
+ // Likewise, when 'needsRetryImage' is equal to kPostImage, the post-image will be
+ // written to the image collection (after all the applyOps oplog entries are
+ // written).
+ logPrePostImageNoopEntry(statement, statement.getPostImage());
+ }
+ }
+ }
+
+ // Stores the statement ids of all write statements in the transaction.
+ std::vector<StmtId> stmtIdsWritten;
+
+ // At the beginning of each loop iteration below, 'stmtsIter' will always point to the
+ // first statement of the sequence of remaining, unpacked transaction statements. If all
+ // statements have been packed, it should point to stmts.end(), which is the loop's
+ // termination condition.
+ auto stmtsIter = stmts->begin();
+ auto applyOpsIter = applyOpsOperationAssignment.applyOpsEntries.begin();
+ while (stmtsIter != stmts->end()) {
+ tassert(6278509,
+ "Not enough \"applyOps\" entries",
+ applyOpsIter != applyOpsOperationAssignment.applyOpsEntries.end());
+ auto& applyOpsEntry = *applyOpsIter++;
+ BSONObjBuilder applyOpsBuilder;
+ boost::optional<std::pair<repl::RetryImageEnum, BSONObj>> imageToWrite;
+
+ const auto nextStmt = stmtsIter + applyOpsEntry.operations.size();
+ packTransactionStatementsForApplyOps(&applyOpsBuilder,
+ &stmtIdsWritten,
+ &imageToWrite,
+ stmtsIter,
+ nextStmt,
+ applyOpsEntry.operations);
+
+ // If we packed the last op, then the next oplog entry we log should be the implicit
+ // commit or implicit prepare, i.e. we omit the 'partialTxn' field.
+ auto firstOp = stmtsIter == stmts->begin();
+ auto lastOp = nextStmt == stmts->end();
+
+ auto implicitCommit = lastOp && !prepare;
+ auto implicitPrepare = lastOp && prepare;
+ auto isPartialTxn = !lastOp;
+
+ if (imageToWrite) {
+ uassert(6054002,
+ str::stream() << NamespaceString::kConfigImagesNamespace
+ << " can only store the pre or post image of one "
+ "findAndModify operation for each "
+ "transaction",
+ !(*prePostImageToWriteToImageCollection));
+ }
+
+ if (isPartialTxn || (imageToWrite && !prepare)) {
+ // Partial transactions and unprepared transactions with pre or post image stored in the
+ // image collection create/reserve multiple oplog entries in the same WriteUnitOfWork.
+ // Because of this, such transactions will set multiple timestamps, violating the
+ // multi timestamp constraint. It's safe to ignore the multi timestamp constraints here
+ // as additional rollback logic is in place for this case.
+ opCtx->recoveryUnit()->ignoreAllMultiTimestampConstraints();
+ }
+
+ // A 'prepare' oplog entry should never include a 'partialTxn' field.
+ invariant(!(isPartialTxn && implicitPrepare));
+ if (implicitPrepare) {
+ applyOpsBuilder.append("prepare", true);
+ }
+ if (isPartialTxn) {
+ applyOpsBuilder.append("partialTxn", true);
+ }
+
+ // The 'count' field gives the total number of individual operations in the
+ // transaction, and is included on a non-initial implicit commit or prepare entry.
+ if (lastOp && !firstOp) {
+ applyOpsBuilder.append("count", static_cast<long long>(stmts->size()));
+ }
+
+ // For both prepared and unprepared transactions (but not for batched writes) update the
+ // transactions table on the first and last op.
+ auto updateTxnTable = txnParticipant && (firstOp || lastOp);
+
+ // The first optime of the transaction is always the first oplog slot, except in the
+ // case of a single prepare oplog entry.
+ auto firstOpTimeOfTxn =
+ (implicitPrepare && firstOp) ? oplogSlots.back() : oplogSlots.front();
+
+ // We always write the startOpTime field, which is the first optime of the
+ // transaction, except when transitioning to 'committed' state, in which it should
+ // no longer be set.
+ auto startOpTime = boost::make_optional(!implicitCommit, firstOpTimeOfTxn);
+
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpTime(applyOpsEntry.oplogSlot);
+ if (txnParticipant) {
+ oplogEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTime.writeOpTime);
+ }
+ oplogEntry.setWallClockTime(wallClockTime);
+ oplogEntry.setObject(applyOpsBuilder.done());
+ auto txnState = isPartialTxn
+ ? DurableTxnStateEnum::kInProgress
+ : (implicitPrepare ? DurableTxnStateEnum::kPrepared : DurableTxnStateEnum::kCommitted);
+ prevWriteOpTime = logApplyOps(opCtx,
+ &oplogEntry,
+ txnState,
+ startOpTime,
+ (lastOp ? std::move(stmtIdsWritten) : std::vector<StmtId>{}),
+ updateTxnTable);
+
+ hangAfterLoggingApplyOpsForTransaction.pauseWhileSet();
+
+ if (imageToWrite) {
+ invariant(!(*prePostImageToWriteToImageCollection));
+ *prePostImageToWriteToImageCollection =
+ ImageBundle{imageToWrite->first,
+ imageToWrite->second,
+ prevWriteOpTime.writeOpTime.getTimestamp()};
+ }
+
+ // Advance the iterator to the beginning of the remaining unpacked statements.
+ stmtsIter = nextStmt;
+ }
+
+ return applyOpsOperationAssignment.numberOfOplogSlotsUsed;
+}
+
+void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
+ MutableOplogEntry* oplogEntry,
+ DurableTxnStateEnum durableState) {
+ const auto txnRetryCounter = *opCtx->getTxnRetryCounter();
+
+ oplogEntry->setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry->setNss({"admin", "$cmd"});
+ oplogEntry->setSessionId(opCtx->getLogicalSessionId());
+ oplogEntry->setTxnNumber(opCtx->getTxnNumber());
+ if (!isDefaultTxnRetryCounter(txnRetryCounter)) {
+ oplogEntry->getOperationSessionInfo().setTxnRetryCounter(txnRetryCounter);
+ }
+ oplogEntry->setPrevWriteOpTimeInTransaction(
+ TransactionParticipant::get(opCtx).getLastWriteOpTime());
+
+ // There should not be a parent WUOW outside of this one. This guarantees the safety of the
+ // write conflict retry loop.
+ invariant(!opCtx->lockState()->inAWriteUnitOfWork());
+
+ // We must not have a maximum lock timeout, since writing the commit or abort oplog entry for a
+ // prepared transaction must always succeed.
+ invariant(!opCtx->lockState()->hasMaxLockTimeout());
+
+ writeConflictRetry(
+ opCtx, "onPreparedTransactionCommitOrAbort", NamespaceString::kRsOplogNamespace.ns(), [&] {
+ // Writes to the oplog only require a Global intent lock. Guaranteed by
+ // OplogSlotReserver.
+ invariant(opCtx->lockState()->isWriteLocked());
+
+ WriteUnitOfWork wuow(opCtx);
+ const auto oplogOpTime = logOperation(opCtx, oplogEntry);
+ invariant(oplogEntry->getOpTime().isNull() || oplogEntry->getOpTime() == oplogOpTime);
+
+ SessionTxnRecord sessionTxnRecord;
+ sessionTxnRecord.setLastWriteOpTime(oplogOpTime);
+ sessionTxnRecord.setLastWriteDate(oplogEntry->getWallClockTime());
+ sessionTxnRecord.setState(durableState);
+ if (!isDefaultTxnRetryCounter(txnRetryCounter)) {
+ sessionTxnRecord.setTxnRetryCounter(txnRetryCounter);
+ }
+ onWriteOpCompleted(opCtx, {}, sessionTxnRecord);
+ wuow.commit();
+ });
+}
+
+} // namespace
+
+void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx,
+ std::vector<repl::ReplOperation>* statements,
+ size_t numberOfPrePostImagesToWrite) {
+ invariant(opCtx->getTxnNumber());
+
+ if (!opCtx->writesAreReplicated()) {
+ return;
+ }
+
+ // It is possible that the transaction resulted in no changes. In that case, we should
+ // not write an empty applyOps entry.
+ if (statements->empty())
+ return;
+
+ repl::OpTime commitOpTime;
+ // Reserve all the optimes in advance, so we only need to get the optime mutex once. We
+ // reserve enough entries for all statements in the transaction.
+ auto oplogSlots =
+ repl::getNextOpTimes(opCtx, statements->size() + numberOfPrePostImagesToWrite);
+
+ // Throw TenantMigrationConflict error if the database for the transaction statements is being
+ // migrated. We only need check the namespace of the first statement since a transaction's
+ // statements must all be for the same tenant.
+ tenant_migration_access_blocker::checkIfCanWriteOrThrow(
+ opCtx, statements->begin()->getNss().db(), oplogSlots.back().getTimestamp());
+
+ if (MONGO_unlikely(hangAndFailUnpreparedCommitAfterReservingOplogSlot.shouldFail())) {
+ hangAndFailUnpreparedCommitAfterReservingOplogSlot.pauseWhileSet(opCtx);
+ uasserted(51268, "hangAndFailUnpreparedCommitAfterReservingOplogSlot fail point enabled");
+ }
+
+ // Serialize transaction statements to BSON and determine their assignment to "applyOps"
+ // entries.
+ const auto applyOpsOplogSlotAndOperationAssignment =
+ getApplyOpsOplogSlotAndOperationAssignmentForTransaction(
+ opCtx, oplogSlots, numberOfPrePostImagesToWrite, false /*prepare*/, *statements);
+ const auto wallClockTime = getWallClockTimeForOpLog(opCtx);
+
+ // Log in-progress entries for the transaction along with the implicit commit.
+ boost::optional<ImageBundle> imageToWrite;
+ int numOplogEntries = logOplogEntries(opCtx,
+ statements,
+ oplogSlots,
+ applyOpsOplogSlotAndOperationAssignment,
+ &imageToWrite,
+ numberOfPrePostImagesToWrite,
+ false /* prepare*/,
+ wallClockTime);
+
+ // Write change stream pre-images. At this point the pre-images will be written at the
+ // transaction commit timestamp as driven (implicitly) by the last written "applyOps" oplog
+ // entry.
+ writeChangeStreamPreImagesForTransaction(
+ opCtx, *statements, applyOpsOplogSlotAndOperationAssignment, wallClockTime);
+
+ if (imageToWrite) {
+ writeToImageCollection(opCtx,
+ *opCtx->getLogicalSessionId(),
+ imageToWrite->timestamp,
+ imageToWrite->imageKind,
+ imageToWrite->imageDoc);
+ }
+
+ commitOpTime = oplogSlots[numOplogEntries - 1];
+ invariant(!commitOpTime.isNull());
+ shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, *statements, commitOpTime);
+}
+
+void OpObserverImpl::onBatchedWriteStart(OperationContext* opCtx) {
+ auto& batchedWriteContext = BatchedWriteContext::get(opCtx);
+ batchedWriteContext.setWritesAreBatched(true);
+}
+
+void OpObserverImpl::onBatchedWriteCommit(OperationContext* opCtx) {
+ if (repl::ReplicationCoordinator::get(opCtx)->getReplicationMode() !=
+ repl::ReplicationCoordinator::modeReplSet ||
+ !opCtx->writesAreReplicated()) {
+ return;
+ }
+
+ auto& batchedWriteContext = BatchedWriteContext::get(opCtx);
+ auto& batchedOps = batchedWriteContext.getBatchedOperations(opCtx);
+
+ if (!batchedOps.size()) {
+ return;
+ }
+
+ // Reserve all the optimes in advance, so we only need to get the optime mutex once. We
+ // reserve enough entries for all statements in the transaction.
+ auto oplogSlots = repl::getNextOpTimes(opCtx, batchedOps.size());
+
+ // Throw TenantMigrationConflict error if the database for the transaction statements is being
+ // migrated. We only need check the namespace of the first statement since a transaction's
+ // statements must all be for the same tenant.
+ tenant_migration_access_blocker::checkIfCanWriteOrThrow(
+ opCtx, batchedOps.begin()->getNss().db(), oplogSlots.back().getTimestamp());
+
+ auto noPrePostImage = boost::optional<ImageBundle>(boost::none);
+
+ // Serialize batched statements to BSON and determine their assignment to "applyOps"
+ // entries.
+ const auto applyOpsOplogSlotAndOperationAssignment =
+ getApplyOpsOplogSlotAndOperationAssignmentForTransaction(
+ opCtx, oplogSlots, 0 /*numberOfPrePostImagesToWrite*/, false /*prepare*/, batchedOps);
+ const auto wallClockTime = getWallClockTimeForOpLog(opCtx);
+ logOplogEntries(opCtx,
+ &batchedOps,
+ oplogSlots,
+ applyOpsOplogSlotAndOperationAssignment,
+ &noPrePostImage,
+ 0 /* numberOfPrePostImagesToWrite */,
+ false,
+ wallClockTime);
+}
+
+void OpObserverImpl::onBatchedWriteAbort(OperationContext* opCtx) {
+ auto& batchedWriteContext = BatchedWriteContext::get(opCtx);
+ batchedWriteContext.clearBatchedOperations(opCtx);
+ batchedWriteContext.setWritesAreBatched(false);
+}
+
+void OpObserverImpl::onPreparedTransactionCommit(
+ OperationContext* opCtx,
+ OplogSlot commitOplogEntryOpTime,
+ Timestamp commitTimestamp,
+ const std::vector<repl::ReplOperation>& statements) noexcept {
+ invariant(opCtx->getTxnNumber());
+
+ if (!opCtx->writesAreReplicated()) {
+ return;
+ }
+
+ invariant(!commitTimestamp.isNull());
+
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpTime(commitOplogEntryOpTime);
+
+ CommitTransactionOplogObject cmdObj;
+ cmdObj.setCommitTimestamp(commitTimestamp);
+ oplogEntry.setObject(cmdObj.toBSON());
+
+ logCommitOrAbortForPreparedTransaction(opCtx, &oplogEntry, DurableTxnStateEnum::kCommitted);
+}
+
+std::unique_ptr<OpObserver::ApplyOpsOplogSlotAndOperationAssignment>
+OpObserverImpl::preTransactionPrepare(OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) {
+ auto applyOpsOplogSlotAndOperationAssignment =
+ getApplyOpsOplogSlotAndOperationAssignmentForTransaction(
+ opCtx, reservedSlots, numberOfPrePostImagesToWrite, true /*prepare*/, *statements);
+ writeChangeStreamPreImagesForTransaction(
+ opCtx, *statements, applyOpsOplogSlotAndOperationAssignment, wallClockTime);
+ return std::make_unique<OpObserver::ApplyOpsOplogSlotAndOperationAssignment>(
+ std::move(applyOpsOplogSlotAndOperationAssignment));
+}
+
+void OpObserverImpl::onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) {
+ invariant(!reservedSlots.empty());
+ const auto prepareOpTime = reservedSlots.back();
+ invariant(opCtx->getTxnNumber());
+ invariant(!prepareOpTime.isNull());
+ tassert(6278510,
+ "Operation assignments to applyOps entries should be present",
+ applyOpsOperationAssignment);
+
+ // Don't write oplog entry on secondaries.
+ if (!opCtx->writesAreReplicated()) {
+ return;
+ }
+
+ {
+ // We should have reserved enough slots.
+ invariant(reservedSlots.size() >= statements->size());
+ TransactionParticipant::SideTransactionBlock sideTxn(opCtx);
+
+ writeConflictRetry(
+ opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] {
+ // Writes to the oplog only require a Global intent lock. Guaranteed by
+ // OplogSlotReserver.
+ invariant(opCtx->lockState()->isWriteLocked());
+
+ WriteUnitOfWork wuow(opCtx);
+ // It is possible that the transaction resulted in no changes, In that case, we
+ // should not write any operations other than the prepare oplog entry.
+ if (!statements->empty()) {
+ // We had reserved enough oplog slots for the worst case where each operation
+ // produced one oplog entry. When operations are smaller and can be packed, we
+ // will waste the extra slots. The implicit prepare oplog entry will still use
+ // the last reserved slot, because the transaction participant has already used
+ // that as the prepare time.
+ boost::optional<ImageBundle> imageToWrite;
+ logOplogEntries(opCtx,
+ statements,
+ reservedSlots,
+ *applyOpsOperationAssignment,
+ &imageToWrite,
+ numberOfPrePostImagesToWrite,
+ true /* prepare */,
+ wallClockTime);
+ if (imageToWrite) {
+ writeToImageCollection(opCtx,
+ *opCtx->getLogicalSessionId(),
+ imageToWrite->timestamp,
+ imageToWrite->imageKind,
+ imageToWrite->imageDoc);
+ }
+ } else {
+ // Log an empty 'prepare' oplog entry.
+ // We need to have at least one reserved slot.
+ invariant(reservedSlots.size() > 0);
+ BSONObjBuilder applyOpsBuilder;
+ BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd));
+ opsArray.done();
+ applyOpsBuilder.append("prepare", true);
+
+ auto oplogSlot = reservedSlots.front();
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpTime(oplogSlot);
+ oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime());
+ oplogEntry.setObject(applyOpsBuilder.done());
+ oplogEntry.setWallClockTime(wallClockTime);
+ logApplyOps(opCtx,
+ &oplogEntry,
+ DurableTxnStateEnum::kPrepared,
+ oplogSlot,
+ {},
+ true /* updateTxnTable */);
+ }
+ wuow.commit();
+ });
+ }
+
+ shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, *statements, prepareOpTime);
+}
+
+void OpObserverImpl::onTransactionAbort(OperationContext* opCtx,
+ boost::optional<OplogSlot> abortOplogEntryOpTime) {
+ invariant(opCtx->getTxnNumber());
+
+ if (!opCtx->writesAreReplicated()) {
+ return;
+ }
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ invariant(txnParticipant);
+
+ if (!abortOplogEntryOpTime) {
+ invariant(!txnParticipant.transactionIsCommitted());
+ return;
+ }
+
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpTime(*abortOplogEntryOpTime);
+
+ AbortTransactionOplogObject cmdObj;
+ oplogEntry.setObject(cmdObj.toBSON());
+
+ logCommitOrAbortForPreparedTransaction(opCtx, &oplogEntry, DurableTxnStateEnum::kAborted);
+}
+
+void OpObserverImpl::_onReplicationRollback(OperationContext* opCtx,
+ const RollbackObserverInfo& rbInfo) {
+ // Reset the key manager cache.
+ auto validator = LogicalTimeValidator::get(opCtx);
+ if (validator) {
+ validator->resetKeyManagerCache();
+ }
+
+ // Check if the shard identity document rolled back.
+ if (rbInfo.shardIdentityRolledBack) {
+ fassertFailedNoTrace(50712);
+ }
+
+ // Force the default read/write concern cache to reload on next access in case the defaults
+ // document was rolled back.
+ ReadWriteConcernDefaults::get(opCtx).invalidate();
+
+ stdx::unordered_set<NamespaceString> timeseriesNamespaces;
+ for (const auto& ns : rbInfo.rollbackNamespaces) {
+ if (ns.isTimeseriesBucketsCollection()) {
+ timeseriesNamespaces.insert(ns.getTimeseriesViewNamespace());
+ }
+ }
+ BucketCatalog::get(opCtx).clear([&timeseriesNamespaces](const NamespaceString& bucketNs) {
+ return timeseriesNamespaces.contains(bucketNs);
+ });
+}
+
+} // namespace mongo