summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorDaniel Gómez Ferro <daniel.gomezferro@mongodb.com>2022-10-28 11:13:54 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-10-28 12:43:13 +0000
commit7bfdaf25f35df2c329eae4150ee7c6599e15e5b0 (patch)
tree68e4fbdd7a1d6b5a6767ecb7ac4ecb7fd7d7b99c /src/mongo/db
parent819bdae0b239f0ad75c0791e18943e6c4cf9762d (diff)
downloadmongo-7bfdaf25f35df2c329eae4150ee7c6599e15e5b0.tar.gz
SERVER-70044 Thread-through CollectionPtr into the onUpdate OpObserver
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/auth/auth_op_observer.cpp4
-rw-r--r--src/mongo/db/catalog/collection.h34
-rw-r--r--src/mongo/db/catalog/collection_impl.cpp207
-rw-r--r--src/mongo/db/catalog/collection_impl.h32
-rw-r--r--src/mongo/db/catalog/collection_mock.h21
-rw-r--r--src/mongo/db/catalog/collection_test.cpp21
-rw-r--r--src/mongo/db/catalog/collection_write_path.cpp221
-rw-r--r--src/mongo/db/catalog/collection_write_path.h34
-rw-r--r--src/mongo/db/catalog/views_for_database.cpp15
-rw-r--r--src/mongo/db/catalog/virtual_collection_impl.h23
-rw-r--r--src/mongo/db/dbhelpers.cpp4
-rw-r--r--src/mongo/db/exec/update_stage.cpp35
-rw-r--r--src/mongo/db/exec/write_stage_common.cpp7
-rw-r--r--src/mongo/db/exec/write_stage_common.h6
-rw-r--r--src/mongo/db/free_mon/free_mon_op_observer.cpp2
-rw-r--r--src/mongo/db/op_observer/fcv_op_observer.cpp2
-rw-r--r--src/mongo/db/op_observer/op_observer.h7
-rw-r--r--src/mongo/db/op_observer/op_observer_impl.cpp44
-rw-r--r--src/mongo/db/op_observer/op_observer_impl_test.cpp108
-rw-r--r--src/mongo/db/op_observer/user_write_block_mode_op_observer.cpp4
-rw-r--r--src/mongo/db/op_observer/user_write_block_mode_op_observer_test.cpp3
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp4
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp2
-rw-r--r--src/mongo/db/repl/storage_timestamp_test.cpp9
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_op_observer.cpp2
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.cpp16
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp2
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier_test.cpp4
-rw-r--r--src/mongo/db/s/README.md8
-rw-r--r--src/mongo/db/s/config_server_op_observer.cpp2
-rw-r--r--src/mongo/db/s/query_analysis_op_observer.cpp4
-rw-r--r--src/mongo/db/s/range_deleter_service_op_observer.cpp2
-rw-r--r--src/mongo/db/s/resharding/resharding_op_observer.cpp9
-rw-r--r--src/mongo/db/s/resharding/resharding_service_test_helpers.h2
-rw-r--r--src/mongo/db/s/shard_server_op_observer.cpp14
-rw-r--r--src/mongo/db/serverless/SConscript1
-rw-r--r--src/mongo/db/serverless/shard_split_donor_op_observer.cpp2
-rw-r--r--src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp6
-rw-r--r--src/mongo/db/serverless/shard_split_donor_service.cpp17
-rw-r--r--src/mongo/db/transaction/transaction_participant.cpp32
42 files changed, 497 insertions, 477 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index a848fd32367..e00940fd651 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -992,6 +992,7 @@ env.Library(
'query_exec',
],
LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/catalog/collection_crud',
'$BUILD_DIR/mongo/db/commands/server_status_core',
'$BUILD_DIR/mongo/db/ops/write_ops',
'record_id_helpers',
diff --git a/src/mongo/db/auth/auth_op_observer.cpp b/src/mongo/db/auth/auth_op_observer.cpp
index b974eda9973..a2ddfa79949 100644
--- a/src/mongo/db/auth/auth_op_observer.cpp
+++ b/src/mongo/db/auth/auth_op_observer.cpp
@@ -67,10 +67,10 @@ void AuthOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
return;
}
- audit::logUpdateOperation(opCtx->getClient(), args.nss, args.updateArgs->updatedDoc);
+ audit::logUpdateOperation(opCtx->getClient(), args.coll->ns(), args.updateArgs->updatedDoc);
AuthorizationManager::get(opCtx->getServiceContext())
- ->logOp(opCtx, "u", args.nss, args.updateArgs->update, &args.updateArgs->criteria);
+ ->logOp(opCtx, "u", args.coll->ns(), args.updateArgs->update, &args.updateArgs->criteria);
}
void AuthOpObserver::aboutToDelete(OperationContext* opCtx,
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h
index f3f9997b93e..3fac44cff7a 100644
--- a/src/mongo/db/catalog/collection.h
+++ b/src/mongo/db/catalog/collection.h
@@ -85,6 +85,8 @@ struct CollectionUpdateArgs {
StoreDocOption storeDocOption = StoreDocOption::None;
bool changeStreamPreAndPostImagesEnabledForCollection = false;
+ bool retryableWrite = false;
+
// Set if OpTimes were reserved for the update ahead of time.
std::vector<OplogSlot> oplogSlots;
};
@@ -347,40 +349,8 @@ public:
StoreDeletedDoc storeDeletedDoc = StoreDeletedDoc::Off,
CheckRecordId checkRecordId = CheckRecordId::Off) const = 0;
- /**
- * Updates the document @ oldLocation with newDoc.
- *
- * If the document fits in the old space, it is put there; if not, it is moved.
- * Sets 'args.updatedDoc' to the updated version of the document with damages applied, on
- * success.
- * 'opDebug' Optional argument. When not null, will be used to record operation statistics.
- * @return the post update location of the doc (may or may not be the same as oldLocation)
- */
- virtual RecordId updateDocument(OperationContext* opCtx,
- const RecordId& oldLocation,
- const Snapshotted<BSONObj>& oldDoc,
- const BSONObj& newDoc,
- bool indexesAffected,
- OpDebug* opDebug,
- CollectionUpdateArgs* args) const = 0;
-
virtual bool updateWithDamagesSupported() const = 0;
- /**
- * Illegal to call if updateWithDamagesSupported() returns false.
- * Sets 'args.updatedDoc' to the updated version of the document with damages applied, on
- * success.
- *
- * Returns the contents of the updated document on success.
- */
- virtual StatusWith<BSONObj> updateDocumentWithDamages(OperationContext* opCtx,
- const RecordId& loc,
- const Snapshotted<BSONObj>& oldDoc,
- const char* damageSource,
- const mutablebson::DamageVector& damages,
- bool indexesAffected,
- OpDebug* opDebug,
- CollectionUpdateArgs* args) const = 0;
// -----------
diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp
index 18928d12897..b4f3078a3c9 100644
--- a/src/mongo/db/catalog/collection_impl.cpp
+++ b/src/mongo/db/catalog/collection_impl.cpp
@@ -173,6 +173,7 @@ bool isRetryableWrite(OperationContext* opCtx) {
(!opCtx->inMultiDocumentTransaction() || txnParticipant.transactionIsOpen());
}
+// TODO(SERVER-70043) remove after that ticket is fixed
std::vector<OplogSlot> reserveOplogSlotsForRetryableFindAndModify(OperationContext* opCtx) {
invariant(isRetryableWrite(opCtx));
@@ -871,132 +872,6 @@ void CollectionImpl::deleteDocument(OperationContext* opCtx,
}
}
-bool compareSafeContentElem(const BSONObj& oldDoc, const BSONObj& newDoc) {
- if (newDoc.hasField(kSafeContent) != oldDoc.hasField(kSafeContent)) {
- return false;
- }
- if (!newDoc.hasField(kSafeContent)) {
- return true;
- }
-
- return newDoc.getField(kSafeContent).binaryEqual(oldDoc.getField(kSafeContent));
-}
-
-RecordId CollectionImpl::updateDocument(OperationContext* opCtx,
- const RecordId& oldLocation,
- const Snapshotted<BSONObj>& oldDoc,
- const BSONObj& newDoc,
- bool indexesAffected,
- OpDebug* opDebug,
- CollectionUpdateArgs* args) const {
- {
- auto status = checkValidationAndParseResult(opCtx, newDoc);
- if (!status.isOK()) {
- if (validationLevelOrDefault(_metadata->options.validationLevel) ==
- ValidationLevelEnum::strict) {
- uassertStatusOK(status);
- }
- // moderate means we have to check the old doc
- auto oldDocStatus = checkValidationAndParseResult(opCtx, oldDoc.value());
- if (oldDocStatus.isOK()) {
- // transitioning from good -> bad is not ok
- uassertStatusOK(status);
- }
- // bad -> bad is ok in moderate mode
- }
- }
-
- auto& validationSettings = DocumentValidationSettings::get(opCtx);
- if (getCollectionOptions().encryptedFieldConfig &&
- !validationSettings.isSchemaValidationDisabled() &&
- !validationSettings.isSafeContentValidationDisabled()) {
-
- uassert(ErrorCodes::BadValue,
- str::stream() << "New document and old document both need to have " << kSafeContent
- << " field.",
- compareSafeContentElem(oldDoc.value(), newDoc));
- }
-
- dassert(opCtx->lockState()->isCollectionLockedForMode(ns(), MODE_IX));
- invariant(oldDoc.snapshotId() == opCtx->recoveryUnit()->getSnapshotId());
- invariant(newDoc.isOwned());
-
- if (needsCappedLock()) {
- Lock::ResourceLock heldUntilEndOfWUOW{opCtx, ResourceId(RESOURCE_METADATA, _ns), MODE_X};
- }
-
- SnapshotId sid = opCtx->recoveryUnit()->getSnapshotId();
-
- BSONElement oldId = oldDoc.value()["_id"];
- if (!oldId.eoo() && SimpleBSONElementComparator::kInstance.evaluate(oldId != newDoc["_id"]))
- uasserted(13596, "in Collection::updateDocument _id mismatch");
-
- // The preImageDoc may not be boost::none if this update was a retryable findAndModify or if
- // the update may have changed the shard key. For non-in-place updates we always set the
- // preImageDoc here to an owned copy of the pre-image.
- if (!args->preImageDoc) {
- args->preImageDoc = oldDoc.value().getOwned();
- }
- args->changeStreamPreAndPostImagesEnabledForCollection =
- isChangeStreamPreAndPostImagesEnabled();
-
- OplogUpdateEntryArgs onUpdateArgs(args, ns(), _uuid);
- const bool setNeedsRetryImageOplogField =
- args->storeDocOption != CollectionUpdateArgs::StoreDocOption::None;
- if (args->oplogSlots.empty() && setNeedsRetryImageOplogField && isRetryableWrite(opCtx)) {
- onUpdateArgs.retryableFindAndModifyLocation =
- RetryableFindAndModifyLocation::kSideCollection;
- // If the update is part of a retryable write and we expect to be storing the pre- or
- // post-image in a side collection, then we must reserve oplog slots in advance. We
- // expect to use the reserved oplog slots as follows, where TS is the greatest
- // timestamp of 'oplogSlots':
- // TS - 1: Tenant migrations and resharding will forge no-op image oplog entries and set
- // the entry timestamps to TS - 1.
- // TS: The timestamp given to the update oplog entry.
- args->oplogSlots = reserveOplogSlotsForRetryableFindAndModify(opCtx);
- } else {
- // Retryable findAndModify commands should not reserve oplog slots before entering this
- // function since tenant migrations and resharding rely on always being able to set
- // timestamps of forged pre- and post- image entries to timestamp of findAndModify - 1.
- invariant(!(isRetryableWrite(opCtx) && setNeedsRetryImageOplogField));
- }
-
- uassertStatusOK(_shared->_recordStore->updateRecord(
- opCtx, oldLocation, newDoc.objdata(), newDoc.objsize()));
-
- if (indexesAffected) {
- int64_t keysInserted = 0;
- int64_t keysDeleted = 0;
-
- uassertStatusOK(_indexCatalog->updateRecord(opCtx,
- {this, CollectionPtr::NoYieldTag{}},
- *args->preImageDoc,
- newDoc,
- oldLocation,
- &keysInserted,
- &keysDeleted));
-
- if (opDebug) {
- opDebug->additiveMetrics.incrementKeysInserted(keysInserted);
- opDebug->additiveMetrics.incrementKeysDeleted(keysDeleted);
- // 'opDebug' may be deleted at rollback time in case of multi-document transaction.
- if (!opCtx->inMultiDocumentTransaction()) {
- opCtx->recoveryUnit()->onRollback([opDebug, keysInserted, keysDeleted]() {
- opDebug->additiveMetrics.incrementKeysInserted(-keysInserted);
- opDebug->additiveMetrics.incrementKeysDeleted(-keysDeleted);
- });
- }
- }
- }
-
- invariant(sid == opCtx->recoveryUnit()->getSnapshotId());
- args->updatedDoc = newDoc;
-
- opCtx->getServiceContext()->getOpObserver()->onUpdate(opCtx, onUpdateArgs);
-
- return oldLocation;
-}
-
bool CollectionImpl::updateWithDamagesSupported() const {
if (!_validator.isOK() || _validator.filter.getValue() != nullptr)
return false;
@@ -1004,86 +879,6 @@ bool CollectionImpl::updateWithDamagesSupported() const {
return _shared->_recordStore->updateWithDamagesSupported();
}
-StatusWith<BSONObj> CollectionImpl::updateDocumentWithDamages(
- OperationContext* opCtx,
- const RecordId& loc,
- const Snapshotted<BSONObj>& oldDoc,
- const char* damageSource,
- const mutablebson::DamageVector& damages,
- bool indexesAffected,
- OpDebug* opDebug,
- CollectionUpdateArgs* args) const {
- dassert(opCtx->lockState()->isCollectionLockedForMode(ns(), MODE_IX));
- invariant(oldDoc.snapshotId() == opCtx->recoveryUnit()->getSnapshotId());
- invariant(updateWithDamagesSupported());
-
- // For in-place updates we need to grab an owned copy of the pre-image doc if pre-image
- // recording is enabled and we haven't already set the pre-image due to this update being
- // a retryable findAndModify or a possible update to the shard key.
- if (!args->preImageDoc && isChangeStreamPreAndPostImagesEnabled()) {
- args->preImageDoc = oldDoc.value().getOwned();
- }
- OplogUpdateEntryArgs onUpdateArgs(args, ns(), _uuid);
- const bool setNeedsRetryImageOplogField =
- args->storeDocOption != CollectionUpdateArgs::StoreDocOption::None;
- if (args->oplogSlots.empty() && setNeedsRetryImageOplogField && isRetryableWrite(opCtx)) {
- onUpdateArgs.retryableFindAndModifyLocation =
- RetryableFindAndModifyLocation::kSideCollection;
- // If the update is part of a retryable write and we expect to be storing the pre- or
- // post-image in a side collection, then we must reserve oplog slots in advance. We
- // expect to use the reserved oplog slots as follows, where TS is the greatest
- // timestamp of 'oplogSlots':
- // TS - 1: Tenant migrations and resharding will forge no-op image oplog entries and set
- // the entry timestamps to TS - 1.
- // TS: The timestamp given to the update oplog entry.
- args->oplogSlots = reserveOplogSlotsForRetryableFindAndModify(opCtx);
- } else {
- // Retryable findAndModify commands should not reserve oplog slots before entering this
- // function since tenant migrations and resharding rely on always being able to set
- // timestamps of forged pre- and post- image entries to timestamp of findAndModify - 1.
- invariant(!(isRetryableWrite(opCtx) && setNeedsRetryImageOplogField));
- }
-
- RecordData oldRecordData(oldDoc.value().objdata(), oldDoc.value().objsize());
- StatusWith<RecordData> recordData =
- _shared->_recordStore->updateWithDamages(opCtx, loc, oldRecordData, damageSource, damages);
- if (!recordData.isOK())
- return recordData.getStatus();
- BSONObj newDoc = std::move(recordData.getValue()).releaseToBson().getOwned();
-
- args->updatedDoc = newDoc;
- args->changeStreamPreAndPostImagesEnabledForCollection =
- isChangeStreamPreAndPostImagesEnabled();
-
- if (indexesAffected) {
- int64_t keysInserted = 0;
- int64_t keysDeleted = 0;
-
- uassertStatusOK(_indexCatalog->updateRecord(opCtx,
- {this, CollectionPtr::NoYieldTag{}},
- oldDoc.value(),
- args->updatedDoc,
- loc,
- &keysInserted,
- &keysDeleted));
-
- if (opDebug) {
- opDebug->additiveMetrics.incrementKeysInserted(keysInserted);
- opDebug->additiveMetrics.incrementKeysDeleted(keysDeleted);
- // 'opDebug' may be deleted at rollback time in case of multi-document transaction.
- if (!opCtx->inMultiDocumentTransaction()) {
- opCtx->recoveryUnit()->onRollback([opDebug, keysInserted, keysDeleted]() {
- opDebug->additiveMetrics.incrementKeysInserted(-keysInserted);
- opDebug->additiveMetrics.incrementKeysDeleted(-keysDeleted);
- });
- }
- }
- }
-
- opCtx->getServiceContext()->getOpObserver()->onUpdate(opCtx, onUpdateArgs);
- return newDoc;
-}
-
bool CollectionImpl::isTemporary() const {
return _metadata->options.temp;
}
diff --git a/src/mongo/db/catalog/collection_impl.h b/src/mongo/db/catalog/collection_impl.h
index 409d570c8c6..46c2a3bb6be 100644
--- a/src/mongo/db/catalog/collection_impl.h
+++ b/src/mongo/db/catalog/collection_impl.h
@@ -191,40 +191,8 @@ public:
Collection::StoreDeletedDoc storeDeletedDoc = Collection::StoreDeletedDoc::Off,
CheckRecordId checkRecordId = CheckRecordId::Off) const final;
- /**
- * Updates the document @ oldLocation with newDoc.
- *
- * If the document fits in the old space, it is put there; if not, it is moved.
- * Sets 'args.updatedDoc' to the updated version of the document with damages applied, on
- * success.
- * 'opDebug' Optional argument. When not null, will be used to record operation statistics.
- * @return the post update location of the doc (may or may not be the same as oldLocation)
- */
- RecordId updateDocument(OperationContext* opCtx,
- const RecordId& oldLocation,
- const Snapshotted<BSONObj>& oldDoc,
- const BSONObj& newDoc,
- bool indexesAffected,
- OpDebug* opDebug,
- CollectionUpdateArgs* args) const final;
-
bool updateWithDamagesSupported() const final;
- /**
- * Illegal to call if updateWithDamagesSupported() returns false.
- * Sets 'args.updatedDoc' to the updated version of the document with damages applied, on
- * success.
- * Returns the contents of the updated document.
- */
- StatusWith<BSONObj> updateDocumentWithDamages(OperationContext* opCtx,
- const RecordId& loc,
- const Snapshotted<BSONObj>& oldDoc,
- const char* damageSource,
- const mutablebson::DamageVector& damages,
- bool indexesAffected,
- OpDebug* opDebug,
- CollectionUpdateArgs* args) const final;
-
// -----------
/**
diff --git a/src/mongo/db/catalog/collection_mock.h b/src/mongo/db/catalog/collection_mock.h
index 62d5bf82df6..7ff9e7d00d8 100644
--- a/src/mongo/db/catalog/collection_mock.h
+++ b/src/mongo/db/catalog/collection_mock.h
@@ -160,31 +160,10 @@ public:
MONGO_UNREACHABLE;
}
- RecordId updateDocument(OperationContext* opCtx,
- const RecordId& oldLocation,
- const Snapshotted<BSONObj>& oldDoc,
- const BSONObj& newDoc,
- bool indexesAffected,
- OpDebug* opDebug,
- CollectionUpdateArgs* args) const {
- MONGO_UNREACHABLE;
- }
-
bool updateWithDamagesSupported() const {
MONGO_UNREACHABLE;
}
- StatusWith<BSONObj> updateDocumentWithDamages(OperationContext* opCtx,
- const RecordId& loc,
- const Snapshotted<BSONObj>& oldDoc,
- const char* damageSource,
- const mutablebson::DamageVector& damages,
- bool indexesAffected,
- OpDebug* opDebug,
- CollectionUpdateArgs* args) const {
- MONGO_UNREACHABLE;
- }
-
Status truncate(OperationContext* opCtx) {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/catalog/collection_test.cpp b/src/mongo/db/catalog/collection_test.cpp
index 5957c74b53c..f03632ceaab 100644
--- a/src/mongo/db/catalog/collection_test.cpp
+++ b/src/mongo/db/catalog/collection_test.cpp
@@ -278,7 +278,8 @@ TEST_F(CollectionTest, VerifyIndexIsUpdated) {
WriteUnitOfWork wuow(opCtx);
Snapshotted<BSONObj> oldSnap(opCtx->recoveryUnit()->getSnapshotId(), oldDoc);
CollectionUpdateArgs args;
- coll->updateDocument(opCtx, oldRecordId, oldSnap, newDoc, true, nullptr, &args);
+ collection_internal::updateDocument(
+ opCtx, coll, oldRecordId, oldSnap, newDoc, true, nullptr, &args);
wuow.commit();
}
auto indexRecordId = userIdx->getEntry()->accessMethod()->asSortedData()->findSingle(
@@ -321,14 +322,16 @@ TEST_F(CollectionTest, VerifyIndexIsUpdatedWithDamages) {
WriteUnitOfWork wuow(opCtx);
Snapshotted<BSONObj> oldSnap(opCtx->recoveryUnit()->getSnapshotId(), oldDoc);
CollectionUpdateArgs args;
- auto newDocStatus = coll->updateDocumentWithDamages(opCtx,
- oldRecordId,
- oldSnap,
- damagesOutput.damageSource.get(),
- damagesOutput.damages,
- true,
- nullptr,
- &args);
+ auto newDocStatus =
+ collection_internal::updateDocumentWithDamages(opCtx,
+ coll,
+ oldRecordId,
+ oldSnap,
+ damagesOutput.damageSource.get(),
+ damagesOutput.damages,
+ true,
+ nullptr,
+ &args);
ASSERT_OK(newDocStatus);
ASSERT_BSONOBJ_EQ(newDoc, newDocStatus.getValue());
wuow.commit();
diff --git a/src/mongo/db/catalog/collection_write_path.cpp b/src/mongo/db/catalog/collection_write_path.cpp
index 69664b71635..38db9f4f22e 100644
--- a/src/mongo/db/catalog/collection_write_path.cpp
+++ b/src/mongo/db/catalog/collection_write_path.cpp
@@ -29,6 +29,7 @@
#include "mongo/db/catalog/collection_write_path.h"
+#include "mongo/bson/simple_bsonelement_comparator.h"
#include "mongo/crypto/fle_crypto.h"
#include "mongo/db/catalog/capped_collection_maintenance.h"
#include "mongo/db/catalog/document_validation.h"
@@ -68,6 +69,34 @@ MONGO_FAIL_POINT_DEFINE(hangAfterCollectionInserts);
// This fail point introduces corruption to documents during insert.
MONGO_FAIL_POINT_DEFINE(corruptDocumentOnInsert);
+bool compareSafeContentElem(const BSONObj& oldDoc, const BSONObj& newDoc) {
+ if (newDoc.hasField(kSafeContent) != oldDoc.hasField(kSafeContent)) {
+ return false;
+ }
+ if (!newDoc.hasField(kSafeContent)) {
+ return true;
+ }
+
+ return newDoc.getField(kSafeContent).binaryEqual(oldDoc.getField(kSafeContent));
+}
+
+std::vector<OplogSlot> reserveOplogSlotsForRetryableFindAndModify(OperationContext* opCtx,
+ const CollectionPtr& collection) {
+ // For retryable findAndModify running in a multi-document transaction, we will reserve the
+ // oplog entries when the transaction prepares or commits without prepare.
+ if (opCtx->inMultiDocumentTransaction()) {
+ return {};
+ }
+
+    // We reserve oplog slots here, expecting the slot with the greatest timestamp (say TS) to be
+ // used as the oplog timestamp. Tenant migrations and resharding will forge no-op image oplog
+ // entries and set the timestamp for these synthetic entries to be TS - 1.
+ auto oplogInfo = LocalOplogInfo::get(opCtx);
+ auto slots = oplogInfo->getNextOpTimes(opCtx, 2);
+ uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(slots.back().getTimestamp()));
+ return slots;
+}
+
Status insertDocumentsImpl(OperationContext* opCtx,
const CollectionPtr& collection,
const std::vector<InsertStatement>::const_iterator begin,
@@ -365,5 +394,197 @@ Status checkFailCollectionInsertsFailPoint(const NamespaceString& ns, const BSON
return s;
}
+RecordId updateDocument(OperationContext* opCtx,
+ const CollectionPtr& collection,
+ const RecordId& oldLocation,
+ const Snapshotted<BSONObj>& oldDoc,
+ const BSONObj& newDoc,
+ bool indexesAffected,
+ OpDebug* opDebug,
+ CollectionUpdateArgs* args) {
+ {
+ auto status = collection->checkValidationAndParseResult(opCtx, newDoc);
+ if (!status.isOK()) {
+ if (validationLevelOrDefault(collection->getCollectionOptions().validationLevel) ==
+ ValidationLevelEnum::strict) {
+ uassertStatusOK(status);
+ }
+ // moderate means we have to check the old doc
+ auto oldDocStatus = collection->checkValidationAndParseResult(opCtx, oldDoc.value());
+ if (oldDocStatus.isOK()) {
+ // transitioning from good -> bad is not ok
+ uassertStatusOK(status);
+ }
+ // bad -> bad is ok in moderate mode
+ }
+ }
+
+ auto& validationSettings = DocumentValidationSettings::get(opCtx);
+ if (collection->getCollectionOptions().encryptedFieldConfig &&
+ !validationSettings.isSchemaValidationDisabled() &&
+ !validationSettings.isSafeContentValidationDisabled()) {
+
+ uassert(ErrorCodes::BadValue,
+ str::stream() << "New document and old document both need to have " << kSafeContent
+ << " field.",
+ compareSafeContentElem(oldDoc.value(), newDoc));
+ }
+
+ dassert(opCtx->lockState()->isCollectionLockedForMode(collection->ns(), MODE_IX));
+ invariant(oldDoc.snapshotId() == opCtx->recoveryUnit()->getSnapshotId());
+ invariant(newDoc.isOwned());
+
+ if (collection->needsCappedLock()) {
+ Lock::ResourceLock heldUntilEndOfWUOW{
+ opCtx, ResourceId(RESOURCE_METADATA, collection->ns()), MODE_X};
+ }
+
+ SnapshotId sid = opCtx->recoveryUnit()->getSnapshotId();
+
+ BSONElement oldId = oldDoc.value()["_id"];
+ if (!oldId.eoo() && SimpleBSONElementComparator::kInstance.evaluate(oldId != newDoc["_id"]))
+ uasserted(13596, "in Collection::updateDocument _id mismatch");
+
+ // The preImageDoc may not be boost::none if this update was a retryable findAndModify or if
+ // the update may have changed the shard key. For non-in-place updates we always set the
+ // preImageDoc here to an owned copy of the pre-image.
+ if (!args->preImageDoc) {
+ args->preImageDoc = oldDoc.value().getOwned();
+ }
+ args->changeStreamPreAndPostImagesEnabledForCollection =
+ collection->isChangeStreamPreAndPostImagesEnabled();
+
+ OplogUpdateEntryArgs onUpdateArgs(args, collection);
+ const bool setNeedsRetryImageOplogField =
+ args->storeDocOption != CollectionUpdateArgs::StoreDocOption::None;
+ if (args->oplogSlots.empty() && setNeedsRetryImageOplogField && args->retryableWrite) {
+ onUpdateArgs.retryableFindAndModifyLocation =
+ RetryableFindAndModifyLocation::kSideCollection;
+ // If the update is part of a retryable write and we expect to be storing the pre- or
+ // post-image in a side collection, then we must reserve oplog slots in advance. We
+ // expect to use the reserved oplog slots as follows, where TS is the greatest
+ // timestamp of 'oplogSlots':
+ // TS - 1: Tenant migrations and resharding will forge no-op image oplog entries and set
+ // the entry timestamps to TS - 1.
+ // TS: The timestamp given to the update oplog entry.
+ args->oplogSlots = reserveOplogSlotsForRetryableFindAndModify(opCtx, collection);
+ } else {
+ // Retryable findAndModify commands should not reserve oplog slots before entering this
+ // function since tenant migrations and resharding rely on always being able to set
+ // timestamps of forged pre- and post- image entries to timestamp of findAndModify - 1.
+ invariant(!(args->retryableWrite && setNeedsRetryImageOplogField));
+ }
+
+ uassertStatusOK(collection->getRecordStore()->updateRecord(
+ opCtx, oldLocation, newDoc.objdata(), newDoc.objsize()));
+
+ if (indexesAffected) {
+ int64_t keysInserted = 0;
+ int64_t keysDeleted = 0;
+
+ uassertStatusOK(collection->getIndexCatalog()->updateRecord(opCtx,
+ collection,
+ *args->preImageDoc,
+ newDoc,
+ oldLocation,
+ &keysInserted,
+ &keysDeleted));
+
+ if (opDebug) {
+ opDebug->additiveMetrics.incrementKeysInserted(keysInserted);
+ opDebug->additiveMetrics.incrementKeysDeleted(keysDeleted);
+ // 'opDebug' may be deleted at rollback time in case of multi-document transaction.
+ if (!opCtx->inMultiDocumentTransaction()) {
+ opCtx->recoveryUnit()->onRollback([opDebug, keysInserted, keysDeleted]() {
+ opDebug->additiveMetrics.incrementKeysInserted(-keysInserted);
+ opDebug->additiveMetrics.incrementKeysDeleted(-keysDeleted);
+ });
+ }
+ }
+ }
+
+ invariant(sid == opCtx->recoveryUnit()->getSnapshotId());
+ args->updatedDoc = newDoc;
+
+ opCtx->getServiceContext()->getOpObserver()->onUpdate(opCtx, onUpdateArgs);
+
+ return oldLocation;
+}
+
+StatusWith<BSONObj> updateDocumentWithDamages(OperationContext* opCtx,
+ const CollectionPtr& collection,
+ const RecordId& loc,
+ const Snapshotted<BSONObj>& oldDoc,
+ const char* damageSource,
+ const mutablebson::DamageVector& damages,
+ bool indexesAffected,
+ OpDebug* opDebug,
+ CollectionUpdateArgs* args) {
+ dassert(opCtx->lockState()->isCollectionLockedForMode(collection->ns(), MODE_IX));
+ invariant(oldDoc.snapshotId() == opCtx->recoveryUnit()->getSnapshotId());
+ invariant(collection->updateWithDamagesSupported());
+
+ // For in-place updates we need to grab an owned copy of the pre-image doc if pre-image
+ // recording is enabled and we haven't already set the pre-image due to this update being
+ // a retryable findAndModify or a possible update to the shard key.
+ if (!args->preImageDoc && collection->isChangeStreamPreAndPostImagesEnabled()) {
+ args->preImageDoc = oldDoc.value().getOwned();
+ }
+ OplogUpdateEntryArgs onUpdateArgs(args, collection);
+ const bool setNeedsRetryImageOplogField =
+ args->storeDocOption != CollectionUpdateArgs::StoreDocOption::None;
+ if (args->oplogSlots.empty() && setNeedsRetryImageOplogField && args->retryableWrite) {
+ onUpdateArgs.retryableFindAndModifyLocation =
+ RetryableFindAndModifyLocation::kSideCollection;
+ // If the update is part of a retryable write and we expect to be storing the pre- or
+ // post-image in a side collection, then we must reserve oplog slots in advance. We
+ // expect to use the reserved oplog slots as follows, where TS is the greatest
+ // timestamp of 'oplogSlots':
+ // TS - 1: Tenant migrations and resharding will forge no-op image oplog entries and set
+ // the entry timestamps to TS - 1.
+ // TS: The timestamp given to the update oplog entry.
+ args->oplogSlots = reserveOplogSlotsForRetryableFindAndModify(opCtx, collection);
+ } else {
+ // Retryable findAndModify commands should not reserve oplog slots before entering this
+ // function since tenant migrations and resharding rely on always being able to set
+ // timestamps of forged pre- and post- image entries to timestamp of findAndModify - 1.
+ invariant(!(args->retryableWrite && setNeedsRetryImageOplogField));
+ }
+
+ RecordData oldRecordData(oldDoc.value().objdata(), oldDoc.value().objsize());
+ StatusWith<RecordData> recordData = collection->getRecordStore()->updateWithDamages(
+ opCtx, loc, oldRecordData, damageSource, damages);
+ if (!recordData.isOK())
+ return recordData.getStatus();
+ BSONObj newDoc = std::move(recordData.getValue()).releaseToBson().getOwned();
+
+ args->updatedDoc = newDoc;
+ args->changeStreamPreAndPostImagesEnabledForCollection =
+ collection->isChangeStreamPreAndPostImagesEnabled();
+
+ if (indexesAffected) {
+ int64_t keysInserted = 0;
+ int64_t keysDeleted = 0;
+
+ uassertStatusOK(collection->getIndexCatalog()->updateRecord(
+ opCtx, collection, oldDoc.value(), args->updatedDoc, loc, &keysInserted, &keysDeleted));
+
+ if (opDebug) {
+ opDebug->additiveMetrics.incrementKeysInserted(keysInserted);
+ opDebug->additiveMetrics.incrementKeysDeleted(keysDeleted);
+ // 'opDebug' may be deleted at rollback time in case of multi-document transaction.
+ if (!opCtx->inMultiDocumentTransaction()) {
+ opCtx->recoveryUnit()->onRollback([opDebug, keysInserted, keysDeleted]() {
+ opDebug->additiveMetrics.incrementKeysInserted(-keysInserted);
+ opDebug->additiveMetrics.incrementKeysDeleted(-keysDeleted);
+ });
+ }
+ }
+ }
+
+ opCtx->getServiceContext()->getOpObserver()->onUpdate(opCtx, onUpdateArgs);
+ return newDoc;
+}
+
} // namespace collection_internal
} // namespace mongo
diff --git a/src/mongo/db/catalog/collection_write_path.h b/src/mongo/db/catalog/collection_write_path.h
index 084c04893ab..9e0af1f0273 100644
--- a/src/mongo/db/catalog/collection_write_path.h
+++ b/src/mongo/db/catalog/collection_write_path.h
@@ -85,5 +85,39 @@ Status insertDocument(OperationContext* opCtx,
*/
Status checkFailCollectionInsertsFailPoint(const NamespaceString& ns, const BSONObj& firstDoc);
+/**
+ * Updates the document @ oldLocation with newDoc.
+ *
+ * If the document fits in the old space, it is put there; if not, it is moved.
+ * Sets 'args.updatedDoc' to the updated version of the document with damages applied, on
+ * success.
+ * 'opDebug' Optional argument. When not null, will be used to record operation statistics.
+ * @return the post update location of the doc (may or may not be the same as oldLocation)
+ */
+RecordId updateDocument(OperationContext* opCtx,
+ const CollectionPtr& collection,
+ const RecordId& oldLocation,
+ const Snapshotted<BSONObj>& oldDoc,
+ const BSONObj& newDoc,
+ bool indexesAffected,
+ OpDebug* opDebug,
+ CollectionUpdateArgs* args);
+
+/**
+ * Illegal to call if collection->updateWithDamagesSupported() returns false.
+ * Sets 'args.updatedDoc' to the updated version of the document with damages applied, on
+ * success.
+ * Returns the contents of the updated document.
+ */
+StatusWith<BSONObj> updateDocumentWithDamages(OperationContext* opCtx,
+ const CollectionPtr& collection,
+ const RecordId& loc,
+ const Snapshotted<BSONObj>& oldDoc,
+ const char* damageSource,
+ const mutablebson::DamageVector& damages,
+ bool indexesAffected,
+ OpDebug* opDebug,
+ CollectionUpdateArgs* args);
+
} // namespace collection_internal
} // namespace mongo
diff --git a/src/mongo/db/catalog/views_for_database.cpp b/src/mongo/db/catalog/views_for_database.cpp
index 8ceda406315..23ea8df9c23 100644
--- a/src/mongo/db/catalog/views_for_database.cpp
+++ b/src/mongo/db/catalog/views_for_database.cpp
@@ -327,13 +327,14 @@ Status ViewsForDatabase::_upsertIntoCatalog(OperationContext* opCtx,
args.update = viewObj;
args.criteria = BSON("_id" << NamespaceStringUtil::serialize(view.name()));
- systemViews->updateDocument(opCtx,
- id,
- oldView,
- viewObj,
- true /* indexesAffected */,
- &CurOp::get(opCtx)->debug(),
- &args);
+ collection_internal::updateDocument(opCtx,
+ systemViews,
+ id,
+ oldView,
+ viewObj,
+ true /* indexesAffected */,
+ &CurOp::get(opCtx)->debug(),
+ &args);
}
return Status::OK();
diff --git a/src/mongo/db/catalog/virtual_collection_impl.h b/src/mongo/db/catalog/virtual_collection_impl.h
index d9596eb8ca2..a490f4f9ee1 100644
--- a/src/mongo/db/catalog/virtual_collection_impl.h
+++ b/src/mongo/db/catalog/virtual_collection_impl.h
@@ -177,34 +177,11 @@ public:
unimplementedTasserted();
}
- RecordId updateDocument(OperationContext* opCtx,
- const RecordId& oldLocation,
- const Snapshotted<BSONObj>& oldDoc,
- const BSONObj& newDoc,
- bool indexesAffected,
- OpDebug* opDebug,
- CollectionUpdateArgs* args) const final {
- unimplementedTasserted();
- return RecordId();
- }
-
bool updateWithDamagesSupported() const final {
unimplementedTasserted();
return false;
}
- StatusWith<BSONObj> updateDocumentWithDamages(OperationContext* opCtx,
- const RecordId& loc,
- const Snapshotted<BSONObj>& oldDoc,
- const char* damageSource,
- const mutablebson::DamageVector& damages,
- bool indexesAffected,
- OpDebug* opDebug,
- CollectionUpdateArgs* args) const final {
- unimplementedTasserted();
- return Status(ErrorCodes::UnknownError, "unknown");
- }
-
Status truncate(OperationContext* opCtx) final {
unimplementedTasserted();
return Status(ErrorCodes::UnknownError, "unknown");
diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp
index dd00b6366e9..154da0a09c7 100644
--- a/src/mongo/db/dbhelpers.cpp
+++ b/src/mongo/db/dbhelpers.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/dbhelpers.h"
#include "mongo/db/catalog/clustered_collection_util.h"
+#include "mongo/db/catalog/collection_write_path.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/index/btree_access_method.h"
@@ -386,7 +387,8 @@ bool Helpers::findByIdAndNoopUpdate(OperationContext* opCtx,
CollectionUpdateArgs args;
args.criteria = idQuery;
args.update = BSONObj();
- collection->updateDocument(opCtx, recordId, snapshottedDoc, result, false, nullptr, &args);
+ collection_internal::updateDocument(
+ opCtx, collection, recordId, snapshottedDoc, result, false, nullptr, &args);
return true;
}
diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp
index 63bf1087791..ca7d4fb5ef5 100644
--- a/src/mongo/db/exec/update_stage.cpp
+++ b/src/mongo/db/exec/update_stage.cpp
@@ -33,6 +33,7 @@
#include "mongo/base/status_with.h"
#include "mongo/bson/mutable/algorithm.h"
+#include "mongo/db/catalog/collection_write_path.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/internal_transactions_feature_flag_gen.h"
#include "mongo/db/query/collection_query_info.h"
@@ -260,6 +261,8 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj,
// Ensure we set the type correctly
args.source = writeToOrphan ? OperationSource::kFromMigrate : request->source();
+ args.retryableWrite = write_stage_common::isRetryableWrite(opCtx());
+
if (inPlace) {
if (!request->explain()) {
newObj = oldObj.value();
@@ -274,14 +277,15 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj,
WriteUnitOfWork wunit(opCtx());
newObj = uassertStatusOK(
- collection()->updateDocumentWithDamages(opCtx(),
- recordId,
- oldObj,
- source,
- _damages,
- driver->modsAffectIndices(),
- _params.opDebug,
- &args));
+ collection_internal::updateDocumentWithDamages(opCtx(),
+ collection(),
+ recordId,
+ oldObj,
+ source,
+ _damages,
+ driver->modsAffectIndices(),
+ _params.opDebug,
+ &args));
invariant(oldObj.snapshotId() == opCtx()->recoveryUnit()->getSnapshotId());
wunit.commit();
}
@@ -305,13 +309,14 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj,
}
WriteUnitOfWork wunit(opCtx());
- newRecordId = collection()->updateDocument(opCtx(),
- recordId,
- oldObj,
- newObj,
- driver->modsAffectIndices(),
- _params.opDebug,
- &args);
+ newRecordId = collection_internal::updateDocument(opCtx(),
+ collection(),
+ recordId,
+ oldObj,
+ newObj,
+ driver->modsAffectIndices(),
+ _params.opDebug,
+ &args);
invariant(oldObj.snapshotId() == opCtx()->recoveryUnit()->getSnapshotId());
wunit.commit();
}
diff --git a/src/mongo/db/exec/write_stage_common.cpp b/src/mongo/db/exec/write_stage_common.cpp
index 1fa04f729b2..46f7652280b 100644
--- a/src/mongo/db/exec/write_stage_common.cpp
+++ b/src/mongo/db/exec/write_stage_common.cpp
@@ -29,6 +29,7 @@
#include "mongo/db/exec/write_stage_common.h"
+#include "mongo/base/shim.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/exec/shard_filterer_impl.h"
#include "mongo/db/exec/working_set.h"
@@ -38,6 +39,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/transaction/transaction_participant.h"
#include "mongo/logv2/redaction.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kWrite
@@ -133,5 +135,10 @@ bool ensureStillMatches(const CollectionPtr& collection,
return true;
}
+bool isRetryableWrite(OperationContext* opCtx) {
+ static auto w = MONGO_WEAK_FUNCTION_DEFINITION(write_stage_common::isRetryableWrite);
+ return w(opCtx);
+}
+
} // namespace write_stage_common
} // namespace mongo
diff --git a/src/mongo/db/exec/write_stage_common.h b/src/mongo/db/exec/write_stage_common.h
index 870178f67c7..2f4a49ac8a9 100644
--- a/src/mongo/db/exec/write_stage_common.h
+++ b/src/mongo/db/exec/write_stage_common.h
@@ -96,5 +96,11 @@ bool ensureStillMatches(const CollectionPtr& collection,
WorkingSet* ws,
WorkingSetID id,
const CanonicalQuery* cq);
+
+/**
+ * Returns true if we are running retryable write or retryable internal multi-document transaction.
+ */
+bool isRetryableWrite(OperationContext* opCtx);
+
} // namespace write_stage_common
} // namespace mongo
diff --git a/src/mongo/db/free_mon/free_mon_op_observer.cpp b/src/mongo/db/free_mon/free_mon_op_observer.cpp
index 75db3aeb817..0729e33b84a 100644
--- a/src/mongo/db/free_mon/free_mon_op_observer.cpp
+++ b/src/mongo/db/free_mon/free_mon_op_observer.cpp
@@ -100,7 +100,7 @@ void FreeMonOpObserver::onInserts(OperationContext* opCtx,
}
void FreeMonOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) {
- if (args.nss != NamespaceString::kServerConfigurationNamespace) {
+ if (args.coll->ns() != NamespaceString::kServerConfigurationNamespace) {
return;
}
diff --git a/src/mongo/db/op_observer/fcv_op_observer.cpp b/src/mongo/db/op_observer/fcv_op_observer.cpp
index 5acca8bc3ae..763b8dcdd92 100644
--- a/src/mongo/db/op_observer/fcv_op_observer.cpp
+++ b/src/mongo/db/op_observer/fcv_op_observer.cpp
@@ -163,7 +163,7 @@ void FcvOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs
if (args.updateArgs->update.isEmpty()) {
return;
}
- if (args.nss.isServerConfigurationCollection()) {
+ if (args.coll->ns().isServerConfigurationCollection()) {
_onInsertOrUpdate(opCtx, args.updateArgs->updatedDoc);
}
}
diff --git a/src/mongo/db/op_observer/op_observer.h b/src/mongo/db/op_observer/op_observer.h
index fa770ac62a9..8f57e825ea5 100644
--- a/src/mongo/db/op_observer/op_observer.h
+++ b/src/mongo/db/op_observer/op_observer.h
@@ -62,15 +62,14 @@ enum class RetryableFindAndModifyLocation {
struct OplogUpdateEntryArgs {
CollectionUpdateArgs* updateArgs;
- NamespaceString nss;
- UUID uuid;
+ const CollectionPtr& coll;
// Specifies the pre-image recording option for retryable "findAndModify" commands.
RetryableFindAndModifyLocation retryableFindAndModifyLocation =
RetryableFindAndModifyLocation::kNone;
- OplogUpdateEntryArgs(CollectionUpdateArgs* updateArgs, NamespaceString nss, UUID uuid)
- : updateArgs(updateArgs), nss(std::move(nss)), uuid(std::move(uuid)) {}
+ OplogUpdateEntryArgs(CollectionUpdateArgs* updateArgs, const CollectionPtr& coll)
+ : updateArgs(updateArgs), coll(coll) {}
};
struct OplogDeleteEntryArgs {
diff --git a/src/mongo/db/op_observer/op_observer_impl.cpp b/src/mongo/db/op_observer/op_observer_impl.cpp
index f80ab67c3c5..5de4209759d 100644
--- a/src/mongo/db/op_observer/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer/op_observer_impl.cpp
@@ -196,9 +196,9 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
const OplogUpdateEntryArgs& args,
MutableOplogEntry* oplogEntry,
OplogWriter* oplogWriter) {
- oplogEntry->setTid(args.nss.tenantId());
- oplogEntry->setNss(args.nss);
- oplogEntry->setUuid(args.uuid);
+ oplogEntry->setTid(args.coll->ns().tenantId());
+ oplogEntry->setNss(args.coll->ns());
+ oplogEntry->setUuid(args.coll->uuid());
repl::OplogLink oplogLink;
oplogWriter->appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, args.updateArgs->stmtIds);
@@ -779,14 +779,15 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
failCollectionUpdates.executeIf(
[&](const BSONObj&) {
uasserted(40654,
- str::stream() << "failCollectionUpdates failpoint enabled, namespace: "
- << args.nss.ns() << ", update: " << args.updateArgs->update
- << " on document with " << args.updateArgs->criteria);
+ str::stream()
+ << "failCollectionUpdates failpoint enabled, namespace: "
+ << args.coll->ns().ns() << ", update: " << args.updateArgs->update
+ << " on document with " << args.updateArgs->criteria);
},
[&](const BSONObj& data) {
// If the failpoint specifies no collection or matches the existing one, fail.
auto collElem = data["collectionNS"];
- return !collElem || args.nss.ns() == collElem.String();
+ return !collElem || args.coll->ns().ns() == collElem.String();
});
// Do not log a no-op operation; see SERVER-21738
@@ -798,7 +799,8 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
const bool inMultiDocumentTransaction =
txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen();
- ShardingWriteRouter shardingWriteRouter(opCtx, args.nss, Grid::get(opCtx)->catalogCache());
+ ShardingWriteRouter shardingWriteRouter(
+ opCtx, args.coll->ns(), Grid::get(opCtx)->catalogCache());
OpTimeBundle opTime;
auto& batchedWriteContext = BatchedWriteContext::get(opCtx);
@@ -806,7 +808,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
if (inBatchedWrite) {
auto operation = MutableOplogEntry::makeUpdateOperation(
- args.nss, args.uuid, args.updateArgs->update, args.updateArgs->criteria);
+ args.coll->ns(), args.coll->uuid(), args.updateArgs->update, args.updateArgs->criteria);
operation.setDestinedRecipient(
shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs->updatedDoc));
operation.setFromMigrateIfTrue(args.updateArgs->source == OperationSource::kFromMigrate);
@@ -816,7 +818,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId());
auto operation = MutableOplogEntry::makeUpdateOperation(
- args.nss, args.uuid, args.updateArgs->update, args.updateArgs->criteria);
+ args.coll->ns(), args.coll->uuid(), args.updateArgs->update, args.updateArgs->criteria);
if (inRetryableInternalTransaction) {
operation.setInitializedStatementIds(args.updateArgs->stmtIds);
@@ -900,15 +902,15 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
if (args.updateArgs->changeStreamPreAndPostImagesEnabledForCollection &&
!opTime.writeOpTime.isNull() &&
args.updateArgs->source != OperationSource::kFromMigrate &&
- !args.nss.isTemporaryReshardingCollection()) {
+ !args.coll->ns().isTemporaryReshardingCollection()) {
const auto& preImageDoc = args.updateArgs->preImageDoc;
tassert(5868600, "PreImage must be set", preImageDoc && !preImageDoc.value().isEmpty());
- ChangeStreamPreImageId id(args.uuid, opTime.writeOpTime.getTimestamp(), 0);
+ ChangeStreamPreImageId id(args.coll->uuid(), opTime.writeOpTime.getTimestamp(), 0);
ChangeStreamPreImage preImage(id, opTime.wallClockTime, preImageDoc.value());
ChangeStreamPreImagesCollectionManager::insertPreImage(
- opCtx, args.nss.tenantId(), preImage);
+ opCtx, args.coll->ns().tenantId(), preImage);
}
SessionTxnRecord sessionTxnRecord;
@@ -917,10 +919,10 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
onWriteOpCompleted(opCtx, args.updateArgs->stmtIds, sessionTxnRecord);
}
- if (args.nss != NamespaceString::kSessionTransactionsTableNamespace) {
+ if (args.coll->ns() != NamespaceString::kSessionTransactionsTableNamespace) {
if (args.updateArgs->source != OperationSource::kFromMigrate) {
shardObserveUpdateOp(opCtx,
- args.nss,
+ args.coll->ns(),
args.updateArgs->preImageDoc,
args.updateArgs->updatedDoc,
opTime.writeOpTime,
@@ -930,19 +932,19 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
}
}
- if (args.nss.coll() == "system.js") {
+ if (args.coll->ns().coll() == "system.js") {
Scope::storedFuncMod(opCtx);
- } else if (args.nss.isSystemDotViews()) {
- CollectionCatalog::get(opCtx)->reloadViews(opCtx, args.nss.dbName()).ignore();
- } else if (args.nss == NamespaceString::kSessionTransactionsTableNamespace &&
+ } else if (args.coll->ns().isSystemDotViews()) {
+ CollectionCatalog::get(opCtx)->reloadViews(opCtx, args.coll->ns().dbName()).ignore();
+ } else if (args.coll->ns() == NamespaceString::kSessionTransactionsTableNamespace &&
!opTime.writeOpTime.isNull()) {
auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx);
mongoDSessionCatalog->observeDirectWriteToConfigTransactions(opCtx,
args.updateArgs->updatedDoc);
- } else if (args.nss == NamespaceString::kConfigSettingsNamespace) {
+ } else if (args.coll->ns() == NamespaceString::kConfigSettingsNamespace) {
ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings(
opCtx, args.updateArgs->updatedDoc["_id"], args.updateArgs->updatedDoc);
- } else if (args.nss.isTimeseriesBucketsCollection()) {
+ } else if (args.coll->ns().isTimeseriesBucketsCollection()) {
if (args.updateArgs->source != OperationSource::kTimeseriesInsert) {
auto& bucketCatalog = BucketCatalog::get(opCtx);
bucketCatalog.clear(args.updateArgs->updatedDoc["_id"].OID());
diff --git a/src/mongo/db/op_observer/op_observer_impl_test.cpp b/src/mongo/db/op_observer/op_observer_impl_test.cpp
index 09c580d01d6..b451afe1c9c 100644
--- a/src/mongo/db/op_observer/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer/op_observer_impl_test.cpp
@@ -160,7 +160,7 @@ class OpObserverTest : public ServiceContextMongoDTest {
protected:
explicit OpObserverTest(Options options = {}) : ServiceContextMongoDTest(std::move(options)) {}
- void setUp() override {
+ virtual void setUp() override {
// Set up mongod.
ServiceContextMongoDTest::setUp();
@@ -223,6 +223,7 @@ protected:
}
void resetOplogAndTransactions(OperationContext* opCtx) const {
+ reset(opCtx, nss);
reset(opCtx, NamespaceString::kRsOplogNamespace);
reset(opCtx, NamespaceString::kSessionTransactionsTableNamespace);
reset(opCtx, NamespaceString::kConfigImagesNamespace);
@@ -915,11 +916,12 @@ TEST_F(OpObserverTest, SingleStatementUpdateTestIncludesTenantId) {
updateArgs.update = BSON("$set" << BSON("data"
<< "x"));
updateArgs.criteria = BSON("_id" << 0);
- OplogUpdateEntryArgs update(&updateArgs, nss, uuid);
auto opCtx = cc().makeOperationContext();
WriteUnitOfWork wuow(opCtx.get());
AutoGetDb autoDb(opCtx.get(), nss.dbName(), MODE_X);
+ AutoGetCollection autoColl(opCtx.get(), nss, MODE_X);
+ OplogUpdateEntryArgs update(&updateArgs, *autoColl);
OpObserverRegistry opObserver;
opObserver.addObserver(std::make_unique<OpObserverImpl>(std::make_unique<OplogWriterImpl>()));
@@ -1089,8 +1091,6 @@ DEATH_TEST_REGEX_F(OpObserverTest,
class OpObserverTxnParticipantTest : public OpObserverTest {
public:
void setUp() override {
- OpObserverTest::setUp();
-
_opCtx = cc().makeOperationContext();
_opObserver.emplace(std::make_unique<OplogWriterImpl>());
_times.emplace(opCtx());
@@ -1179,6 +1179,7 @@ private:
class OpObserverTransactionTest : public OpObserverTxnParticipantTest {
protected:
void setUp() override {
+ OpObserverTest::setUp();
OpObserverTxnParticipantTest::setUp();
OpObserverTxnParticipantTest::setUpNonRetryableTransaction();
}
@@ -1276,7 +1277,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) {
updateArgs2.update = BSON("$set" << BSON("data"
<< "y"));
updateArgs2.criteria = BSON("_id" << 0);
- OplogUpdateEntryArgs update2(&updateArgs2, nss2, uuid2);
+ OplogUpdateEntryArgs update2(&updateArgs2, *autoColl2);
opObserver().onUpdate(opCtx(), update2);
opObserver().aboutToDelete(opCtx(),
@@ -1791,6 +1792,10 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) {
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.unstashTransactionResources(opCtx(), "update");
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+
CollectionUpdateArgs updateArgs1;
updateArgs1.stmtIds = {0};
updateArgs1.updatedDoc = BSON("_id" << 0 << "data"
@@ -1798,7 +1803,7 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) {
updateArgs1.update = BSON("$set" << BSON("data"
<< "x"));
updateArgs1.criteria = BSON("_id" << 0);
- OplogUpdateEntryArgs update1(&updateArgs1, nss1, uuid1);
+ OplogUpdateEntryArgs update1(&updateArgs1, *autoColl1);
CollectionUpdateArgs updateArgs2;
updateArgs2.stmtIds = {1};
@@ -1807,11 +1812,8 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) {
updateArgs2.update = BSON("$set" << BSON("data"
<< "y"));
updateArgs2.criteria = BSON("_id" << 1);
- OplogUpdateEntryArgs update2(&updateArgs2, nss2, uuid2);
+ OplogUpdateEntryArgs update2(&updateArgs2, *autoColl2);
- WriteUnitOfWork wuow(opCtx());
- AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
- AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
opObserver().onUpdate(opCtx(), update1);
opObserver().onUpdate(opCtx(), update2);
auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
@@ -1845,6 +1847,10 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTestIncludesTenantId) {
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.unstashTransactionResources(opCtx(), "update");
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+
CollectionUpdateArgs updateArgs1;
updateArgs1.stmtIds = {0};
updateArgs1.updatedDoc = BSON("_id" << 0 << "data"
@@ -1852,7 +1858,7 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTestIncludesTenantId) {
updateArgs1.update = BSON("$set" << BSON("data"
<< "x"));
updateArgs1.criteria = BSON("_id" << 0);
- OplogUpdateEntryArgs update1(&updateArgs1, nss1, uuid1);
+ OplogUpdateEntryArgs update1(&updateArgs1, *autoColl1);
CollectionUpdateArgs updateArgs2;
updateArgs2.stmtIds = {1};
@@ -1861,11 +1867,8 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTestIncludesTenantId) {
updateArgs2.update = BSON("$set" << BSON("data"
<< "y"));
updateArgs2.criteria = BSON("_id" << 1);
- OplogUpdateEntryArgs update2(&updateArgs2, nss2, uuid2);
+ OplogUpdateEntryArgs update2(&updateArgs2, *autoColl2);
- WriteUnitOfWork wuow(opCtx());
- AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
- AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
opObserver().onUpdate(opCtx(), update1);
opObserver().onUpdate(opCtx(), update2);
@@ -2029,11 +2032,14 @@ public:
OpObserverTxnParticipantTest::tearDown();
}
+ void setUp() override {
+ OpObserverTest::setUp();
+ auto opCtx = cc().makeOperationContext();
+ reset(opCtx.get(), nss, uuid);
+ }
+
protected:
void testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage() {
- NamespaceString nss = {boost::none, "test", "coll"};
- const auto uuid = UUID::gen();
-
CollectionUpdateArgs updateArgs;
updateArgs.stmtIds = {0};
updateArgs.updatedDoc = BSON("_id" << 0 << "data"
@@ -2042,10 +2048,11 @@ protected:
<< "x"));
updateArgs.criteria = BSON("_id" << 0);
updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PostImage;
- OplogUpdateEntryArgs update(&updateArgs, nss, uuid);
+
+ AutoGetCollection autoColl(opCtx(), nss, MODE_IX);
+ OplogUpdateEntryArgs update(&updateArgs, *autoColl);
update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection;
- AutoGetDb autoDb(opCtx(), nss.dbName(), MODE_X);
opObserver().onUpdate(opCtx(), update);
commit();
@@ -2062,9 +2069,6 @@ protected:
}
void testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage() {
- NamespaceString nss = {boost::none, "test", "coll"};
- const auto uuid = UUID::gen();
-
CollectionUpdateArgs updateArgs;
updateArgs.stmtIds = {0};
updateArgs.preImageDoc = BSON("_id" << 0 << "data"
@@ -2073,10 +2077,11 @@ protected:
<< "x"));
updateArgs.criteria = BSON("_id" << 0);
updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PreImage;
- OplogUpdateEntryArgs update(&updateArgs, nss, uuid);
+
+ AutoGetCollection autoColl(opCtx(), nss, MODE_IX);
+ OplogUpdateEntryArgs update(&updateArgs, *autoColl);
update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection;
- AutoGetDb autoDb(opCtx(), nss.dbName(), MODE_X);
opObserver().onUpdate(opCtx(), update);
commit();
@@ -2093,8 +2098,7 @@ protected:
}
void testRetryableFindAndModifyDeleteHasNeedsRetryImage() {
- NamespaceString nss = {boost::none, "test", "coll"};
- const auto uuid = UUID::gen();
+
AutoGetDb autoDb(opCtx(), nss.dbName(), MODE_X);
const auto deletedDoc = BSON("_id" << 0 << "data"
@@ -2118,6 +2122,9 @@ protected:
finish();
}
+ NamespaceString nss = {boost::none, "test", "coll"};
+ UUID uuid = UUID::gen();
+
virtual void commit() = 0;
virtual void finish() {}
@@ -2129,6 +2136,7 @@ class OpObserverRetryableFindAndModifyOutsideTransactionTest
: public OpObserverRetryableFindAndModifyTest {
public:
void setUp() override {
+ OpObserverRetryableFindAndModifyTest::setUp();
OpObserverTxnParticipantTest::setUp();
OpObserverTxnParticipantTest::setUpRetryableWrite();
}
@@ -2163,6 +2171,7 @@ class OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransacti
: public OpObserverRetryableFindAndModifyTest {
public:
void setUp() override {
+ OpObserverRetryableFindAndModifyTest::setUp();
OpObserverTxnParticipantTest::setUp();
OpObserverTxnParticipantTest::setUpRetryableInternalTransaction();
}
@@ -2199,6 +2208,7 @@ class OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransaction
: public OpObserverRetryableFindAndModifyTest {
public:
void setUp() override {
+ OpObserverRetryableFindAndModifyTest::setUp();
OpObserverTxnParticipantTest::setUp();
OpObserverTxnParticipantTest::setUpRetryableInternalTransaction();
}
@@ -2339,6 +2349,7 @@ protected:
WriteUnitOfWork wuow(opCtx);
auto reservedSlots = repl::getNextOpTimes(opCtx, 3);
update->updateArgs->oplogSlots = reservedSlots;
+ wuow.commit();
}
break;
}
@@ -2410,6 +2421,12 @@ protected:
}
}
+ void setUp() override {
+ OpObserverTest::setUp();
+ auto opCtx = cc().makeOperationContext();
+ reset(opCtx.get(), _nss, _uuid);
+ }
+
std::vector<UpdateTestCase> _cases = {
// Regular updates.
{kNonFaM, kChangeStreamImagesDisabled, kNotRetryable, 1},
@@ -2454,12 +2471,11 @@ TEST_F(OnUpdateOutputsTest, TestNonTransactionFundamentalOnUpdateOutputs) {
}
// Phase 2: Call the code we're testing.
- CollectionUpdateArgs updateArgs;
- OplogUpdateEntryArgs updateEntryArgs(&updateArgs, _nss, _uuid);
- initializeOplogUpdateEntryArgs(opCtx, testCase, &updateEntryArgs);
-
WriteUnitOfWork wuow(opCtx);
AutoGetCollection locks(opCtx, _nss, MODE_IX);
+ CollectionUpdateArgs updateArgs;
+ OplogUpdateEntryArgs updateEntryArgs(&updateArgs, *locks);
+ initializeOplogUpdateEntryArgs(opCtx, testCase, &updateEntryArgs);
opObserver.onUpdate(opCtx, updateEntryArgs);
wuow.commit();
@@ -2499,12 +2515,12 @@ TEST_F(OnUpdateOutputsTest, TestFundamentalTransactionOnUpdateOutputs) {
}
// Phase 2: Call the code we're testing.
+ WriteUnitOfWork wuow(opCtx);
+ AutoGetCollection locks(opCtx, _nss, MODE_IX);
CollectionUpdateArgs updateArgs;
- OplogUpdateEntryArgs updateEntryArgs(&updateArgs, _nss, _uuid);
+ OplogUpdateEntryArgs updateEntryArgs(&updateArgs, *locks);
initializeOplogUpdateEntryArgs(opCtx, testCase, &updateEntryArgs);
- WriteUnitOfWork wuow(opCtx);
- AutoGetCollection locks(opCtx, _nss, MODE_IX);
opObserver.onUpdate(opCtx, updateEntryArgs);
commitUnpreparedTransaction<OpObserverRegistry>(opCtx, opObserver);
wuow.commit();
@@ -2827,7 +2843,7 @@ TEST_F(BatchedWriteOutputsTest, TestApplyOpsInsertDeleteUpdate) {
collUpdateArgs.update = BSON("fieldToUpdate"
<< "valueToUpdate");
collUpdateArgs.criteria = BSON("_id" << 2);
- auto args = OplogUpdateEntryArgs(&collUpdateArgs, _nss, autoColl->uuid());
+ auto args = OplogUpdateEntryArgs(&collUpdateArgs, *autoColl);
opCtx->getServiceContext()->getOpObserver()->onUpdate(opCtx, args);
}
@@ -2914,7 +2930,7 @@ TEST_F(BatchedWriteOutputsTest, TestApplyOpsInsertDeleteUpdateIncludesTenantId)
collUpdateArgs.update = BSON("fieldToUpdate"
<< "valueToUpdate");
collUpdateArgs.criteria = BSON("_id" << 2);
- auto args = OplogUpdateEntryArgs(&collUpdateArgs, autoColl->ns(), autoColl->uuid());
+ auto args = OplogUpdateEntryArgs(&collUpdateArgs, *autoColl);
opCtx->getServiceContext()->getOpObserver()->onUpdate(opCtx, args);
}
@@ -3539,6 +3555,10 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) {
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.unstashTransactionResources(opCtx(), "update");
+ WriteUnitOfWork wuow(opCtx());
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+
CollectionUpdateArgs updateArgs1;
updateArgs1.stmtIds = {0};
updateArgs1.updatedDoc = BSON("_id" << 0 << "data"
@@ -3546,7 +3566,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) {
updateArgs1.update = BSON("$set" << BSON("data"
<< "x"));
updateArgs1.criteria = BSON("_id" << 0);
- OplogUpdateEntryArgs update1(&updateArgs1, nss1, uuid1);
+ OplogUpdateEntryArgs update1(&updateArgs1, *autoColl1);
CollectionUpdateArgs updateArgs2;
updateArgs2.stmtIds = {1};
@@ -3555,11 +3575,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) {
updateArgs2.update = BSON("$set" << BSON("data"
<< "y"));
updateArgs2.criteria = BSON("_id" << 1);
- OplogUpdateEntryArgs update2(&updateArgs2, nss2, uuid2);
+ OplogUpdateEntryArgs update2(&updateArgs2, *autoColl2);
- WriteUnitOfWork wuow(opCtx());
- AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
- AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
opObserver().onUpdate(opCtx(), update1);
opObserver().onUpdate(opCtx(), update2);
auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
@@ -3734,6 +3751,9 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) {
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.unstashTransactionResources(opCtx(), "update");
+ AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
+ AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
+
CollectionUpdateArgs updateArgs1;
updateArgs1.stmtIds = {0};
updateArgs1.updatedDoc = BSON("_id" << 0 << "data"
@@ -3741,7 +3761,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) {
updateArgs1.update = BSON("$set" << BSON("data"
<< "x"));
updateArgs1.criteria = BSON("_id" << 0);
- OplogUpdateEntryArgs update1(&updateArgs1, nss1, uuid1);
+ OplogUpdateEntryArgs update1(&updateArgs1, *autoColl1);
CollectionUpdateArgs updateArgs2;
updateArgs2.stmtIds = {1};
@@ -3750,10 +3770,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) {
updateArgs2.update = BSON("$set" << BSON("data"
<< "y"));
updateArgs2.criteria = BSON("_id" << 1);
- OplogUpdateEntryArgs update2(&updateArgs2, nss2, uuid2);
+ OplogUpdateEntryArgs update2(&updateArgs2, *autoColl2);
- AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX);
- AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX);
opObserver().onUpdate(opCtx(), update1);
opObserver().onUpdate(opCtx(), update2);
diff --git a/src/mongo/db/op_observer/user_write_block_mode_op_observer.cpp b/src/mongo/db/op_observer/user_write_block_mode_op_observer.cpp
index 93fa938f2be..d1e90077f48 100644
--- a/src/mongo/db/op_observer/user_write_block_mode_op_observer.cpp
+++ b/src/mongo/db/op_observer/user_write_block_mode_op_observer.cpp
@@ -89,10 +89,10 @@ void UserWriteBlockModeOpObserver::onInserts(OperationContext* opCtx,
void UserWriteBlockModeOpObserver::onUpdate(OperationContext* opCtx,
const OplogUpdateEntryArgs& args) {
if (args.updateArgs->source != OperationSource::kFromMigrate) {
- _checkWriteAllowed(opCtx, args.nss);
+ _checkWriteAllowed(opCtx, args.coll->ns());
}
- if (args.nss == NamespaceString::kUserWritesCriticalSectionsNamespace &&
+ if (args.coll->ns() == NamespaceString::kUserWritesCriticalSectionsNamespace &&
!user_writes_recoverable_critical_section_util::inRecoveryMode(opCtx)) {
const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse(
IDLParserContext("UserWriteBlockOpObserver"), args.updateArgs->updatedDoc);
diff --git a/src/mongo/db/op_observer/user_write_block_mode_op_observer_test.cpp b/src/mongo/db/op_observer/user_write_block_mode_op_observer_test.cpp
index fff61baf687..baeadb8e394 100644
--- a/src/mongo/db/op_observer/user_write_block_mode_op_observer_test.cpp
+++ b/src/mongo/db/op_observer/user_write_block_mode_op_observer_test.cpp
@@ -89,8 +89,7 @@ protected:
collectionUpdateArgs.source =
fromMigrate ? OperationSource::kFromMigrate : OperationSource::kStandard;
auto uuid = UUID::gen();
- OplogUpdateEntryArgs updateArgs(&collectionUpdateArgs, nss, uuid);
- updateArgs.nss = nss;
+ OplogUpdateEntryArgs updateArgs(&collectionUpdateArgs, *autoColl);
OplogDeleteEntryArgs deleteArgs;
deleteArgs.fromMigrate = fromMigrate;
if (shouldSucceed) {
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index f22d3a915e1..f3a37117b00 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -1457,8 +1457,8 @@ Status performAtomicTimeseriesWrites(
opCtx->recoveryUnit()->setTimestamp(args.oplogSlots[0].getTimestamp()));
}
- coll->updateDocument(
- opCtx, recordId, original, updated, indexesAffected, &curOp->debug(), &args);
+ collection_internal::updateDocument(
+ opCtx, *coll, recordId, original, updated, indexesAffected, &curOp->debug(), &args);
if (slot) {
if (participant) {
// Manually sets the timestamp so that the "prevOpTime" field in the oplog entry is
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 9d87b25ba6d..dd0aac54126 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1494,6 +1494,7 @@ env.Library(
'tenant_migration_utils',
],
LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/catalog/collection_crud',
"$BUILD_DIR/mongo/db/catalog/local_oplog_info",
"$BUILD_DIR/mongo/db/concurrency/exception_util",
"$BUILD_DIR/mongo/db/index_builds_coordinator_interface",
diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp
index a24e438feb0..7c4a904b1a5 100644
--- a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp
@@ -262,7 +262,7 @@ void OplogApplierImplTest::_testApplyOplogEntryOrGroupedInsertsCrudOperation(
_opObserver->onUpdateFn = [&](OperationContext* opCtx, const OplogUpdateEntryArgs& args) {
applyOpCalled = true;
checkOpCtx(opCtx);
- ASSERT_EQUALS(targetNss, args.nss);
+ ASSERT_EQUALS(targetNss, args.coll->ns());
return Status::OK();
};
diff --git a/src/mongo/db/repl/storage_timestamp_test.cpp b/src/mongo/db/repl/storage_timestamp_test.cpp
index ea6f20de749..28c7b520aba 100644
--- a/src/mongo/db/repl/storage_timestamp_test.cpp
+++ b/src/mongo/db/repl/storage_timestamp_test.cpp
@@ -3303,14 +3303,16 @@ TEST_F(RetryableFindAndModifyTest, RetryableFindAndModifyUpdate) {
args.storeDocOption = CollectionUpdateArgs::StoreDocOption::PreImage;
args.update = BSON("$set" << BSON("b" << 1));
args.criteria = BSON("_id" << 0);
+ args.retryableWrite = true;
{
auto cursor = collection->getCursor(_opCtx);
auto record = cursor->next();
invariant(record);
WriteUnitOfWork wuow(_opCtx);
- collection->updateDocument(
+ collection_internal::updateDocument(
_opCtx,
+ collection.get(),
record->id,
Snapshotted<BSONObj>(_opCtx->recoveryUnit()->getSnapshotId(), oldObj),
newObj,
@@ -3359,6 +3361,7 @@ TEST_F(RetryableFindAndModifyTest, RetryableFindAndModifyUpdateWithDamages) {
args.storeDocOption = CollectionUpdateArgs::StoreDocOption::PreImage;
args.update = BSON("$set" << BSON("a" << 0));
args.criteria = BSON("_id" << 0);
+ args.retryableWrite = true;
{
Snapshotted<BSONObj> objSnapshot(_opCtx->recoveryUnit()->getSnapshotId(), oldObj);
@@ -3366,8 +3369,8 @@ TEST_F(RetryableFindAndModifyTest, RetryableFindAndModifyUpdateWithDamages) {
auto record = cursor->next();
invariant(record);
WriteUnitOfWork wuow(_opCtx);
- const auto statusWith = collection->updateDocumentWithDamages(
- _opCtx, record->id, objSnapshot, source, damages, false, nullptr, &args);
+ const auto statusWith = collection_internal::updateDocumentWithDamages(
+ _opCtx, *autoColl, record->id, objSnapshot, source, damages, false, nullptr, &args);
wuow.commit();
ASSERT_OK(statusWith.getStatus());
}
diff --git a/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp b/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp
index 65f765cf94e..7fb4bf05a29 100644
--- a/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp
@@ -272,7 +272,7 @@ void TenantMigrationDonorOpObserver::onInserts(OperationContext* opCtx,
void TenantMigrationDonorOpObserver::onUpdate(OperationContext* opCtx,
const OplogUpdateEntryArgs& args) {
- if (args.nss == NamespaceString::kTenantMigrationDonorsNamespace &&
+ if (args.coll->ns() == NamespaceString::kTenantMigrationDonorsNamespace &&
!tenant_migration_access_blocker::inRecoveryMode(opCtx)) {
auto donorStateDoc =
tenant_migration_access_blocker::parseDonorStateDocument(args.updateArgs->updatedDoc);
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index f1949c83569..9470ac2b036 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -33,6 +33,7 @@
#include "mongo/client/connection_string.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/config.h"
+#include "mongo/db/catalog/collection_write_path.h"
#include "mongo/db/commands/tenant_migration_donor_cmds_gen.h"
#include "mongo/db/commands/tenant_migration_recipient_cmds_gen.h"
#include "mongo/db/concurrency/exception_util.h"
@@ -628,13 +629,14 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateState
args.oplogSlots = {oplogSlot};
args.update = updatedStateDocBson;
- collection->updateDocument(opCtx,
- originalRecordId,
- originalSnapshot,
- updatedStateDocBson,
- false,
- nullptr /* OpDebug* */,
- &args);
+ collection_internal::updateDocument(opCtx,
+ *collection,
+ originalRecordId,
+ originalSnapshot,
+ updatedStateDocBson,
+ false,
+ nullptr /* OpDebug* */,
+ &args);
wuow.commit();
diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
index 19f5f6ab71c..b955988ab2d 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
@@ -208,7 +208,7 @@ void TenantMigrationRecipientOpObserver::onInserts(
void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx,
const OplogUpdateEntryArgs& args) {
- if (args.nss == NamespaceString::kTenantMigrationRecipientsNamespace &&
+ if (args.coll->ns() == NamespaceString::kTenantMigrationRecipientsNamespace &&
!tenant_migration_access_blocker::inRecoveryMode(opCtx)) {
auto recipientStateDoc = TenantMigrationRecipientDocument::parse(
IDLParserContext("recipientStateDoc"), args.updateArgs->updatedDoc);
diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp
index f00a1f7efb2..319ef74c58f 100644
--- a/src/mongo/db/repl/tenant_oplog_applier_test.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp
@@ -715,8 +715,8 @@ TEST_F(TenantOplogApplierTest, ApplyUpdate_Success) {
bool onUpdateCalled = false;
_opObserver->onUpdateFn = [&](OperationContext* opCtx, const OplogUpdateEntryArgs& args) {
onUpdateCalled = true;
- ASSERT_EQUALS(nss, args.nss);
- ASSERT_EQUALS(uuid, args.uuid);
+ ASSERT_EQUALS(nss, args.coll->ns());
+ ASSERT_EQUALS(uuid, args.coll->uuid());
};
pushOps({entry});
auto writerPool = makeTenantMigrationWriterPool();
diff --git a/src/mongo/db/s/README.md b/src/mongo/db/s/README.md
index 028c09071a1..c5cd8c5d081 100644
--- a/src/mongo/db/s/README.md
+++ b/src/mongo/db/s/README.md
@@ -22,10 +22,10 @@ corresponding data.
The authoritative routing table is stored in a set of unsharded collections in the config database
on the config server replica set. The schemas of the relevant collections are:
-* [**config.databases**](https://docs.mongodb.com/manual/reference/config-database/#config.databases)
-* [**config.collections**](https://docs.mongodb.com/manual/reference/config-database/#config.collections)
-* [**config.chunks**](https://docs.mongodb.com/manual/reference/config-database/#config.chunks)
-* [**config.shards**](https://docs.mongodb.com/manual/reference/config-database/#config.shards)
+* [**config.databases**](https://www.mongodb.com/docs/manual/reference/config-database/#mongodb-data-config.databases)
+* [**config.collections**](https://www.mongodb.com/docs/manual/reference/config-database/#mongodb-data-config.collections)
+* [**config.chunks**](https://www.mongodb.com/docs/manual/reference/config-database/#mongodb-data-config.chunks)
+* [**config.shards**](https://www.mongodb.com/docs/manual/reference/config-database/#mongodb-data-config.shards)
#### Code references
diff --git a/src/mongo/db/s/config_server_op_observer.cpp b/src/mongo/db/s/config_server_op_observer.cpp
index 983251d7d09..b947b0de536 100644
--- a/src/mongo/db/s/config_server_op_observer.cpp
+++ b/src/mongo/db/s/config_server_op_observer.cpp
@@ -137,7 +137,7 @@ void ConfigServerOpObserver::onInserts(OperationContext* opCtx,
}
void ConfigServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) {
- if (args.nss != NamespaceString::kConfigsvrShardsNamespace) {
+ if (args.coll->ns() != NamespaceString::kConfigsvrShardsNamespace) {
return;
}
diff --git a/src/mongo/db/s/query_analysis_op_observer.cpp b/src/mongo/db/s/query_analysis_op_observer.cpp
index efdbb36a4ee..84606c7f4c7 100644
--- a/src/mongo/db/s/query_analysis_op_observer.cpp
+++ b/src/mongo/db/s/query_analysis_op_observer.cpp
@@ -74,13 +74,13 @@ void QueryAnalysisOpObserver::onInserts(OperationContext* opCtx,
void QueryAnalysisOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) {
if (analyze_shard_key::supportsCoordinatingQueryAnalysis()) {
- if (args.nss == NamespaceString::kConfigQueryAnalyzersNamespace) {
+ if (args.coll->ns() == NamespaceString::kConfigQueryAnalyzersNamespace) {
const auto& updatedDoc = args.updateArgs->updatedDoc;
opCtx->recoveryUnit()->onCommit([opCtx, updatedDoc](boost::optional<Timestamp>) {
analyze_shard_key::QueryAnalysisCoordinator::get(opCtx)->onConfigurationUpdate(
updatedDoc);
});
- } else if (args.nss == MongosType::ConfigNS) {
+ } else if (args.coll->ns() == MongosType::ConfigNS) {
const auto& updatedDoc = args.updateArgs->updatedDoc;
opCtx->recoveryUnit()->onCommit([opCtx, updatedDoc](boost::optional<Timestamp>) {
analyze_shard_key::QueryAnalysisCoordinator::get(opCtx)->onSamplerUpdate(
diff --git a/src/mongo/db/s/range_deleter_service_op_observer.cpp b/src/mongo/db/s/range_deleter_service_op_observer.cpp
index 8765dfedbbb..9c1e51f9a6f 100644
--- a/src/mongo/db/s/range_deleter_service_op_observer.cpp
+++ b/src/mongo/db/s/range_deleter_service_op_observer.cpp
@@ -85,7 +85,7 @@ void RangeDeleterServiceOpObserver::onInserts(OperationContext* opCtx,
void RangeDeleterServiceOpObserver::onUpdate(OperationContext* opCtx,
const OplogUpdateEntryArgs& args) {
- if (args.nss == NamespaceString::kRangeDeletionNamespace) {
+ if (args.coll->ns() == NamespaceString::kRangeDeletionNamespace) {
const bool pendingFieldIsRemoved = [&] {
return update_oplog_entry::isFieldRemovedByUpdate(
args.updateArgs->update, RangeDeletionTask::kPendingFieldName) ==
diff --git a/src/mongo/db/s/resharding/resharding_op_observer.cpp b/src/mongo/db/s/resharding/resharding_op_observer.cpp
index c34355c2d17..840381f7942 100644
--- a/src/mongo/db/s/resharding/resharding_op_observer.cpp
+++ b/src/mongo/db/s/resharding/resharding_op_observer.cpp
@@ -206,7 +206,7 @@ void ReshardingOpObserver::onInserts(OperationContext* opCtx,
}
void ReshardingOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) {
- if (args.nss == NamespaceString::kDonorReshardingOperationsNamespace) {
+ if (args.coll->ns() == NamespaceString::kDonorReshardingOperationsNamespace) {
// Primaries and secondaries should execute pinning logic when observing changes to the
// donor resharding document.
_doPin(opCtx);
@@ -218,7 +218,7 @@ void ReshardingOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEn
return;
}
- if (args.nss == NamespaceString::kConfigReshardingOperationsNamespace) {
+ if (args.coll->ns() == NamespaceString::kConfigReshardingOperationsNamespace) {
auto newCoordinatorDoc = ReshardingCoordinatorDocument::parse(
IDLParserContext("reshardingCoordinatorDoc"), args.updateArgs->updatedDoc);
opCtx->recoveryUnit()->onCommit([opCtx, newCoordinatorDoc = std::move(newCoordinatorDoc)](
@@ -241,9 +241,10 @@ void ReshardingOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEn
"error"_attr = redact(ex.toStatus()));
}
});
- } else if (args.nss.isTemporaryReshardingCollection()) {
+ } else if (args.coll->ns().isTemporaryReshardingCollection()) {
const std::vector<InsertStatement> updateDoc{InsertStatement{args.updateArgs->updatedDoc}};
- assertCanExtractShardKeyFromDocs(opCtx, args.nss, updateDoc.begin(), updateDoc.end());
+ assertCanExtractShardKeyFromDocs(
+ opCtx, args.coll->ns(), updateDoc.begin(), updateDoc.end());
}
}
diff --git a/src/mongo/db/s/resharding/resharding_service_test_helpers.h b/src/mongo/db/s/resharding/resharding_service_test_helpers.h
index 70fe8786fc7..73e06640595 100644
--- a/src/mongo/db/s/resharding/resharding_service_test_helpers.h
+++ b/src/mongo/db/s/resharding/resharding_service_test_helpers.h
@@ -165,7 +165,7 @@ public:
}
void onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) override {
- if (args.nss != _stateDocumentNss) {
+ if (args.coll->ns() != _stateDocumentNss) {
return;
}
diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp
index 0df49f52f89..744c79a1eca 100644
--- a/src/mongo/db/s/shard_server_op_observer.cpp
+++ b/src/mongo/db/s/shard_server_op_observer.cpp
@@ -330,7 +330,8 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE
const bool needsSpecialHandling = !updateDoc.isEmpty() &&
(update_oplog_entry::extractUpdateType(updateDoc) !=
update_oplog_entry::UpdateType::kReplacement);
- if (needsSpecialHandling && args.nss == NamespaceString::kShardConfigCollectionsNamespace) {
+ if (needsSpecialHandling &&
+ args.coll->ns() == NamespaceString::kShardConfigCollectionsNamespace) {
// Notification of routing table changes are only needed on secondaries
if (isStandaloneOrPrimary(opCtx)) {
return;
@@ -383,7 +384,8 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE
}
}
- if (needsSpecialHandling && args.nss == NamespaceString::kShardConfigDatabasesNamespace) {
+ if (needsSpecialHandling &&
+ args.coll->ns() == NamespaceString::kShardConfigDatabasesNamespace) {
// Notification of routing table changes are only needed on secondaries
if (isStandaloneOrPrimary(opCtx)) {
return;
@@ -422,7 +424,7 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE
}
}
- if (needsSpecialHandling && args.nss == NamespaceString::kRangeDeletionNamespace) {
+ if (needsSpecialHandling && args.coll->ns() == NamespaceString::kRangeDeletionNamespace) {
if (!isStandaloneOrPrimary(opCtx))
return;
@@ -442,7 +444,7 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE
}
}
- if (args.nss == NamespaceString::kCollectionCriticalSectionsNamespace &&
+ if (args.coll->ns() == NamespaceString::kCollectionCriticalSectionsNamespace &&
!recoverable_critical_section_util::inRecoveryMode(opCtx)) {
const auto collCSDoc = CollectionCriticalSectionDocument::parse(
IDLParserContext("ShardServerOpObserver"), args.updateArgs->updatedDoc);
@@ -468,11 +470,11 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE
}
auto metadata = CollectionShardingRuntime::assertCollectionLockedAndAcquire(
- opCtx, args.nss, CSRAcquisitionMode::kShared)
+ opCtx, args.coll->ns(), CSRAcquisitionMode::kShared)
->getCurrentMetadataIfKnown();
if (metadata && metadata->isSharded()) {
incrementChunkOnInsertOrUpdate(opCtx,
- args.nss,
+ args.coll->ns(),
*metadata->getChunkManager(),
args.updateArgs->updatedDoc,
args.updateArgs->updatedDoc.objsize(),
diff --git a/src/mongo/db/serverless/SConscript b/src/mongo/db/serverless/SConscript
index 82143897663..b0751b45f27 100644
--- a/src/mongo/db/serverless/SConscript
+++ b/src/mongo/db/serverless/SConscript
@@ -83,6 +83,7 @@ env.Library(
'shard_split_state_machine',
],
LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/catalog/collection_crud',
'$BUILD_DIR/mongo/db/catalog/local_oplog_info',
'$BUILD_DIR/mongo/db/concurrency/exception_util',
'$BUILD_DIR/mongo/db/dbhelpers',
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
index fb2d4dd9064..54e33417d40 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
@@ -334,7 +334,7 @@ void ShardSplitDonorOpObserver::onInserts(OperationContext* opCtx,
void ShardSplitDonorOpObserver::onUpdate(OperationContext* opCtx,
const OplogUpdateEntryArgs& args) {
- if (args.nss != NamespaceString::kShardSplitDonorsNamespace ||
+ if (args.coll->ns() != NamespaceString::kShardSplitDonorsNamespace ||
tenant_migration_access_blocker::inRecoveryMode(opCtx)) {
return;
}
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
index bd1eb42c23b..b644356c301 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
@@ -118,7 +118,8 @@ protected:
BSON("$set" << BSON(ShardSplitDonorDocument::kStateFieldName
<< ShardSplitDonorState_serializer(stateDocument.getState())));
updateArgs.criteria = BSON("_id" << stateDocument.getId());
- OplogUpdateEntryArgs update(&updateArgs, _nss, stateDocument.getId());
+ AutoGetCollection autoColl(_opCtx.get(), _nss, MODE_IX);
+ OplogUpdateEntryArgs update(&updateArgs, *autoColl);
WriteUnitOfWork wuow(_opCtx.get());
_observer->onUpdate(_opCtx.get(), update);
@@ -340,7 +341,8 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToAbortingIndexBuildsFail) {
BSON("$set" << BSON(ShardSplitDonorDocument::kStateFieldName
<< ShardSplitDonorState_serializer(stateDocument.getState())));
updateArgs.criteria = BSON("_id" << stateDocument.getId());
- OplogUpdateEntryArgs update(&updateArgs, _nss, stateDocument.getId());
+ AutoGetCollection autoColl(_opCtx.get(), _nss, MODE_IX);
+ OplogUpdateEntryArgs update(&updateArgs, *autoColl);
auto update_lambda = [&]() {
WriteUnitOfWork wuow(_opCtx.get());
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index 80697cbe0f4..bd715cb167e 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -29,7 +29,9 @@
#include "mongo/db/serverless/shard_split_donor_service.h"
+
#include "mongo/client/streamable_replica_set_monitor.h"
+#include "mongo/db/catalog/collection_write_path.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/dbdirectclient.h"
@@ -964,13 +966,14 @@ ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateS
args.oplogSlots = {oplogSlot};
args.update = updatedStateDocBson;
- collection->updateDocument(opCtx,
- originalRecordId,
- originalSnapshot,
- updatedStateDocBson,
- false,
- nullptr /* OpDebug* */,
- &args);
+ collection_internal::updateDocument(opCtx,
+ *collection,
+ originalRecordId,
+ originalSnapshot,
+ updatedStateDocBson,
+ false,
+ nullptr /* OpDebug* */,
+ &args);
return oplogSlot;
}();
diff --git a/src/mongo/db/transaction/transaction_participant.cpp b/src/mongo/db/transaction/transaction_participant.cpp
index 2f0c5534dda..58ba1425421 100644
--- a/src/mongo/db/transaction/transaction_participant.cpp
+++ b/src/mongo/db/transaction/transaction_participant.cpp
@@ -34,6 +34,7 @@
#include <fmt/format.h>
+#include "mongo/base/shim.h"
#include "mongo/db/catalog/collection_write_path.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/index_catalog.h"
@@ -49,6 +50,7 @@
#include "mongo/db/curop_failpoint_helpers.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
+#include "mongo/db/exec/write_stage_common.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/op_observer/op_observer.h"
#include "mongo/db/ops/update.h"
@@ -451,13 +453,14 @@ void updateSessionEntry(OperationContext* opCtx,
// Specify indexesAffected = false because the sessions collection has two indexes: {_id: 1} and
// {parentLsid: 1, _id.txnNumber: 1, _id: 1}, and none of the fields are mutable.
- collection->updateDocument(opCtx,
- recordId,
- Snapshotted<BSONObj>(startingSnapshotId, originalDoc),
- updateMod,
- false, /* indexesAffected */
- nullptr,
- &args);
+ collection_internal::updateDocument(opCtx,
+ *collection,
+ recordId,
+ Snapshotted<BSONObj>(startingSnapshotId, originalDoc),
+ updateMod,
+ false, /* indexesAffected */
+ nullptr,
+ &args);
wuow.commit();
}
@@ -471,6 +474,21 @@ void updateSessionEntry(OperationContext* opCtx,
// will be allowed to commit.
MONGO_FAIL_POINT_DEFINE(onPrimaryTransactionalWrite);
+/**
+ * Returns true if we are running retryable write or retryable internal multi-document transaction.
+ */
+bool writeStageCommonIsRetryableWriteImpl(OperationContext* opCtx) {
+ if (!opCtx->writesAreReplicated() || !opCtx->isRetryableWrite()) {
+ return false;
+ }
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ return txnParticipant &&
+ (!opCtx->inMultiDocumentTransaction() || txnParticipant.transactionIsOpen());
+}
+
+auto isRetryableWriteRegistration = MONGO_WEAK_FUNCTION_REGISTRATION(
+ write_stage_common::isRetryableWrite, writeStageCommonIsRetryableWriteImpl);
+
} // namespace
const BSONObj TransactionParticipant::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1));