diff options
author | Daniel Gómez Ferro <daniel.gomezferro@mongodb.com> | 2022-10-28 11:13:54 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-10-28 12:43:13 +0000 |
commit | 7bfdaf25f35df2c329eae4150ee7c6599e15e5b0 (patch) | |
tree | 68e4fbdd7a1d6b5a6767ecb7ac4ecb7fd7d7b99c /src/mongo/db | |
parent | 819bdae0b239f0ad75c0791e18943e6c4cf9762d (diff) | |
download | mongo-7bfdaf25f35df2c329eae4150ee7c6599e15e5b0.tar.gz |
SERVER-70044 Thread-through CollectionPtr into the onUpdate OpObserver
Diffstat (limited to 'src/mongo/db')
42 files changed, 497 insertions, 477 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index a848fd32367..e00940fd651 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -992,6 +992,7 @@ env.Library( 'query_exec', ], LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/catalog/collection_crud', '$BUILD_DIR/mongo/db/commands/server_status_core', '$BUILD_DIR/mongo/db/ops/write_ops', 'record_id_helpers', diff --git a/src/mongo/db/auth/auth_op_observer.cpp b/src/mongo/db/auth/auth_op_observer.cpp index b974eda9973..a2ddfa79949 100644 --- a/src/mongo/db/auth/auth_op_observer.cpp +++ b/src/mongo/db/auth/auth_op_observer.cpp @@ -67,10 +67,10 @@ void AuthOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg return; } - audit::logUpdateOperation(opCtx->getClient(), args.nss, args.updateArgs->updatedDoc); + audit::logUpdateOperation(opCtx->getClient(), args.coll->ns(), args.updateArgs->updatedDoc); AuthorizationManager::get(opCtx->getServiceContext()) - ->logOp(opCtx, "u", args.nss, args.updateArgs->update, &args.updateArgs->criteria); + ->logOp(opCtx, "u", args.coll->ns(), args.updateArgs->update, &args.updateArgs->criteria); } void AuthOpObserver::aboutToDelete(OperationContext* opCtx, diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index f3f9997b93e..3fac44cff7a 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -85,6 +85,8 @@ struct CollectionUpdateArgs { StoreDocOption storeDocOption = StoreDocOption::None; bool changeStreamPreAndPostImagesEnabledForCollection = false; + bool retryableWrite = false; + // Set if OpTimes were reserved for the update ahead of time. std::vector<OplogSlot> oplogSlots; }; @@ -347,40 +349,8 @@ public: StoreDeletedDoc storeDeletedDoc = StoreDeletedDoc::Off, CheckRecordId checkRecordId = CheckRecordId::Off) const = 0; - /** - * Updates the document @ oldLocation with newDoc. - * - * If the document fits in the old space, it is put there; if not, it is moved. 
- * Sets 'args.updatedDoc' to the updated version of the document with damages applied, on - * success. - * 'opDebug' Optional argument. When not null, will be used to record operation statistics. - * @return the post update location of the doc (may or may not be the same as oldLocation) - */ - virtual RecordId updateDocument(OperationContext* opCtx, - const RecordId& oldLocation, - const Snapshotted<BSONObj>& oldDoc, - const BSONObj& newDoc, - bool indexesAffected, - OpDebug* opDebug, - CollectionUpdateArgs* args) const = 0; - virtual bool updateWithDamagesSupported() const = 0; - /** - * Illegal to call if updateWithDamagesSupported() returns false. - * Sets 'args.updatedDoc' to the updated version of the document with damages applied, on - * success. - * - * Returns the contents of the updated document on success. - */ - virtual StatusWith<BSONObj> updateDocumentWithDamages(OperationContext* opCtx, - const RecordId& loc, - const Snapshotted<BSONObj>& oldDoc, - const char* damageSource, - const mutablebson::DamageVector& damages, - bool indexesAffected, - OpDebug* opDebug, - CollectionUpdateArgs* args) const = 0; // ----------- diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index 18928d12897..b4f3078a3c9 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -173,6 +173,7 @@ bool isRetryableWrite(OperationContext* opCtx) { (!opCtx->inMultiDocumentTransaction() || txnParticipant.transactionIsOpen()); } +// TODO(SERVER-70043) remove after that ticket is fixed std::vector<OplogSlot> reserveOplogSlotsForRetryableFindAndModify(OperationContext* opCtx) { invariant(isRetryableWrite(opCtx)); @@ -871,132 +872,6 @@ void CollectionImpl::deleteDocument(OperationContext* opCtx, } } -bool compareSafeContentElem(const BSONObj& oldDoc, const BSONObj& newDoc) { - if (newDoc.hasField(kSafeContent) != oldDoc.hasField(kSafeContent)) { - return false; - } - if 
(!newDoc.hasField(kSafeContent)) { - return true; - } - - return newDoc.getField(kSafeContent).binaryEqual(oldDoc.getField(kSafeContent)); -} - -RecordId CollectionImpl::updateDocument(OperationContext* opCtx, - const RecordId& oldLocation, - const Snapshotted<BSONObj>& oldDoc, - const BSONObj& newDoc, - bool indexesAffected, - OpDebug* opDebug, - CollectionUpdateArgs* args) const { - { - auto status = checkValidationAndParseResult(opCtx, newDoc); - if (!status.isOK()) { - if (validationLevelOrDefault(_metadata->options.validationLevel) == - ValidationLevelEnum::strict) { - uassertStatusOK(status); - } - // moderate means we have to check the old doc - auto oldDocStatus = checkValidationAndParseResult(opCtx, oldDoc.value()); - if (oldDocStatus.isOK()) { - // transitioning from good -> bad is not ok - uassertStatusOK(status); - } - // bad -> bad is ok in moderate mode - } - } - - auto& validationSettings = DocumentValidationSettings::get(opCtx); - if (getCollectionOptions().encryptedFieldConfig && - !validationSettings.isSchemaValidationDisabled() && - !validationSettings.isSafeContentValidationDisabled()) { - - uassert(ErrorCodes::BadValue, - str::stream() << "New document and old document both need to have " << kSafeContent - << " field.", - compareSafeContentElem(oldDoc.value(), newDoc)); - } - - dassert(opCtx->lockState()->isCollectionLockedForMode(ns(), MODE_IX)); - invariant(oldDoc.snapshotId() == opCtx->recoveryUnit()->getSnapshotId()); - invariant(newDoc.isOwned()); - - if (needsCappedLock()) { - Lock::ResourceLock heldUntilEndOfWUOW{opCtx, ResourceId(RESOURCE_METADATA, _ns), MODE_X}; - } - - SnapshotId sid = opCtx->recoveryUnit()->getSnapshotId(); - - BSONElement oldId = oldDoc.value()["_id"]; - if (!oldId.eoo() && SimpleBSONElementComparator::kInstance.evaluate(oldId != newDoc["_id"])) - uasserted(13596, "in Collection::updateDocument _id mismatch"); - - // The preImageDoc may not be boost::none if this update was a retryable findAndModify or if - // the 
update may have changed the shard key. For non-in-place updates we always set the - // preImageDoc here to an owned copy of the pre-image. - if (!args->preImageDoc) { - args->preImageDoc = oldDoc.value().getOwned(); - } - args->changeStreamPreAndPostImagesEnabledForCollection = - isChangeStreamPreAndPostImagesEnabled(); - - OplogUpdateEntryArgs onUpdateArgs(args, ns(), _uuid); - const bool setNeedsRetryImageOplogField = - args->storeDocOption != CollectionUpdateArgs::StoreDocOption::None; - if (args->oplogSlots.empty() && setNeedsRetryImageOplogField && isRetryableWrite(opCtx)) { - onUpdateArgs.retryableFindAndModifyLocation = - RetryableFindAndModifyLocation::kSideCollection; - // If the update is part of a retryable write and we expect to be storing the pre- or - // post-image in a side collection, then we must reserve oplog slots in advance. We - // expect to use the reserved oplog slots as follows, where TS is the greatest - // timestamp of 'oplogSlots': - // TS - 1: Tenant migrations and resharding will forge no-op image oplog entries and set - // the entry timestamps to TS - 1. - // TS: The timestamp given to the update oplog entry. - args->oplogSlots = reserveOplogSlotsForRetryableFindAndModify(opCtx); - } else { - // Retryable findAndModify commands should not reserve oplog slots before entering this - // function since tenant migrations and resharding rely on always being able to set - // timestamps of forged pre- and post- image entries to timestamp of findAndModify - 1. 
- invariant(!(isRetryableWrite(opCtx) && setNeedsRetryImageOplogField)); - } - - uassertStatusOK(_shared->_recordStore->updateRecord( - opCtx, oldLocation, newDoc.objdata(), newDoc.objsize())); - - if (indexesAffected) { - int64_t keysInserted = 0; - int64_t keysDeleted = 0; - - uassertStatusOK(_indexCatalog->updateRecord(opCtx, - {this, CollectionPtr::NoYieldTag{}}, - *args->preImageDoc, - newDoc, - oldLocation, - &keysInserted, - &keysDeleted)); - - if (opDebug) { - opDebug->additiveMetrics.incrementKeysInserted(keysInserted); - opDebug->additiveMetrics.incrementKeysDeleted(keysDeleted); - // 'opDebug' may be deleted at rollback time in case of multi-document transaction. - if (!opCtx->inMultiDocumentTransaction()) { - opCtx->recoveryUnit()->onRollback([opDebug, keysInserted, keysDeleted]() { - opDebug->additiveMetrics.incrementKeysInserted(-keysInserted); - opDebug->additiveMetrics.incrementKeysDeleted(-keysDeleted); - }); - } - } - } - - invariant(sid == opCtx->recoveryUnit()->getSnapshotId()); - args->updatedDoc = newDoc; - - opCtx->getServiceContext()->getOpObserver()->onUpdate(opCtx, onUpdateArgs); - - return oldLocation; -} - bool CollectionImpl::updateWithDamagesSupported() const { if (!_validator.isOK() || _validator.filter.getValue() != nullptr) return false; @@ -1004,86 +879,6 @@ bool CollectionImpl::updateWithDamagesSupported() const { return _shared->_recordStore->updateWithDamagesSupported(); } -StatusWith<BSONObj> CollectionImpl::updateDocumentWithDamages( - OperationContext* opCtx, - const RecordId& loc, - const Snapshotted<BSONObj>& oldDoc, - const char* damageSource, - const mutablebson::DamageVector& damages, - bool indexesAffected, - OpDebug* opDebug, - CollectionUpdateArgs* args) const { - dassert(opCtx->lockState()->isCollectionLockedForMode(ns(), MODE_IX)); - invariant(oldDoc.snapshotId() == opCtx->recoveryUnit()->getSnapshotId()); - invariant(updateWithDamagesSupported()); - - // For in-place updates we need to grab an owned copy of the 
pre-image doc if pre-image - // recording is enabled and we haven't already set the pre-image due to this update being - // a retryable findAndModify or a possible update to the shard key. - if (!args->preImageDoc && isChangeStreamPreAndPostImagesEnabled()) { - args->preImageDoc = oldDoc.value().getOwned(); - } - OplogUpdateEntryArgs onUpdateArgs(args, ns(), _uuid); - const bool setNeedsRetryImageOplogField = - args->storeDocOption != CollectionUpdateArgs::StoreDocOption::None; - if (args->oplogSlots.empty() && setNeedsRetryImageOplogField && isRetryableWrite(opCtx)) { - onUpdateArgs.retryableFindAndModifyLocation = - RetryableFindAndModifyLocation::kSideCollection; - // If the update is part of a retryable write and we expect to be storing the pre- or - // post-image in a side collection, then we must reserve oplog slots in advance. We - // expect to use the reserved oplog slots as follows, where TS is the greatest - // timestamp of 'oplogSlots': - // TS - 1: Tenant migrations and resharding will forge no-op image oplog entries and set - // the entry timestamps to TS - 1. - // TS: The timestamp given to the update oplog entry. - args->oplogSlots = reserveOplogSlotsForRetryableFindAndModify(opCtx); - } else { - // Retryable findAndModify commands should not reserve oplog slots before entering this - // function since tenant migrations and resharding rely on always being able to set - // timestamps of forged pre- and post- image entries to timestamp of findAndModify - 1. 
- invariant(!(isRetryableWrite(opCtx) && setNeedsRetryImageOplogField)); - } - - RecordData oldRecordData(oldDoc.value().objdata(), oldDoc.value().objsize()); - StatusWith<RecordData> recordData = - _shared->_recordStore->updateWithDamages(opCtx, loc, oldRecordData, damageSource, damages); - if (!recordData.isOK()) - return recordData.getStatus(); - BSONObj newDoc = std::move(recordData.getValue()).releaseToBson().getOwned(); - - args->updatedDoc = newDoc; - args->changeStreamPreAndPostImagesEnabledForCollection = - isChangeStreamPreAndPostImagesEnabled(); - - if (indexesAffected) { - int64_t keysInserted = 0; - int64_t keysDeleted = 0; - - uassertStatusOK(_indexCatalog->updateRecord(opCtx, - {this, CollectionPtr::NoYieldTag{}}, - oldDoc.value(), - args->updatedDoc, - loc, - &keysInserted, - &keysDeleted)); - - if (opDebug) { - opDebug->additiveMetrics.incrementKeysInserted(keysInserted); - opDebug->additiveMetrics.incrementKeysDeleted(keysDeleted); - // 'opDebug' may be deleted at rollback time in case of multi-document transaction. - if (!opCtx->inMultiDocumentTransaction()) { - opCtx->recoveryUnit()->onRollback([opDebug, keysInserted, keysDeleted]() { - opDebug->additiveMetrics.incrementKeysInserted(-keysInserted); - opDebug->additiveMetrics.incrementKeysDeleted(-keysDeleted); - }); - } - } - } - - opCtx->getServiceContext()->getOpObserver()->onUpdate(opCtx, onUpdateArgs); - return newDoc; -} - bool CollectionImpl::isTemporary() const { return _metadata->options.temp; } diff --git a/src/mongo/db/catalog/collection_impl.h b/src/mongo/db/catalog/collection_impl.h index 409d570c8c6..46c2a3bb6be 100644 --- a/src/mongo/db/catalog/collection_impl.h +++ b/src/mongo/db/catalog/collection_impl.h @@ -191,40 +191,8 @@ public: Collection::StoreDeletedDoc storeDeletedDoc = Collection::StoreDeletedDoc::Off, CheckRecordId checkRecordId = CheckRecordId::Off) const final; - /** - * Updates the document @ oldLocation with newDoc. 
- * - * If the document fits in the old space, it is put there; if not, it is moved. - * Sets 'args.updatedDoc' to the updated version of the document with damages applied, on - * success. - * 'opDebug' Optional argument. When not null, will be used to record operation statistics. - * @return the post update location of the doc (may or may not be the same as oldLocation) - */ - RecordId updateDocument(OperationContext* opCtx, - const RecordId& oldLocation, - const Snapshotted<BSONObj>& oldDoc, - const BSONObj& newDoc, - bool indexesAffected, - OpDebug* opDebug, - CollectionUpdateArgs* args) const final; - bool updateWithDamagesSupported() const final; - /** - * Illegal to call if updateWithDamagesSupported() returns false. - * Sets 'args.updatedDoc' to the updated version of the document with damages applied, on - * success. - * Returns the contents of the updated document. - */ - StatusWith<BSONObj> updateDocumentWithDamages(OperationContext* opCtx, - const RecordId& loc, - const Snapshotted<BSONObj>& oldDoc, - const char* damageSource, - const mutablebson::DamageVector& damages, - bool indexesAffected, - OpDebug* opDebug, - CollectionUpdateArgs* args) const final; - // ----------- /** diff --git a/src/mongo/db/catalog/collection_mock.h b/src/mongo/db/catalog/collection_mock.h index 62d5bf82df6..7ff9e7d00d8 100644 --- a/src/mongo/db/catalog/collection_mock.h +++ b/src/mongo/db/catalog/collection_mock.h @@ -160,31 +160,10 @@ public: MONGO_UNREACHABLE; } - RecordId updateDocument(OperationContext* opCtx, - const RecordId& oldLocation, - const Snapshotted<BSONObj>& oldDoc, - const BSONObj& newDoc, - bool indexesAffected, - OpDebug* opDebug, - CollectionUpdateArgs* args) const { - MONGO_UNREACHABLE; - } - bool updateWithDamagesSupported() const { MONGO_UNREACHABLE; } - StatusWith<BSONObj> updateDocumentWithDamages(OperationContext* opCtx, - const RecordId& loc, - const Snapshotted<BSONObj>& oldDoc, - const char* damageSource, - const mutablebson::DamageVector& 
damages, - bool indexesAffected, - OpDebug* opDebug, - CollectionUpdateArgs* args) const { - MONGO_UNREACHABLE; - } - Status truncate(OperationContext* opCtx) { MONGO_UNREACHABLE; } diff --git a/src/mongo/db/catalog/collection_test.cpp b/src/mongo/db/catalog/collection_test.cpp index 5957c74b53c..f03632ceaab 100644 --- a/src/mongo/db/catalog/collection_test.cpp +++ b/src/mongo/db/catalog/collection_test.cpp @@ -278,7 +278,8 @@ TEST_F(CollectionTest, VerifyIndexIsUpdated) { WriteUnitOfWork wuow(opCtx); Snapshotted<BSONObj> oldSnap(opCtx->recoveryUnit()->getSnapshotId(), oldDoc); CollectionUpdateArgs args; - coll->updateDocument(opCtx, oldRecordId, oldSnap, newDoc, true, nullptr, &args); + collection_internal::updateDocument( + opCtx, coll, oldRecordId, oldSnap, newDoc, true, nullptr, &args); wuow.commit(); } auto indexRecordId = userIdx->getEntry()->accessMethod()->asSortedData()->findSingle( @@ -321,14 +322,16 @@ TEST_F(CollectionTest, VerifyIndexIsUpdatedWithDamages) { WriteUnitOfWork wuow(opCtx); Snapshotted<BSONObj> oldSnap(opCtx->recoveryUnit()->getSnapshotId(), oldDoc); CollectionUpdateArgs args; - auto newDocStatus = coll->updateDocumentWithDamages(opCtx, - oldRecordId, - oldSnap, - damagesOutput.damageSource.get(), - damagesOutput.damages, - true, - nullptr, - &args); + auto newDocStatus = + collection_internal::updateDocumentWithDamages(opCtx, + coll, + oldRecordId, + oldSnap, + damagesOutput.damageSource.get(), + damagesOutput.damages, + true, + nullptr, + &args); ASSERT_OK(newDocStatus); ASSERT_BSONOBJ_EQ(newDoc, newDocStatus.getValue()); wuow.commit(); diff --git a/src/mongo/db/catalog/collection_write_path.cpp b/src/mongo/db/catalog/collection_write_path.cpp index 69664b71635..38db9f4f22e 100644 --- a/src/mongo/db/catalog/collection_write_path.cpp +++ b/src/mongo/db/catalog/collection_write_path.cpp @@ -29,6 +29,7 @@ #include "mongo/db/catalog/collection_write_path.h" +#include "mongo/bson/simple_bsonelement_comparator.h" #include
"mongo/crypto/fle_crypto.h" #include "mongo/db/catalog/capped_collection_maintenance.h" #include "mongo/db/catalog/document_validation.h" @@ -68,6 +69,34 @@ MONGO_FAIL_POINT_DEFINE(hangAfterCollectionInserts); // This fail point introduces corruption to documents during insert. MONGO_FAIL_POINT_DEFINE(corruptDocumentOnInsert); +bool compareSafeContentElem(const BSONObj& oldDoc, const BSONObj& newDoc) { + if (newDoc.hasField(kSafeContent) != oldDoc.hasField(kSafeContent)) { + return false; + } + if (!newDoc.hasField(kSafeContent)) { + return true; + } + + return newDoc.getField(kSafeContent).binaryEqual(oldDoc.getField(kSafeContent)); +} + +std::vector<OplogSlot> reserveOplogSlotsForRetryableFindAndModify(OperationContext* opCtx, + const CollectionPtr& collection) { + // For retryable findAndModify running in a multi-document transaction, we will reserve the + // oplog entries when the transaction prepares or commits without prepare. + if (opCtx->inMultiDocumentTransaction()) { + return {}; + } + + // We reserve oplog slots here, expecting the slot with the greatest timestamp (say TS) to be + // used as the oplog timestamp. Tenant migrations and resharding will forge no-op image oplog + // entries and set the timestamp for these synthetic entries to be TS - 1.
+ auto oplogInfo = LocalOplogInfo::get(opCtx); + auto slots = oplogInfo->getNextOpTimes(opCtx, 2); + uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(slots.back().getTimestamp())); + return slots; +} + Status insertDocumentsImpl(OperationContext* opCtx, const CollectionPtr& collection, const std::vector<InsertStatement>::const_iterator begin, @@ -365,5 +394,197 @@ Status checkFailCollectionInsertsFailPoint(const NamespaceString& ns, const BSON return s; } +RecordId updateDocument(OperationContext* opCtx, + const CollectionPtr& collection, + const RecordId& oldLocation, + const Snapshotted<BSONObj>& oldDoc, + const BSONObj& newDoc, + bool indexesAffected, + OpDebug* opDebug, + CollectionUpdateArgs* args) { + { + auto status = collection->checkValidationAndParseResult(opCtx, newDoc); + if (!status.isOK()) { + if (validationLevelOrDefault(collection->getCollectionOptions().validationLevel) == + ValidationLevelEnum::strict) { + uassertStatusOK(status); + } + // moderate means we have to check the old doc + auto oldDocStatus = collection->checkValidationAndParseResult(opCtx, oldDoc.value()); + if (oldDocStatus.isOK()) { + // transitioning from good -> bad is not ok + uassertStatusOK(status); + } + // bad -> bad is ok in moderate mode + } + } + + auto& validationSettings = DocumentValidationSettings::get(opCtx); + if (collection->getCollectionOptions().encryptedFieldConfig && + !validationSettings.isSchemaValidationDisabled() && + !validationSettings.isSafeContentValidationDisabled()) { + + uassert(ErrorCodes::BadValue, + str::stream() << "New document and old document both need to have " << kSafeContent + << " field.", + compareSafeContentElem(oldDoc.value(), newDoc)); + } + + dassert(opCtx->lockState()->isCollectionLockedForMode(collection->ns(), MODE_IX)); + invariant(oldDoc.snapshotId() == opCtx->recoveryUnit()->getSnapshotId()); + invariant(newDoc.isOwned()); + + if (collection->needsCappedLock()) { + Lock::ResourceLock heldUntilEndOfWUOW{ + opCtx, 
ResourceId(RESOURCE_METADATA, collection->ns()), MODE_X}; + } + + SnapshotId sid = opCtx->recoveryUnit()->getSnapshotId(); + + BSONElement oldId = oldDoc.value()["_id"]; + if (!oldId.eoo() && SimpleBSONElementComparator::kInstance.evaluate(oldId != newDoc["_id"])) + uasserted(13596, "in Collection::updateDocument _id mismatch"); + + // The preImageDoc may not be boost::none if this update was a retryable findAndModify or if + // the update may have changed the shard key. For non-in-place updates we always set the + // preImageDoc here to an owned copy of the pre-image. + if (!args->preImageDoc) { + args->preImageDoc = oldDoc.value().getOwned(); + } + args->changeStreamPreAndPostImagesEnabledForCollection = + collection->isChangeStreamPreAndPostImagesEnabled(); + + OplogUpdateEntryArgs onUpdateArgs(args, collection); + const bool setNeedsRetryImageOplogField = + args->storeDocOption != CollectionUpdateArgs::StoreDocOption::None; + if (args->oplogSlots.empty() && setNeedsRetryImageOplogField && args->retryableWrite) { + onUpdateArgs.retryableFindAndModifyLocation = + RetryableFindAndModifyLocation::kSideCollection; + // If the update is part of a retryable write and we expect to be storing the pre- or + // post-image in a side collection, then we must reserve oplog slots in advance. We + // expect to use the reserved oplog slots as follows, where TS is the greatest + // timestamp of 'oplogSlots': + // TS - 1: Tenant migrations and resharding will forge no-op image oplog entries and set + // the entry timestamps to TS - 1. + // TS: The timestamp given to the update oplog entry. + args->oplogSlots = reserveOplogSlotsForRetryableFindAndModify(opCtx, collection); + } else { + // Retryable findAndModify commands should not reserve oplog slots before entering this + // function since tenant migrations and resharding rely on always being able to set + // timestamps of forged pre- and post- image entries to timestamp of findAndModify - 1. 
+ invariant(!(args->retryableWrite && setNeedsRetryImageOplogField)); + } + + uassertStatusOK(collection->getRecordStore()->updateRecord( + opCtx, oldLocation, newDoc.objdata(), newDoc.objsize())); + + if (indexesAffected) { + int64_t keysInserted = 0; + int64_t keysDeleted = 0; + + uassertStatusOK(collection->getIndexCatalog()->updateRecord(opCtx, + collection, + *args->preImageDoc, + newDoc, + oldLocation, + &keysInserted, + &keysDeleted)); + + if (opDebug) { + opDebug->additiveMetrics.incrementKeysInserted(keysInserted); + opDebug->additiveMetrics.incrementKeysDeleted(keysDeleted); + // 'opDebug' may be deleted at rollback time in case of multi-document transaction. + if (!opCtx->inMultiDocumentTransaction()) { + opCtx->recoveryUnit()->onRollback([opDebug, keysInserted, keysDeleted]() { + opDebug->additiveMetrics.incrementKeysInserted(-keysInserted); + opDebug->additiveMetrics.incrementKeysDeleted(-keysDeleted); + }); + } + } + } + + invariant(sid == opCtx->recoveryUnit()->getSnapshotId()); + args->updatedDoc = newDoc; + + opCtx->getServiceContext()->getOpObserver()->onUpdate(opCtx, onUpdateArgs); + + return oldLocation; +} + +StatusWith<BSONObj> updateDocumentWithDamages(OperationContext* opCtx, + const CollectionPtr& collection, + const RecordId& loc, + const Snapshotted<BSONObj>& oldDoc, + const char* damageSource, + const mutablebson::DamageVector& damages, + bool indexesAffected, + OpDebug* opDebug, + CollectionUpdateArgs* args) { + dassert(opCtx->lockState()->isCollectionLockedForMode(collection->ns(), MODE_IX)); + invariant(oldDoc.snapshotId() == opCtx->recoveryUnit()->getSnapshotId()); + invariant(collection->updateWithDamagesSupported()); + + // For in-place updates we need to grab an owned copy of the pre-image doc if pre-image + // recording is enabled and we haven't already set the pre-image due to this update being + // a retryable findAndModify or a possible update to the shard key. 
+ if (!args->preImageDoc && collection->isChangeStreamPreAndPostImagesEnabled()) { + args->preImageDoc = oldDoc.value().getOwned(); + } + OplogUpdateEntryArgs onUpdateArgs(args, collection); + const bool setNeedsRetryImageOplogField = + args->storeDocOption != CollectionUpdateArgs::StoreDocOption::None; + if (args->oplogSlots.empty() && setNeedsRetryImageOplogField && args->retryableWrite) { + onUpdateArgs.retryableFindAndModifyLocation = + RetryableFindAndModifyLocation::kSideCollection; + // If the update is part of a retryable write and we expect to be storing the pre- or + // post-image in a side collection, then we must reserve oplog slots in advance. We + // expect to use the reserved oplog slots as follows, where TS is the greatest + // timestamp of 'oplogSlots': + // TS - 1: Tenant migrations and resharding will forge no-op image oplog entries and set + // the entry timestamps to TS - 1. + // TS: The timestamp given to the update oplog entry. + args->oplogSlots = reserveOplogSlotsForRetryableFindAndModify(opCtx, collection); + } else { + // Retryable findAndModify commands should not reserve oplog slots before entering this + // function since tenant migrations and resharding rely on always being able to set + // timestamps of forged pre- and post- image entries to timestamp of findAndModify - 1. 
+ invariant(!(args->retryableWrite && setNeedsRetryImageOplogField)); + } + + RecordData oldRecordData(oldDoc.value().objdata(), oldDoc.value().objsize()); + StatusWith<RecordData> recordData = collection->getRecordStore()->updateWithDamages( + opCtx, loc, oldRecordData, damageSource, damages); + if (!recordData.isOK()) + return recordData.getStatus(); + BSONObj newDoc = std::move(recordData.getValue()).releaseToBson().getOwned(); + + args->updatedDoc = newDoc; + args->changeStreamPreAndPostImagesEnabledForCollection = + collection->isChangeStreamPreAndPostImagesEnabled(); + + if (indexesAffected) { + int64_t keysInserted = 0; + int64_t keysDeleted = 0; + + uassertStatusOK(collection->getIndexCatalog()->updateRecord( + opCtx, collection, oldDoc.value(), args->updatedDoc, loc, &keysInserted, &keysDeleted)); + + if (opDebug) { + opDebug->additiveMetrics.incrementKeysInserted(keysInserted); + opDebug->additiveMetrics.incrementKeysDeleted(keysDeleted); + // 'opDebug' may be deleted at rollback time in case of multi-document transaction. + if (!opCtx->inMultiDocumentTransaction()) { + opCtx->recoveryUnit()->onRollback([opDebug, keysInserted, keysDeleted]() { + opDebug->additiveMetrics.incrementKeysInserted(-keysInserted); + opDebug->additiveMetrics.incrementKeysDeleted(-keysDeleted); + }); + } + } + } + + opCtx->getServiceContext()->getOpObserver()->onUpdate(opCtx, onUpdateArgs); + return newDoc; +} + } // namespace collection_internal } // namespace mongo diff --git a/src/mongo/db/catalog/collection_write_path.h b/src/mongo/db/catalog/collection_write_path.h index 084c04893ab..9e0af1f0273 100644 --- a/src/mongo/db/catalog/collection_write_path.h +++ b/src/mongo/db/catalog/collection_write_path.h @@ -85,5 +85,39 @@ Status insertDocument(OperationContext* opCtx, */ Status checkFailCollectionInsertsFailPoint(const NamespaceString& ns, const BSONObj& firstDoc); +/** + * Updates the document @ oldLocation with newDoc. 
+ * + * If the document fits in the old space, it is put there; if not, it is moved. + * Sets 'args.updatedDoc' to the updated version of the document with damages applied, on + * success. + * 'opDebug' Optional argument. When not null, will be used to record operation statistics. + * @return the post update location of the doc (may or may not be the same as oldLocation) + */ +RecordId updateDocument(OperationContext* opCtx, + const CollectionPtr& collection, + const RecordId& oldLocation, + const Snapshotted<BSONObj>& oldDoc, + const BSONObj& newDoc, + bool indexesAffected, + OpDebug* opDebug, + CollectionUpdateArgs* args); + +/** + * Illegal to call if collection->updateWithDamagesSupported() returns false. + * Sets 'args.updatedDoc' to the updated version of the document with damages applied, on + * success. + * Returns the contents of the updated document. + */ +StatusWith<BSONObj> updateDocumentWithDamages(OperationContext* opCtx, + const CollectionPtr& collection, + const RecordId& loc, + const Snapshotted<BSONObj>& oldDoc, + const char* damageSource, + const mutablebson::DamageVector& damages, + bool indexesAffected, + OpDebug* opDebug, + CollectionUpdateArgs* args); + } // namespace collection_internal } // namespace mongo diff --git a/src/mongo/db/catalog/views_for_database.cpp b/src/mongo/db/catalog/views_for_database.cpp index 8ceda406315..23ea8df9c23 100644 --- a/src/mongo/db/catalog/views_for_database.cpp +++ b/src/mongo/db/catalog/views_for_database.cpp @@ -327,13 +327,14 @@ Status ViewsForDatabase::_upsertIntoCatalog(OperationContext* opCtx, args.update = viewObj; args.criteria = BSON("_id" << NamespaceStringUtil::serialize(view.name())); - systemViews->updateDocument(opCtx, - id, - oldView, - viewObj, - true /* indexesAffected */, - &CurOp::get(opCtx)->debug(), - &args); + collection_internal::updateDocument(opCtx, + systemViews, + id, + oldView, + viewObj, + true /* indexesAffected */, + &CurOp::get(opCtx)->debug(), + &args); } return Status::OK(); 
diff --git a/src/mongo/db/catalog/virtual_collection_impl.h b/src/mongo/db/catalog/virtual_collection_impl.h index d9596eb8ca2..a490f4f9ee1 100644 --- a/src/mongo/db/catalog/virtual_collection_impl.h +++ b/src/mongo/db/catalog/virtual_collection_impl.h @@ -177,34 +177,11 @@ public: unimplementedTasserted(); } - RecordId updateDocument(OperationContext* opCtx, - const RecordId& oldLocation, - const Snapshotted<BSONObj>& oldDoc, - const BSONObj& newDoc, - bool indexesAffected, - OpDebug* opDebug, - CollectionUpdateArgs* args) const final { - unimplementedTasserted(); - return RecordId(); - } - bool updateWithDamagesSupported() const final { unimplementedTasserted(); return false; } - StatusWith<BSONObj> updateDocumentWithDamages(OperationContext* opCtx, - const RecordId& loc, - const Snapshotted<BSONObj>& oldDoc, - const char* damageSource, - const mutablebson::DamageVector& damages, - bool indexesAffected, - OpDebug* opDebug, - CollectionUpdateArgs* args) const final { - unimplementedTasserted(); - return Status(ErrorCodes::UnknownError, "unknown"); - } - Status truncate(OperationContext* opCtx) final { unimplementedTasserted(); return Status(ErrorCodes::UnknownError, "unknown"); diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp index dd00b6366e9..154da0a09c7 100644 --- a/src/mongo/db/dbhelpers.cpp +++ b/src/mongo/db/dbhelpers.cpp @@ -33,6 +33,7 @@ #include "mongo/db/dbhelpers.h" #include "mongo/db/catalog/clustered_collection_util.h" +#include "mongo/db/catalog/collection_write_path.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/index/btree_access_method.h" @@ -386,7 +387,8 @@ bool Helpers::findByIdAndNoopUpdate(OperationContext* opCtx, CollectionUpdateArgs args; args.criteria = idQuery; args.update = BSONObj(); - collection->updateDocument(opCtx, recordId, snapshottedDoc, result, false, nullptr, &args); + collection_internal::updateDocument( + opCtx, collection, recordId, snapshottedDoc, result, 
false, nullptr, &args); return true; } diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp index 63bf1087791..ca7d4fb5ef5 100644 --- a/src/mongo/db/exec/update_stage.cpp +++ b/src/mongo/db/exec/update_stage.cpp @@ -33,6 +33,7 @@ #include "mongo/base/status_with.h" #include "mongo/bson/mutable/algorithm.h" +#include "mongo/db/catalog/collection_write_path.h" #include "mongo/db/catalog/document_validation.h" #include "mongo/db/internal_transactions_feature_flag_gen.h" #include "mongo/db/query/collection_query_info.h" @@ -260,6 +261,8 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, // Ensure we set the type correctly args.source = writeToOrphan ? OperationSource::kFromMigrate : request->source(); + args.retryableWrite = write_stage_common::isRetryableWrite(opCtx()); + if (inPlace) { if (!request->explain()) { newObj = oldObj.value(); @@ -274,14 +277,15 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, WriteUnitOfWork wunit(opCtx()); newObj = uassertStatusOK( - collection()->updateDocumentWithDamages(opCtx(), - recordId, - oldObj, - source, - _damages, - driver->modsAffectIndices(), - _params.opDebug, - &args)); + collection_internal::updateDocumentWithDamages(opCtx(), + collection(), + recordId, + oldObj, + source, + _damages, + driver->modsAffectIndices(), + _params.opDebug, + &args)); invariant(oldObj.snapshotId() == opCtx()->recoveryUnit()->getSnapshotId()); wunit.commit(); } @@ -305,13 +309,14 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, } WriteUnitOfWork wunit(opCtx()); - newRecordId = collection()->updateDocument(opCtx(), - recordId, - oldObj, - newObj, - driver->modsAffectIndices(), - _params.opDebug, - &args); + newRecordId = collection_internal::updateDocument(opCtx(), + collection(), + recordId, + oldObj, + newObj, + driver->modsAffectIndices(), + _params.opDebug, + &args); invariant(oldObj.snapshotId() == 
opCtx()->recoveryUnit()->getSnapshotId()); wunit.commit(); } diff --git a/src/mongo/db/exec/write_stage_common.cpp b/src/mongo/db/exec/write_stage_common.cpp index 1fa04f729b2..46f7652280b 100644 --- a/src/mongo/db/exec/write_stage_common.cpp +++ b/src/mongo/db/exec/write_stage_common.cpp @@ -29,6 +29,7 @@ #include "mongo/db/exec/write_stage_common.h" +#include "mongo/base/shim.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/exec/shard_filterer_impl.h" #include "mongo/db/exec/working_set.h" @@ -38,6 +39,7 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/transaction/transaction_participant.h" #include "mongo/logv2/redaction.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kWrite @@ -133,5 +135,10 @@ bool ensureStillMatches(const CollectionPtr& collection, return true; } +bool isRetryableWrite(OperationContext* opCtx) { + static auto w = MONGO_WEAK_FUNCTION_DEFINITION(write_stage_common::isRetryableWrite); + return w(opCtx); +} + } // namespace write_stage_common } // namespace mongo diff --git a/src/mongo/db/exec/write_stage_common.h b/src/mongo/db/exec/write_stage_common.h index 870178f67c7..2f4a49ac8a9 100644 --- a/src/mongo/db/exec/write_stage_common.h +++ b/src/mongo/db/exec/write_stage_common.h @@ -96,5 +96,11 @@ bool ensureStillMatches(const CollectionPtr& collection, WorkingSet* ws, WorkingSetID id, const CanonicalQuery* cq); + +/** + * Returns true if we are running retryable write or retryable internal multi-document transaction. 
+ */ +bool isRetryableWrite(OperationContext* opCtx); + } // namespace write_stage_common } // namespace mongo diff --git a/src/mongo/db/free_mon/free_mon_op_observer.cpp b/src/mongo/db/free_mon/free_mon_op_observer.cpp index 75db3aeb817..0729e33b84a 100644 --- a/src/mongo/db/free_mon/free_mon_op_observer.cpp +++ b/src/mongo/db/free_mon/free_mon_op_observer.cpp @@ -100,7 +100,7 @@ void FreeMonOpObserver::onInserts(OperationContext* opCtx, } void FreeMonOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { - if (args.nss != NamespaceString::kServerConfigurationNamespace) { + if (args.coll->ns() != NamespaceString::kServerConfigurationNamespace) { return; } diff --git a/src/mongo/db/op_observer/fcv_op_observer.cpp b/src/mongo/db/op_observer/fcv_op_observer.cpp index 5acca8bc3ae..763b8dcdd92 100644 --- a/src/mongo/db/op_observer/fcv_op_observer.cpp +++ b/src/mongo/db/op_observer/fcv_op_observer.cpp @@ -163,7 +163,7 @@ void FcvOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs if (args.updateArgs->update.isEmpty()) { return; } - if (args.nss.isServerConfigurationCollection()) { + if (args.coll->ns().isServerConfigurationCollection()) { _onInsertOrUpdate(opCtx, args.updateArgs->updatedDoc); } } diff --git a/src/mongo/db/op_observer/op_observer.h b/src/mongo/db/op_observer/op_observer.h index fa770ac62a9..8f57e825ea5 100644 --- a/src/mongo/db/op_observer/op_observer.h +++ b/src/mongo/db/op_observer/op_observer.h @@ -62,15 +62,14 @@ enum class RetryableFindAndModifyLocation { struct OplogUpdateEntryArgs { CollectionUpdateArgs* updateArgs; - NamespaceString nss; - UUID uuid; + const CollectionPtr& coll; // Specifies the pre-image recording option for retryable "findAndModify" commands. 
RetryableFindAndModifyLocation retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kNone; - OplogUpdateEntryArgs(CollectionUpdateArgs* updateArgs, NamespaceString nss, UUID uuid) - : updateArgs(updateArgs), nss(std::move(nss)), uuid(std::move(uuid)) {} + OplogUpdateEntryArgs(CollectionUpdateArgs* updateArgs, const CollectionPtr& coll) + : updateArgs(updateArgs), coll(coll) {} }; struct OplogDeleteEntryArgs { diff --git a/src/mongo/db/op_observer/op_observer_impl.cpp b/src/mongo/db/op_observer/op_observer_impl.cpp index f80ab67c3c5..5de4209759d 100644 --- a/src/mongo/db/op_observer/op_observer_impl.cpp +++ b/src/mongo/db/op_observer/op_observer_impl.cpp @@ -196,9 +196,9 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args, MutableOplogEntry* oplogEntry, OplogWriter* oplogWriter) { - oplogEntry->setTid(args.nss.tenantId()); - oplogEntry->setNss(args.nss); - oplogEntry->setUuid(args.uuid); + oplogEntry->setTid(args.coll->ns().tenantId()); + oplogEntry->setNss(args.coll->ns()); + oplogEntry->setUuid(args.coll->uuid()); repl::OplogLink oplogLink; oplogWriter->appendOplogEntryChainInfo(opCtx, oplogEntry, &oplogLink, args.updateArgs->stmtIds); @@ -779,14 +779,15 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg failCollectionUpdates.executeIf( [&](const BSONObj&) { uasserted(40654, - str::stream() << "failCollectionUpdates failpoint enabled, namespace: " - << args.nss.ns() << ", update: " << args.updateArgs->update - << " on document with " << args.updateArgs->criteria); + str::stream() + << "failCollectionUpdates failpoint enabled, namespace: " + << args.coll->ns().ns() << ", update: " << args.updateArgs->update + << " on document with " << args.updateArgs->criteria); }, [&](const BSONObj& data) { // If the failpoint specifies no collection or matches the existing one, fail. 
auto collElem = data["collectionNS"]; - return !collElem || args.nss.ns() == collElem.String(); + return !collElem || args.coll->ns().ns() == collElem.String(); }); // Do not log a no-op operation; see SERVER-21738 @@ -798,7 +799,8 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg const bool inMultiDocumentTransaction = txnParticipant && opCtx->writesAreReplicated() && txnParticipant.transactionIsOpen(); - ShardingWriteRouter shardingWriteRouter(opCtx, args.nss, Grid::get(opCtx)->catalogCache()); + ShardingWriteRouter shardingWriteRouter( + opCtx, args.coll->ns(), Grid::get(opCtx)->catalogCache()); OpTimeBundle opTime; auto& batchedWriteContext = BatchedWriteContext::get(opCtx); @@ -806,7 +808,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg if (inBatchedWrite) { auto operation = MutableOplogEntry::makeUpdateOperation( - args.nss, args.uuid, args.updateArgs->update, args.updateArgs->criteria); + args.coll->ns(), args.coll->uuid(), args.updateArgs->update, args.updateArgs->criteria); operation.setDestinedRecipient( shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs->updatedDoc)); operation.setFromMigrateIfTrue(args.updateArgs->source == OperationSource::kFromMigrate); @@ -816,7 +818,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId()); auto operation = MutableOplogEntry::makeUpdateOperation( - args.nss, args.uuid, args.updateArgs->update, args.updateArgs->criteria); + args.coll->ns(), args.coll->uuid(), args.updateArgs->update, args.updateArgs->criteria); if (inRetryableInternalTransaction) { operation.setInitializedStatementIds(args.updateArgs->stmtIds); @@ -900,15 +902,15 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg if (args.updateArgs->changeStreamPreAndPostImagesEnabledForCollection && !opTime.writeOpTime.isNull() && args.updateArgs->source 
!= OperationSource::kFromMigrate && - !args.nss.isTemporaryReshardingCollection()) { + !args.coll->ns().isTemporaryReshardingCollection()) { const auto& preImageDoc = args.updateArgs->preImageDoc; tassert(5868600, "PreImage must be set", preImageDoc && !preImageDoc.value().isEmpty()); - ChangeStreamPreImageId id(args.uuid, opTime.writeOpTime.getTimestamp(), 0); + ChangeStreamPreImageId id(args.coll->uuid(), opTime.writeOpTime.getTimestamp(), 0); ChangeStreamPreImage preImage(id, opTime.wallClockTime, preImageDoc.value()); ChangeStreamPreImagesCollectionManager::insertPreImage( - opCtx, args.nss.tenantId(), preImage); + opCtx, args.coll->ns().tenantId(), preImage); } SessionTxnRecord sessionTxnRecord; @@ -917,10 +919,10 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg onWriteOpCompleted(opCtx, args.updateArgs->stmtIds, sessionTxnRecord); } - if (args.nss != NamespaceString::kSessionTransactionsTableNamespace) { + if (args.coll->ns() != NamespaceString::kSessionTransactionsTableNamespace) { if (args.updateArgs->source != OperationSource::kFromMigrate) { shardObserveUpdateOp(opCtx, - args.nss, + args.coll->ns(), args.updateArgs->preImageDoc, args.updateArgs->updatedDoc, opTime.writeOpTime, @@ -930,19 +932,19 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg } } - if (args.nss.coll() == "system.js") { + if (args.coll->ns().coll() == "system.js") { Scope::storedFuncMod(opCtx); - } else if (args.nss.isSystemDotViews()) { - CollectionCatalog::get(opCtx)->reloadViews(opCtx, args.nss.dbName()).ignore(); - } else if (args.nss == NamespaceString::kSessionTransactionsTableNamespace && + } else if (args.coll->ns().isSystemDotViews()) { + CollectionCatalog::get(opCtx)->reloadViews(opCtx, args.coll->ns().dbName()).ignore(); + } else if (args.coll->ns() == NamespaceString::kSessionTransactionsTableNamespace && !opTime.writeOpTime.isNull()) { auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx); 
mongoDSessionCatalog->observeDirectWriteToConfigTransactions(opCtx, args.updateArgs->updatedDoc); - } else if (args.nss == NamespaceString::kConfigSettingsNamespace) { + } else if (args.coll->ns() == NamespaceString::kConfigSettingsNamespace) { ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings( opCtx, args.updateArgs->updatedDoc["_id"], args.updateArgs->updatedDoc); - } else if (args.nss.isTimeseriesBucketsCollection()) { + } else if (args.coll->ns().isTimeseriesBucketsCollection()) { if (args.updateArgs->source != OperationSource::kTimeseriesInsert) { auto& bucketCatalog = BucketCatalog::get(opCtx); bucketCatalog.clear(args.updateArgs->updatedDoc["_id"].OID()); diff --git a/src/mongo/db/op_observer/op_observer_impl_test.cpp b/src/mongo/db/op_observer/op_observer_impl_test.cpp index 09c580d01d6..b451afe1c9c 100644 --- a/src/mongo/db/op_observer/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer/op_observer_impl_test.cpp @@ -160,7 +160,7 @@ class OpObserverTest : public ServiceContextMongoDTest { protected: explicit OpObserverTest(Options options = {}) : ServiceContextMongoDTest(std::move(options)) {} - void setUp() override { + virtual void setUp() override { // Set up mongod. 
ServiceContextMongoDTest::setUp(); @@ -223,6 +223,7 @@ protected: } void resetOplogAndTransactions(OperationContext* opCtx) const { + reset(opCtx, nss); reset(opCtx, NamespaceString::kRsOplogNamespace); reset(opCtx, NamespaceString::kSessionTransactionsTableNamespace); reset(opCtx, NamespaceString::kConfigImagesNamespace); @@ -915,11 +916,12 @@ TEST_F(OpObserverTest, SingleStatementUpdateTestIncludesTenantId) { updateArgs.update = BSON("$set" << BSON("data" << "x")); updateArgs.criteria = BSON("_id" << 0); - OplogUpdateEntryArgs update(&updateArgs, nss, uuid); auto opCtx = cc().makeOperationContext(); WriteUnitOfWork wuow(opCtx.get()); AutoGetDb autoDb(opCtx.get(), nss.dbName(), MODE_X); + AutoGetCollection autoColl(opCtx.get(), nss, MODE_X); + OplogUpdateEntryArgs update(&updateArgs, *autoColl); OpObserverRegistry opObserver; opObserver.addObserver(std::make_unique<OpObserverImpl>(std::make_unique<OplogWriterImpl>())); @@ -1089,8 +1091,6 @@ DEATH_TEST_REGEX_F(OpObserverTest, class OpObserverTxnParticipantTest : public OpObserverTest { public: void setUp() override { - OpObserverTest::setUp(); - _opCtx = cc().makeOperationContext(); _opObserver.emplace(std::make_unique<OplogWriterImpl>()); _times.emplace(opCtx()); @@ -1179,6 +1179,7 @@ private: class OpObserverTransactionTest : public OpObserverTxnParticipantTest { protected: void setUp() override { + OpObserverTest::setUp(); OpObserverTxnParticipantTest::setUp(); OpObserverTxnParticipantTest::setUpNonRetryableTransaction(); } @@ -1276,7 +1277,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) { updateArgs2.update = BSON("$set" << BSON("data" << "y")); updateArgs2.criteria = BSON("_id" << 0); - OplogUpdateEntryArgs update2(&updateArgs2, nss2, uuid2); + OplogUpdateEntryArgs update2(&updateArgs2, *autoColl2); opObserver().onUpdate(opCtx(), update2); opObserver().aboutToDelete(opCtx(), @@ -1791,6 +1792,10 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) { auto txnParticipant = 
TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "update"); + WriteUnitOfWork wuow(opCtx()); + AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); + AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); + CollectionUpdateArgs updateArgs1; updateArgs1.stmtIds = {0}; updateArgs1.updatedDoc = BSON("_id" << 0 << "data" @@ -1798,7 +1803,7 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) { updateArgs1.update = BSON("$set" << BSON("data" << "x")); updateArgs1.criteria = BSON("_id" << 0); - OplogUpdateEntryArgs update1(&updateArgs1, nss1, uuid1); + OplogUpdateEntryArgs update1(&updateArgs1, *autoColl1); CollectionUpdateArgs updateArgs2; updateArgs2.stmtIds = {1}; @@ -1807,11 +1812,8 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) { updateArgs2.update = BSON("$set" << BSON("data" << "y")); updateArgs2.criteria = BSON("_id" << 1); - OplogUpdateEntryArgs update2(&updateArgs2, nss2, uuid2); + OplogUpdateEntryArgs update2(&updateArgs2, *autoColl2); - WriteUnitOfWork wuow(opCtx()); - AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); - AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); opObserver().onUpdate(opCtx(), update1); opObserver().onUpdate(opCtx(), update2); auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); @@ -1845,6 +1847,10 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTestIncludesTenantId) { auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "update"); + WriteUnitOfWork wuow(opCtx()); + AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); + AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); + CollectionUpdateArgs updateArgs1; updateArgs1.stmtIds = {0}; updateArgs1.updatedDoc = BSON("_id" << 0 << "data" @@ -1852,7 +1858,7 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTestIncludesTenantId) { updateArgs1.update = BSON("$set" << BSON("data" << "x")); updateArgs1.criteria = BSON("_id" << 0); 
- OplogUpdateEntryArgs update1(&updateArgs1, nss1, uuid1); + OplogUpdateEntryArgs update1(&updateArgs1, *autoColl1); CollectionUpdateArgs updateArgs2; updateArgs2.stmtIds = {1}; @@ -1861,11 +1867,8 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTestIncludesTenantId) { updateArgs2.update = BSON("$set" << BSON("data" << "y")); updateArgs2.criteria = BSON("_id" << 1); - OplogUpdateEntryArgs update2(&updateArgs2, nss2, uuid2); + OplogUpdateEntryArgs update2(&updateArgs2, *autoColl2); - WriteUnitOfWork wuow(opCtx()); - AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); - AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); opObserver().onUpdate(opCtx(), update1); opObserver().onUpdate(opCtx(), update2); @@ -2029,11 +2032,14 @@ public: OpObserverTxnParticipantTest::tearDown(); } + void setUp() override { + OpObserverTest::setUp(); + auto opCtx = cc().makeOperationContext(); + reset(opCtx.get(), nss, uuid); + } + protected: void testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage() { - NamespaceString nss = {boost::none, "test", "coll"}; - const auto uuid = UUID::gen(); - CollectionUpdateArgs updateArgs; updateArgs.stmtIds = {0}; updateArgs.updatedDoc = BSON("_id" << 0 << "data" @@ -2042,10 +2048,11 @@ protected: << "x")); updateArgs.criteria = BSON("_id" << 0); updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PostImage; - OplogUpdateEntryArgs update(&updateArgs, nss, uuid); + + AutoGetCollection autoColl(opCtx(), nss, MODE_IX); + OplogUpdateEntryArgs update(&updateArgs, *autoColl); update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection; - AutoGetDb autoDb(opCtx(), nss.dbName(), MODE_X); opObserver().onUpdate(opCtx(), update); commit(); @@ -2062,9 +2069,6 @@ protected: } void testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage() { - NamespaceString nss = {boost::none, "test", "coll"}; - const auto uuid = UUID::gen(); - CollectionUpdateArgs updateArgs; updateArgs.stmtIds = {0}; 
updateArgs.preImageDoc = BSON("_id" << 0 << "data" @@ -2073,10 +2077,11 @@ protected: << "x")); updateArgs.criteria = BSON("_id" << 0); updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PreImage; - OplogUpdateEntryArgs update(&updateArgs, nss, uuid); + + AutoGetCollection autoColl(opCtx(), nss, MODE_IX); + OplogUpdateEntryArgs update(&updateArgs, *autoColl); update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection; - AutoGetDb autoDb(opCtx(), nss.dbName(), MODE_X); opObserver().onUpdate(opCtx(), update); commit(); @@ -2093,8 +2098,7 @@ protected: } void testRetryableFindAndModifyDeleteHasNeedsRetryImage() { - NamespaceString nss = {boost::none, "test", "coll"}; - const auto uuid = UUID::gen(); + AutoGetDb autoDb(opCtx(), nss.dbName(), MODE_X); const auto deletedDoc = BSON("_id" << 0 << "data" @@ -2118,6 +2122,9 @@ protected: finish(); } + NamespaceString nss = {boost::none, "test", "coll"}; + UUID uuid = UUID::gen(); + virtual void commit() = 0; virtual void finish() {} @@ -2129,6 +2136,7 @@ class OpObserverRetryableFindAndModifyOutsideTransactionTest : public OpObserverRetryableFindAndModifyTest { public: void setUp() override { + OpObserverRetryableFindAndModifyTest::setUp(); OpObserverTxnParticipantTest::setUp(); OpObserverTxnParticipantTest::setUpRetryableWrite(); } @@ -2163,6 +2171,7 @@ class OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransacti : public OpObserverRetryableFindAndModifyTest { public: void setUp() override { + OpObserverRetryableFindAndModifyTest::setUp(); OpObserverTxnParticipantTest::setUp(); OpObserverTxnParticipantTest::setUpRetryableInternalTransaction(); } @@ -2199,6 +2208,7 @@ class OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransaction : public OpObserverRetryableFindAndModifyTest { public: void setUp() override { + OpObserverRetryableFindAndModifyTest::setUp(); OpObserverTxnParticipantTest::setUp(); 
OpObserverTxnParticipantTest::setUpRetryableInternalTransaction(); } @@ -2339,6 +2349,7 @@ protected: WriteUnitOfWork wuow(opCtx); auto reservedSlots = repl::getNextOpTimes(opCtx, 3); update->updateArgs->oplogSlots = reservedSlots; + wuow.commit(); } break; } @@ -2410,6 +2421,12 @@ protected: } } + void setUp() override { + OpObserverTest::setUp(); + auto opCtx = cc().makeOperationContext(); + reset(opCtx.get(), _nss, _uuid); + } + std::vector<UpdateTestCase> _cases = { // Regular updates. {kNonFaM, kChangeStreamImagesDisabled, kNotRetryable, 1}, @@ -2454,12 +2471,11 @@ TEST_F(OnUpdateOutputsTest, TestNonTransactionFundamentalOnUpdateOutputs) { } // Phase 2: Call the code we're testing. - CollectionUpdateArgs updateArgs; - OplogUpdateEntryArgs updateEntryArgs(&updateArgs, _nss, _uuid); - initializeOplogUpdateEntryArgs(opCtx, testCase, &updateEntryArgs); - WriteUnitOfWork wuow(opCtx); AutoGetCollection locks(opCtx, _nss, MODE_IX); + CollectionUpdateArgs updateArgs; + OplogUpdateEntryArgs updateEntryArgs(&updateArgs, *locks); + initializeOplogUpdateEntryArgs(opCtx, testCase, &updateEntryArgs); opObserver.onUpdate(opCtx, updateEntryArgs); wuow.commit(); @@ -2499,12 +2515,12 @@ TEST_F(OnUpdateOutputsTest, TestFundamentalTransactionOnUpdateOutputs) { } // Phase 2: Call the code we're testing. 
+ WriteUnitOfWork wuow(opCtx); + AutoGetCollection locks(opCtx, _nss, MODE_IX); CollectionUpdateArgs updateArgs; - OplogUpdateEntryArgs updateEntryArgs(&updateArgs, _nss, _uuid); + OplogUpdateEntryArgs updateEntryArgs(&updateArgs, *locks); initializeOplogUpdateEntryArgs(opCtx, testCase, &updateEntryArgs); - WriteUnitOfWork wuow(opCtx); - AutoGetCollection locks(opCtx, _nss, MODE_IX); opObserver.onUpdate(opCtx, updateEntryArgs); commitUnpreparedTransaction<OpObserverRegistry>(opCtx, opObserver); wuow.commit(); @@ -2827,7 +2843,7 @@ TEST_F(BatchedWriteOutputsTest, TestApplyOpsInsertDeleteUpdate) { collUpdateArgs.update = BSON("fieldToUpdate" << "valueToUpdate"); collUpdateArgs.criteria = BSON("_id" << 2); - auto args = OplogUpdateEntryArgs(&collUpdateArgs, _nss, autoColl->uuid()); + auto args = OplogUpdateEntryArgs(&collUpdateArgs, *autoColl); opCtx->getServiceContext()->getOpObserver()->onUpdate(opCtx, args); } @@ -2914,7 +2930,7 @@ TEST_F(BatchedWriteOutputsTest, TestApplyOpsInsertDeleteUpdateIncludesTenantId) collUpdateArgs.update = BSON("fieldToUpdate" << "valueToUpdate"); collUpdateArgs.criteria = BSON("_id" << 2); - auto args = OplogUpdateEntryArgs(&collUpdateArgs, autoColl->ns(), autoColl->uuid()); + auto args = OplogUpdateEntryArgs(&collUpdateArgs, *autoColl); opCtx->getServiceContext()->getOpObserver()->onUpdate(opCtx, args); } @@ -3539,6 +3555,10 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) { auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "update"); + WriteUnitOfWork wuow(opCtx()); + AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); + AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); + CollectionUpdateArgs updateArgs1; updateArgs1.stmtIds = {0}; updateArgs1.updatedDoc = BSON("_id" << 0 << "data" @@ -3546,7 +3566,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) { updateArgs1.update = BSON("$set" << BSON("data" << "x")); 
updateArgs1.criteria = BSON("_id" << 0); - OplogUpdateEntryArgs update1(&updateArgs1, nss1, uuid1); + OplogUpdateEntryArgs update1(&updateArgs1, *autoColl1); CollectionUpdateArgs updateArgs2; updateArgs2.stmtIds = {1}; @@ -3555,11 +3575,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) { updateArgs2.update = BSON("$set" << BSON("data" << "y")); updateArgs2.criteria = BSON("_id" << 1); - OplogUpdateEntryArgs update2(&updateArgs2, nss2, uuid2); + OplogUpdateEntryArgs update2(&updateArgs2, *autoColl2); - WriteUnitOfWork wuow(opCtx()); - AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); - AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); opObserver().onUpdate(opCtx(), update1); opObserver().onUpdate(opCtx(), update2); auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); @@ -3734,6 +3751,9 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) { auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "update"); + AutoGetCollection autoColl1(opCtx(), nss1, MODE_IX); + AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); + CollectionUpdateArgs updateArgs1; updateArgs1.stmtIds = {0}; updateArgs1.updatedDoc = BSON("_id" << 0 << "data" @@ -3741,7 +3761,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) { updateArgs1.update = BSON("$set" << BSON("data" << "x")); updateArgs1.criteria = BSON("_id" << 0); - OplogUpdateEntryArgs update1(&updateArgs1, nss1, uuid1); + OplogUpdateEntryArgs update1(&updateArgs1, *autoColl1); CollectionUpdateArgs updateArgs2; updateArgs2.stmtIds = {1}; @@ -3750,10 +3770,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) { updateArgs2.update = BSON("$set" << BSON("data" << "y")); updateArgs2.criteria = BSON("_id" << 1); - OplogUpdateEntryArgs update2(&updateArgs2, nss2, uuid2); + OplogUpdateEntryArgs update2(&updateArgs2, *autoColl2); - AutoGetCollection 
autoColl1(opCtx(), nss1, MODE_IX); - AutoGetCollection autoColl2(opCtx(), nss2, MODE_IX); opObserver().onUpdate(opCtx(), update1); opObserver().onUpdate(opCtx(), update2); diff --git a/src/mongo/db/op_observer/user_write_block_mode_op_observer.cpp b/src/mongo/db/op_observer/user_write_block_mode_op_observer.cpp index 93fa938f2be..d1e90077f48 100644 --- a/src/mongo/db/op_observer/user_write_block_mode_op_observer.cpp +++ b/src/mongo/db/op_observer/user_write_block_mode_op_observer.cpp @@ -89,10 +89,10 @@ void UserWriteBlockModeOpObserver::onInserts(OperationContext* opCtx, void UserWriteBlockModeOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { if (args.updateArgs->source != OperationSource::kFromMigrate) { - _checkWriteAllowed(opCtx, args.nss); + _checkWriteAllowed(opCtx, args.coll->ns()); } - if (args.nss == NamespaceString::kUserWritesCriticalSectionsNamespace && + if (args.coll->ns() == NamespaceString::kUserWritesCriticalSectionsNamespace && !user_writes_recoverable_critical_section_util::inRecoveryMode(opCtx)) { const auto collCSDoc = UserWriteBlockingCriticalSectionDocument::parse( IDLParserContext("UserWriteBlockOpObserver"), args.updateArgs->updatedDoc); diff --git a/src/mongo/db/op_observer/user_write_block_mode_op_observer_test.cpp b/src/mongo/db/op_observer/user_write_block_mode_op_observer_test.cpp index fff61baf687..baeadb8e394 100644 --- a/src/mongo/db/op_observer/user_write_block_mode_op_observer_test.cpp +++ b/src/mongo/db/op_observer/user_write_block_mode_op_observer_test.cpp @@ -89,8 +89,7 @@ protected: collectionUpdateArgs.source = fromMigrate ? 
OperationSource::kFromMigrate : OperationSource::kStandard; auto uuid = UUID::gen(); - OplogUpdateEntryArgs updateArgs(&collectionUpdateArgs, nss, uuid); - updateArgs.nss = nss; + OplogUpdateEntryArgs updateArgs(&collectionUpdateArgs, *autoColl); OplogDeleteEntryArgs deleteArgs; deleteArgs.fromMigrate = fromMigrate; if (shouldSucceed) { diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index f22d3a915e1..f3a37117b00 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -1457,8 +1457,8 @@ Status performAtomicTimeseriesWrites( opCtx->recoveryUnit()->setTimestamp(args.oplogSlots[0].getTimestamp())); } - coll->updateDocument( - opCtx, recordId, original, updated, indexesAffected, &curOp->debug(), &args); + collection_internal::updateDocument( + opCtx, *coll, recordId, original, updated, indexesAffected, &curOp->debug(), &args); if (slot) { if (participant) { // Manually sets the timestamp so that the "prevOpTime" field in the oplog entry is diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 9d87b25ba6d..dd0aac54126 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1494,6 +1494,7 @@ env.Library( 'tenant_migration_utils', ], LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/catalog/collection_crud', "$BUILD_DIR/mongo/db/catalog/local_oplog_info", "$BUILD_DIR/mongo/db/concurrency/exception_util", "$BUILD_DIR/mongo/db/index_builds_coordinator_interface", diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp index a24e438feb0..7c4a904b1a5 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp +++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp @@ -262,7 +262,7 @@ void OplogApplierImplTest::_testApplyOplogEntryOrGroupedInsertsCrudOperation( _opObserver->onUpdateFn = [&](OperationContext* opCtx, const OplogUpdateEntryArgs& args) { applyOpCalled = true; 
checkOpCtx(opCtx); - ASSERT_EQUALS(targetNss, args.nss); + ASSERT_EQUALS(targetNss, args.coll->ns()); return Status::OK(); }; diff --git a/src/mongo/db/repl/storage_timestamp_test.cpp b/src/mongo/db/repl/storage_timestamp_test.cpp index ea6f20de749..28c7b520aba 100644 --- a/src/mongo/db/repl/storage_timestamp_test.cpp +++ b/src/mongo/db/repl/storage_timestamp_test.cpp @@ -3303,14 +3303,16 @@ TEST_F(RetryableFindAndModifyTest, RetryableFindAndModifyUpdate) { args.storeDocOption = CollectionUpdateArgs::StoreDocOption::PreImage; args.update = BSON("$set" << BSON("b" << 1)); args.criteria = BSON("_id" << 0); + args.retryableWrite = true; { auto cursor = collection->getCursor(_opCtx); auto record = cursor->next(); invariant(record); WriteUnitOfWork wuow(_opCtx); - collection->updateDocument( + collection_internal::updateDocument( _opCtx, + collection.get(), record->id, Snapshotted<BSONObj>(_opCtx->recoveryUnit()->getSnapshotId(), oldObj), newObj, @@ -3359,6 +3361,7 @@ TEST_F(RetryableFindAndModifyTest, RetryableFindAndModifyUpdateWithDamages) { args.storeDocOption = CollectionUpdateArgs::StoreDocOption::PreImage; args.update = BSON("$set" << BSON("a" << 0)); args.criteria = BSON("_id" << 0); + args.retryableWrite = true; { Snapshotted<BSONObj> objSnapshot(_opCtx->recoveryUnit()->getSnapshotId(), oldObj); @@ -3366,8 +3369,8 @@ TEST_F(RetryableFindAndModifyTest, RetryableFindAndModifyUpdateWithDamages) { auto record = cursor->next(); invariant(record); WriteUnitOfWork wuow(_opCtx); - const auto statusWith = collection->updateDocumentWithDamages( - _opCtx, record->id, objSnapshot, source, damages, false, nullptr, &args); + const auto statusWith = collection_internal::updateDocumentWithDamages( + _opCtx, *autoColl, record->id, objSnapshot, source, damages, false, nullptr, &args); wuow.commit(); ASSERT_OK(statusWith.getStatus()); } diff --git a/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp b/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp index 
65f765cf94e..7fb4bf05a29 100644 --- a/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_op_observer.cpp @@ -272,7 +272,7 @@ void TenantMigrationDonorOpObserver::onInserts(OperationContext* opCtx, void TenantMigrationDonorOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { - if (args.nss == NamespaceString::kTenantMigrationDonorsNamespace && + if (args.coll->ns() == NamespaceString::kTenantMigrationDonorsNamespace && !tenant_migration_access_blocker::inRecoveryMode(opCtx)) { auto donorStateDoc = tenant_migration_access_blocker::parseDonorStateDocument(args.updateArgs->updatedDoc); diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index f1949c83569..9470ac2b036 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -33,6 +33,7 @@ #include "mongo/client/connection_string.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/config.h" +#include "mongo/db/catalog/collection_write_path.h" #include "mongo/db/commands/tenant_migration_donor_cmds_gen.h" #include "mongo/db/commands/tenant_migration_recipient_cmds_gen.h" #include "mongo/db/concurrency/exception_util.h" @@ -628,13 +629,14 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateState args.oplogSlots = {oplogSlot}; args.update = updatedStateDocBson; - collection->updateDocument(opCtx, - originalRecordId, - originalSnapshot, - updatedStateDocBson, - false, - nullptr /* OpDebug* */, - &args); + collection_internal::updateDocument(opCtx, + *collection, + originalRecordId, + originalSnapshot, + updatedStateDocBson, + false, + nullptr /* OpDebug* */, + &args); wuow.commit(); diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp index 19f5f6ab71c..b955988ab2d 100644 --- 
a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp @@ -208,7 +208,7 @@ void TenantMigrationRecipientOpObserver::onInserts( void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { - if (args.nss == NamespaceString::kTenantMigrationRecipientsNamespace && + if (args.coll->ns() == NamespaceString::kTenantMigrationRecipientsNamespace && !tenant_migration_access_blocker::inRecoveryMode(opCtx)) { auto recipientStateDoc = TenantMigrationRecipientDocument::parse( IDLParserContext("recipientStateDoc"), args.updateArgs->updatedDoc); diff --git a/src/mongo/db/repl/tenant_oplog_applier_test.cpp b/src/mongo/db/repl/tenant_oplog_applier_test.cpp index f00a1f7efb2..319ef74c58f 100644 --- a/src/mongo/db/repl/tenant_oplog_applier_test.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier_test.cpp @@ -715,8 +715,8 @@ TEST_F(TenantOplogApplierTest, ApplyUpdate_Success) { bool onUpdateCalled = false; _opObserver->onUpdateFn = [&](OperationContext* opCtx, const OplogUpdateEntryArgs& args) { onUpdateCalled = true; - ASSERT_EQUALS(nss, args.nss); - ASSERT_EQUALS(uuid, args.uuid); + ASSERT_EQUALS(nss, args.coll->ns()); + ASSERT_EQUALS(uuid, args.coll->uuid()); }; pushOps({entry}); auto writerPool = makeTenantMigrationWriterPool(); diff --git a/src/mongo/db/s/README.md b/src/mongo/db/s/README.md index 028c09071a1..c5cd8c5d081 100644 --- a/src/mongo/db/s/README.md +++ b/src/mongo/db/s/README.md @@ -22,10 +22,10 @@ corresponding data. The authoritative routing table is stored in a set of unsharded collections in the config database on the config server replica set. 
The schemas of the relevant collections are: -* [**config.databases**](https://docs.mongodb.com/manual/reference/config-database/#config.databases) -* [**config.collections**](https://docs.mongodb.com/manual/reference/config-database/#config.collections) -* [**config.chunks**](https://docs.mongodb.com/manual/reference/config-database/#config.chunks) -* [**config.shards**](https://docs.mongodb.com/manual/reference/config-database/#config.shards) +* [**config.databases**](https://www.mongodb.com/docs/manual/reference/config-database/#mongodb-data-config.databases) +* [**config.collections**](https://www.mongodb.com/docs/manual/reference/config-database/#mongodb-data-config.collections) +* [**config.chunks**](https://www.mongodb.com/docs/manual/reference/config-database/#mongodb-data-config.chunks) +* [**config.shards**](https://www.mongodb.com/docs/manual/reference/config-database/#mongodb-data-config.shards) #### Code references diff --git a/src/mongo/db/s/config_server_op_observer.cpp b/src/mongo/db/s/config_server_op_observer.cpp index 983251d7d09..b947b0de536 100644 --- a/src/mongo/db/s/config_server_op_observer.cpp +++ b/src/mongo/db/s/config_server_op_observer.cpp @@ -137,7 +137,7 @@ void ConfigServerOpObserver::onInserts(OperationContext* opCtx, } void ConfigServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { - if (args.nss != NamespaceString::kConfigsvrShardsNamespace) { + if (args.coll->ns() != NamespaceString::kConfigsvrShardsNamespace) { return; } diff --git a/src/mongo/db/s/query_analysis_op_observer.cpp b/src/mongo/db/s/query_analysis_op_observer.cpp index efdbb36a4ee..84606c7f4c7 100644 --- a/src/mongo/db/s/query_analysis_op_observer.cpp +++ b/src/mongo/db/s/query_analysis_op_observer.cpp @@ -74,13 +74,13 @@ void QueryAnalysisOpObserver::onInserts(OperationContext* opCtx, void QueryAnalysisOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { if 
(analyze_shard_key::supportsCoordinatingQueryAnalysis()) { - if (args.nss == NamespaceString::kConfigQueryAnalyzersNamespace) { + if (args.coll->ns() == NamespaceString::kConfigQueryAnalyzersNamespace) { const auto& updatedDoc = args.updateArgs->updatedDoc; opCtx->recoveryUnit()->onCommit([opCtx, updatedDoc](boost::optional<Timestamp>) { analyze_shard_key::QueryAnalysisCoordinator::get(opCtx)->onConfigurationUpdate( updatedDoc); }); - } else if (args.nss == MongosType::ConfigNS) { + } else if (args.coll->ns() == MongosType::ConfigNS) { const auto& updatedDoc = args.updateArgs->updatedDoc; opCtx->recoveryUnit()->onCommit([opCtx, updatedDoc](boost::optional<Timestamp>) { analyze_shard_key::QueryAnalysisCoordinator::get(opCtx)->onSamplerUpdate( diff --git a/src/mongo/db/s/range_deleter_service_op_observer.cpp b/src/mongo/db/s/range_deleter_service_op_observer.cpp index 8765dfedbbb..9c1e51f9a6f 100644 --- a/src/mongo/db/s/range_deleter_service_op_observer.cpp +++ b/src/mongo/db/s/range_deleter_service_op_observer.cpp @@ -85,7 +85,7 @@ void RangeDeleterServiceOpObserver::onInserts(OperationContext* opCtx, void RangeDeleterServiceOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { - if (args.nss == NamespaceString::kRangeDeletionNamespace) { + if (args.coll->ns() == NamespaceString::kRangeDeletionNamespace) { const bool pendingFieldIsRemoved = [&] { return update_oplog_entry::isFieldRemovedByUpdate( args.updateArgs->update, RangeDeletionTask::kPendingFieldName) == diff --git a/src/mongo/db/s/resharding/resharding_op_observer.cpp b/src/mongo/db/s/resharding/resharding_op_observer.cpp index c34355c2d17..840381f7942 100644 --- a/src/mongo/db/s/resharding/resharding_op_observer.cpp +++ b/src/mongo/db/s/resharding/resharding_op_observer.cpp @@ -206,7 +206,7 @@ void ReshardingOpObserver::onInserts(OperationContext* opCtx, } void ReshardingOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { - if (args.nss == 
NamespaceString::kDonorReshardingOperationsNamespace) { + if (args.coll->ns() == NamespaceString::kDonorReshardingOperationsNamespace) { // Primaries and secondaries should execute pinning logic when observing changes to the // donor resharding document. _doPin(opCtx); @@ -218,7 +218,7 @@ void ReshardingOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEn return; } - if (args.nss == NamespaceString::kConfigReshardingOperationsNamespace) { + if (args.coll->ns() == NamespaceString::kConfigReshardingOperationsNamespace) { auto newCoordinatorDoc = ReshardingCoordinatorDocument::parse( IDLParserContext("reshardingCoordinatorDoc"), args.updateArgs->updatedDoc); opCtx->recoveryUnit()->onCommit([opCtx, newCoordinatorDoc = std::move(newCoordinatorDoc)]( @@ -241,9 +241,10 @@ void ReshardingOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEn "error"_attr = redact(ex.toStatus())); } }); - } else if (args.nss.isTemporaryReshardingCollection()) { + } else if (args.coll->ns().isTemporaryReshardingCollection()) { const std::vector<InsertStatement> updateDoc{InsertStatement{args.updateArgs->updatedDoc}}; - assertCanExtractShardKeyFromDocs(opCtx, args.nss, updateDoc.begin(), updateDoc.end()); + assertCanExtractShardKeyFromDocs( + opCtx, args.coll->ns(), updateDoc.begin(), updateDoc.end()); } } diff --git a/src/mongo/db/s/resharding/resharding_service_test_helpers.h b/src/mongo/db/s/resharding/resharding_service_test_helpers.h index 70fe8786fc7..73e06640595 100644 --- a/src/mongo/db/s/resharding/resharding_service_test_helpers.h +++ b/src/mongo/db/s/resharding/resharding_service_test_helpers.h @@ -165,7 +165,7 @@ public: } void onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) override { - if (args.nss != _stateDocumentNss) { + if (args.coll->ns() != _stateDocumentNss) { return; } diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp index 0df49f52f89..744c79a1eca 100644 --- 
a/src/mongo/db/s/shard_server_op_observer.cpp +++ b/src/mongo/db/s/shard_server_op_observer.cpp @@ -330,7 +330,8 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE const bool needsSpecialHandling = !updateDoc.isEmpty() && (update_oplog_entry::extractUpdateType(updateDoc) != update_oplog_entry::UpdateType::kReplacement); - if (needsSpecialHandling && args.nss == NamespaceString::kShardConfigCollectionsNamespace) { + if (needsSpecialHandling && + args.coll->ns() == NamespaceString::kShardConfigCollectionsNamespace) { // Notification of routing table changes are only needed on secondaries if (isStandaloneOrPrimary(opCtx)) { return; @@ -383,7 +384,8 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE } } - if (needsSpecialHandling && args.nss == NamespaceString::kShardConfigDatabasesNamespace) { + if (needsSpecialHandling && + args.coll->ns() == NamespaceString::kShardConfigDatabasesNamespace) { // Notification of routing table changes are only needed on secondaries if (isStandaloneOrPrimary(opCtx)) { return; @@ -422,7 +424,7 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE } } - if (needsSpecialHandling && args.nss == NamespaceString::kRangeDeletionNamespace) { + if (needsSpecialHandling && args.coll->ns() == NamespaceString::kRangeDeletionNamespace) { if (!isStandaloneOrPrimary(opCtx)) return; @@ -442,7 +444,7 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE } } - if (args.nss == NamespaceString::kCollectionCriticalSectionsNamespace && + if (args.coll->ns() == NamespaceString::kCollectionCriticalSectionsNamespace && !recoverable_critical_section_util::inRecoveryMode(opCtx)) { const auto collCSDoc = CollectionCriticalSectionDocument::parse( IDLParserContext("ShardServerOpObserver"), args.updateArgs->updatedDoc); @@ -468,11 +470,11 @@ void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateE } auto metadata = 
CollectionShardingRuntime::assertCollectionLockedAndAcquire( - opCtx, args.nss, CSRAcquisitionMode::kShared) + opCtx, args.coll->ns(), CSRAcquisitionMode::kShared) ->getCurrentMetadataIfKnown(); if (metadata && metadata->isSharded()) { incrementChunkOnInsertOrUpdate(opCtx, - args.nss, + args.coll->ns(), *metadata->getChunkManager(), args.updateArgs->updatedDoc, args.updateArgs->updatedDoc.objsize(), diff --git a/src/mongo/db/serverless/SConscript b/src/mongo/db/serverless/SConscript index 82143897663..b0751b45f27 100644 --- a/src/mongo/db/serverless/SConscript +++ b/src/mongo/db/serverless/SConscript @@ -83,6 +83,7 @@ env.Library( 'shard_split_state_machine', ], LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/catalog/collection_crud', '$BUILD_DIR/mongo/db/catalog/local_oplog_info', '$BUILD_DIR/mongo/db/concurrency/exception_util', '$BUILD_DIR/mongo/db/dbhelpers', diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp index fb2d4dd9064..54e33417d40 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp +++ b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp @@ -334,7 +334,7 @@ void ShardSplitDonorOpObserver::onInserts(OperationContext* opCtx, void ShardSplitDonorOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { - if (args.nss != NamespaceString::kShardSplitDonorsNamespace || + if (args.coll->ns() != NamespaceString::kShardSplitDonorsNamespace || tenant_migration_access_blocker::inRecoveryMode(opCtx)) { return; } diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp index bd1eb42c23b..b644356c301 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp +++ b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp @@ -118,7 +118,8 @@ protected: BSON("$set" << BSON(ShardSplitDonorDocument::kStateFieldName << 
ShardSplitDonorState_serializer(stateDocument.getState()))); updateArgs.criteria = BSON("_id" << stateDocument.getId()); - OplogUpdateEntryArgs update(&updateArgs, _nss, stateDocument.getId()); + AutoGetCollection autoColl(_opCtx.get(), _nss, MODE_IX); + OplogUpdateEntryArgs update(&updateArgs, *autoColl); WriteUnitOfWork wuow(_opCtx.get()); _observer->onUpdate(_opCtx.get(), update); @@ -340,7 +341,8 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToAbortingIndexBuildsFail) { BSON("$set" << BSON(ShardSplitDonorDocument::kStateFieldName << ShardSplitDonorState_serializer(stateDocument.getState()))); updateArgs.criteria = BSON("_id" << stateDocument.getId()); - OplogUpdateEntryArgs update(&updateArgs, _nss, stateDocument.getId()); + AutoGetCollection autoColl(_opCtx.get(), _nss, MODE_IX); + OplogUpdateEntryArgs update(&updateArgs, *autoColl); auto update_lambda = [&]() { WriteUnitOfWork wuow(_opCtx.get()); diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp index 80697cbe0f4..bd715cb167e 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service.cpp @@ -29,7 +29,9 @@ #include "mongo/db/serverless/shard_split_donor_service.h" + #include "mongo/client/streamable_replica_set_monitor.h" +#include "mongo/db/catalog/collection_write_path.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/concurrency/exception_util.h" #include "mongo/db/dbdirectclient.h" @@ -964,13 +966,14 @@ ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateS args.oplogSlots = {oplogSlot}; args.update = updatedStateDocBson; - collection->updateDocument(opCtx, - originalRecordId, - originalSnapshot, - updatedStateDocBson, - false, - nullptr /* OpDebug* */, - &args); + collection_internal::updateDocument(opCtx, + *collection, + originalRecordId, + originalSnapshot, + updatedStateDocBson, + false, + nullptr /* OpDebug* */, + &args); return 
oplogSlot; }(); diff --git a/src/mongo/db/transaction/transaction_participant.cpp b/src/mongo/db/transaction/transaction_participant.cpp index 2f0c5534dda..58ba1425421 100644 --- a/src/mongo/db/transaction/transaction_participant.cpp +++ b/src/mongo/db/transaction/transaction_participant.cpp @@ -34,6 +34,7 @@ #include <fmt/format.h> +#include "mongo/base/shim.h" #include "mongo/db/catalog/collection_write_path.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/index_catalog.h" @@ -49,6 +50,7 @@ #include "mongo/db/curop_failpoint_helpers.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" +#include "mongo/db/exec/write_stage_common.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/op_observer/op_observer.h" #include "mongo/db/ops/update.h" @@ -451,13 +453,14 @@ void updateSessionEntry(OperationContext* opCtx, // Specify indexesAffected = false because the sessions collection has two indexes: {_id: 1} and // {parentLsid: 1, _id.txnNumber: 1, _id: 1}, and none of the fields are mutable. - collection->updateDocument(opCtx, - recordId, - Snapshotted<BSONObj>(startingSnapshotId, originalDoc), - updateMod, - false, /* indexesAffected */ - nullptr, - &args); + collection_internal::updateDocument(opCtx, + *collection, + recordId, + Snapshotted<BSONObj>(startingSnapshotId, originalDoc), + updateMod, + false, /* indexesAffected */ + nullptr, + &args); wuow.commit(); } @@ -471,6 +474,21 @@ void updateSessionEntry(OperationContext* opCtx, // will be allowed to commit. MONGO_FAIL_POINT_DEFINE(onPrimaryTransactionalWrite); +/** + * Returns true if we are running retryable write or retryable internal multi-document transaction. 
+ */ +bool writeStageCommonIsRetryableWriteImpl(OperationContext* opCtx) { + if (!opCtx->writesAreReplicated() || !opCtx->isRetryableWrite()) { + return false; + } + auto txnParticipant = TransactionParticipant::get(opCtx); + return txnParticipant && + (!opCtx->inMultiDocumentTransaction() || txnParticipant.transactionIsOpen()); +} + +auto isRetryableWriteRegistration = MONGO_WEAK_FUNCTION_REGISTRATION( + write_stage_common::isRetryableWrite, writeStageCommonIsRetryableWriteImpl); + } // namespace const BSONObj TransactionParticipant::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1)); |