/**
 *    Copyright (C) 2022-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#include "mongo/db/catalog/collection_write_path.h"

#include "mongo/bson/simple_bsonelement_comparator.h"
#include "mongo/crypto/fle_crypto.h"
#include "mongo/db/catalog/capped_collection_maintenance.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog/local_oplog_info.h"
#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/op_observer/op_observer.h"
#include "mongo/db/record_id_helpers.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/logv2/log.h"
#include "mongo/util/fail_point.h"

#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage

namespace mongo {
namespace collection_internal {
namespace {

// This failpoint throws a WriteConflictException after a successful call to
// insertDocumentForBulkLoader.
MONGO_FAIL_POINT_DEFINE(failAfterBulkLoadDocInsert);

// This fail point injects insertion failures for all collections unless a collection name is
// provided in the optional data object during configuration:
// data: {
//     collectionNS: <fully-qualified collection namespace>,
// }
MONGO_FAIL_POINT_DEFINE(failCollectionInserts);

// Used to pause after inserting collection data and calling the opObservers. Inserts to
// replicated collections that are not part of a multi-statement transaction will have generated
// their OpTime and oplog entry. Supports parameters to limit pause by namespace and by _id
// of the first data item in an insert (must be of type string):
// data: {
//     collectionNS: <fully-qualified collection namespace>,
//     first_id: <string>
// }
MONGO_FAIL_POINT_DEFINE(hangAfterCollectionInserts);

// This fail point introduces corruption to documents during insert.
MONGO_FAIL_POINT_DEFINE(corruptDocumentOnInsert);

bool compareSafeContentElem(const BSONObj& oldDoc, const BSONObj& newDoc) {
    if (newDoc.hasField(kSafeContent) != oldDoc.hasField(kSafeContent)) {
        return false;
    }
    if (!newDoc.hasField(kSafeContent)) {
        return true;
    }

    return newDoc.getField(kSafeContent).binaryEqual(oldDoc.getField(kSafeContent));
}

std::vector<OplogSlot> reserveOplogSlotsForRetryableFindAndModify(
    OperationContext* opCtx, const CollectionPtr& collection) {
    // For retryable findAndModify running in a multi-document transaction, we will reserve the
    // oplog entries when the transaction prepares or commits without prepare.
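    // Illustrative note (added, not from the original source): outside a multi-document
    // transaction, the reservation below yields two consecutive slots; under a hypothetical
    // clock these might be Timestamp(10, 1) and Timestamp(10, 2), with the write committing
    // at the later slot (TS) and TS - 1 left for forged image entries.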
    if (opCtx->inMultiDocumentTransaction()) {
        return {};
    }

    // We reserve oplog slots here, expecting the slot with the greatest timestamp (say TS) to be
    // used as the oplog timestamp. Tenant migrations and resharding will forge no-op image oplog
    // entries and set the timestamp for these synthetic entries to be TS - 1.
    auto oplogInfo = LocalOplogInfo::get(opCtx);
    auto slots = oplogInfo->getNextOpTimes(opCtx, 2);
    uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(slots.back().getTimestamp()));
    return slots;
}

Status insertDocumentsImpl(OperationContext* opCtx,
                           const CollectionPtr& collection,
                           const std::vector<InsertStatement>::const_iterator begin,
                           const std::vector<InsertStatement>::const_iterator end,
                           OpDebug* opDebug,
                           bool fromMigrate) {
    const auto& nss = collection->ns();

    dassert(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IX));

    const size_t count = std::distance(begin, end);

    if (collection->isCapped() && collection->getIndexCatalog()->haveAnyIndexes() && count > 1) {
        // We require that inserts to indexed capped collections be done one-at-a-time to avoid the
        // possibility that a later document causes an earlier document to be deleted before it can
        // be indexed.
        // TODO SERVER-21512 It would be better to handle this here by just doing single inserts.
        return {ErrorCodes::OperationCannotBeBatched,
                "Can't batch inserts into indexed capped collections"};
    }

    if (collection->needsCappedLock()) {
        // X-lock the metadata resource for this replicated, non-clustered capped collection until
        // the end of the WUOW. Non-clustered capped collections require writes to be serialized on
        // the secondary in order to guarantee insertion order (SERVER-21483); this exclusive
        // access to the metadata resource prevents the primary from executing with more
        // concurrency than secondaries - thus helping secondaries keep up - and protects
        // '_cappedFirstRecord'. See SERVER-21646. On the other hand, capped clustered collections
        // with a monotonically increasing cluster key natively guarantee preservation of the
        // insertion order, and don't need serialization. We allow concurrent inserts for
        // clustered capped collections.
        Lock::ResourceLock heldUntilEndOfWUOW{
            opCtx, ResourceId(RESOURCE_METADATA, nss.ns()), MODE_X};
    }

    std::vector<Record> records;
    records.reserve(count);
    std::vector<Timestamp> timestamps;
    timestamps.reserve(count);

    for (auto it = begin; it != end; it++) {
        const auto& doc = it->doc;

        RecordId recordId;
        if (collection->isClustered()) {
            invariant(collection->getRecordStore()->keyFormat() == KeyFormat::String);
            recordId = uassertStatusOK(
                record_id_helpers::keyForDoc(doc,
                                             collection->getClusteredInfo()->getIndexSpec(),
                                             collection->getDefaultCollator()));
        }

        if (MONGO_unlikely(corruptDocumentOnInsert.shouldFail())) {
            // Insert a truncated record that is half the expected size of the source document.
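            // Added note: the halved length makes the stored RecordData shorter than the BSON
            // object's self-declared size, so downstream validation paths can be exercised
            // against a deliberately corrupt record; the truncation point itself is arbitrary.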
            records.emplace_back(
                Record{std::move(recordId), RecordData(doc.objdata(), doc.objsize() / 2)});
            timestamps.emplace_back(it->oplogSlot.getTimestamp());
            continue;
        }

        records.emplace_back(
            Record{std::move(recordId), RecordData(doc.objdata(), doc.objsize())});
        timestamps.emplace_back(it->oplogSlot.getTimestamp());
    }

    Status status = collection->getRecordStore()->insertRecords(opCtx, &records, timestamps);
    if (!status.isOK())
        return status;

    std::vector<BsonRecord> bsonRecords;
    bsonRecords.reserve(count);
    int recordIndex = 0;
    for (auto it = begin; it != end; it++) {
        RecordId loc = records[recordIndex++].id;
        if (collection->getRecordStore()->keyFormat() == KeyFormat::Long) {
            invariant(RecordId::minLong() < loc);
            invariant(loc < RecordId::maxLong());
        }

        BsonRecord bsonRecord = {
            std::move(loc), Timestamp(it->oplogSlot.getTimestamp()), &(it->doc)};
        bsonRecords.emplace_back(std::move(bsonRecord));
    }

    int64_t keysInserted = 0;
    status = collection->getIndexCatalog()->indexRecords(
        opCtx, collection, bsonRecords, &keysInserted);
    if (!status.isOK()) {
        return status;
    }

    if (opDebug) {
        opDebug->additiveMetrics.incrementKeysInserted(keysInserted);
        // 'opDebug' may be deleted at rollback time in case of multi-document transaction.
        if (!opCtx->inMultiDocumentTransaction()) {
            opCtx->recoveryUnit()->onRollback([opDebug, keysInserted]() {
                opDebug->additiveMetrics.incrementKeysInserted(-keysInserted);
            });
        }
    }

    if (!nss.isImplicitlyReplicated()) {
        opCtx->getServiceContext()->getOpObserver()->onInserts(
            opCtx, collection, begin, end, fromMigrate);
    }

    cappedDeleteUntilBelowConfiguredMaximum(opCtx, collection, records.begin()->id);

    return Status::OK();
}

}  // namespace

Status insertDocumentForBulkLoader(OperationContext* opCtx,
                                   const CollectionPtr& collection,
                                   const BSONObj& doc,
                                   const OnRecordInsertedFn& onRecordInserted) {
    const auto& nss = collection->ns();

    auto status = checkFailCollectionInsertsFailPoint(nss, doc);
    if (!status.isOK()) {
        return status;
    }

    status = collection->checkValidationAndParseResult(opCtx, doc);
    if (!status.isOK()) {
        return status;
    }

    dassert(opCtx->lockState()->isCollectionLockedForMode(nss, MODE_IX));

    RecordId recordId;
    if (collection->isClustered()) {
        invariant(collection->getRecordStore()->keyFormat() == KeyFormat::String);
        recordId = uassertStatusOK(record_id_helpers::keyForDoc(
            doc, collection->getClusteredInfo()->getIndexSpec(), collection->getDefaultCollator()));
    }

    // Using timestamp 0 for these inserts, which are non-oplog so we don't have an appropriate
    // timestamp to use.
    StatusWith<RecordId> loc = collection->getRecordStore()->insertRecord(
        opCtx, recordId, doc.objdata(), doc.objsize(), Timestamp());

    if (!loc.isOK())
        return loc.getStatus();

    status = onRecordInserted(loc.getValue());

    if (MONGO_unlikely(failAfterBulkLoadDocInsert.shouldFail())) {
        LOGV2(20290,
              "Failpoint failAfterBulkLoadDocInsert enabled. Throwing WriteConflictException",
              logAttrs(nss));
        throwWriteConflictException(str::stream() << "Hit failpoint '"
                                                  << failAfterBulkLoadDocInsert.getName() << "'.");
    }

    std::vector<InsertStatement> inserts;
    OplogSlot slot;
    // Fetch a new optime now, if necessary.
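    // Added note: 'slot' stays a null OpTime when the oplog is disabled for this namespace
    // (e.g. on a standalone, or for an unreplicated collection), so the onInserts observer
    // below sees an unset oplog slot in that case.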
    auto replCoord = repl::ReplicationCoordinator::get(opCtx);
    if (!replCoord->isOplogDisabledFor(opCtx, nss)) {
        auto slots = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1);
        slot = slots.back();
    }
    inserts.emplace_back(kUninitializedStmtId, doc, slot);

    opCtx->getServiceContext()->getOpObserver()->onInserts(
        opCtx, collection, inserts.begin(), inserts.end(), false);

    cappedDeleteUntilBelowConfiguredMaximum(opCtx, collection, loc.getValue());

    // Capture the recordStore here instead of the CollectionPtr object itself, because the record
    // store's lifetime is controlled by the collection IX lock held on the write paths, whereas
    // the CollectionPtr is just a front to the collection and its lifetime is shorter.
    opCtx->recoveryUnit()->onCommit(
        [recordStore = collection->getRecordStore()](boost::optional<Timestamp>) {
            recordStore->notifyCappedWaitersIfNeeded();
        });

    return loc.getStatus();
}

Status insertDocuments(OperationContext* opCtx,
                       const CollectionPtr& collection,
                       std::vector<InsertStatement>::const_iterator begin,
                       std::vector<InsertStatement>::const_iterator end,
                       OpDebug* opDebug,
                       bool fromMigrate) {
    const auto& nss = collection->ns();

    auto status =
        checkFailCollectionInsertsFailPoint(nss, (begin != end ? begin->doc : BSONObj()));
    if (!status.isOK()) {
        return status;
    }

    // Should really be done in the collection object at creation and updated on index create.
    const bool hasIdIndex = collection->getIndexCatalog()->findIdIndex(opCtx);

    for (auto it = begin; it != end; it++) {
        if (hasIdIndex && it->doc["_id"].eoo()) {
            return Status(ErrorCodes::InternalError,
                          str::stream()
                              << "Collection::insertDocument got document without _id for ns:"
                              << nss.toString());
        }

        auto status = collection->checkValidationAndParseResult(opCtx, it->doc);
        if (!status.isOK()) {
            return status;
        }

        auto& validationSettings = DocumentValidationSettings::get(opCtx);

        if (collection->getCollectionOptions().encryptedFieldConfig &&
            !validationSettings.isSchemaValidationDisabled() &&
            !validationSettings.isSafeContentValidationDisabled() &&
            it->doc.hasField(kSafeContent)) {
            return Status(ErrorCodes::BadValue,
                          str::stream()
                              << "Cannot insert a document with field name " << kSafeContent);
        }
    }

    const SnapshotId sid = opCtx->recoveryUnit()->getSnapshotId();

    status = insertDocumentsImpl(opCtx, collection, begin, end, opDebug, fromMigrate);
    if (!status.isOK()) {
        return status;
    }
    invariant(sid == opCtx->recoveryUnit()->getSnapshotId());

    // Capture the recordStore here instead of the CollectionPtr object itself, because the record
    // store's lifetime is controlled by the collection IX lock held on the write paths, whereas
    // the CollectionPtr is just a front to the collection and its lifetime is shorter.
    opCtx->recoveryUnit()->onCommit(
        [recordStore = collection->getRecordStore()](boost::optional<Timestamp>) {
            recordStore->notifyCappedWaitersIfNeeded();
        });

    hangAfterCollectionInserts.executeIf(
        [&](const BSONObj& data) {
            const auto& firstIdElem = data["first_id"];
            std::string whenFirst;
            if (firstIdElem) {
                whenFirst += " when first _id is ";
                whenFirst += firstIdElem.str();
            }
            LOGV2(20289,
                  "hangAfterCollectionInserts fail point enabled. Blocking until fail point is "
                  "disabled.",
                  "ns"_attr = nss,
                  "whenFirst"_attr = whenFirst);
            hangAfterCollectionInserts.pauseWhileSet(opCtx);
        },
        [&](const BSONObj& data) {
            const auto& collElem = data["collectionNS"];
            const auto& firstIdElem = data["first_id"];
            // If the failpoint specifies no collection or matches the existing one, hang.
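            // Hypothetical configuration that would satisfy this predicate, via the shell:
            //   db.adminCommand({
            //       configureFailPoint: "hangAfterCollectionInserts",
            //       mode: "alwaysOn",
            //       data: {collectionNS: "test.coll", first_id: "abc"}
            //   });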
            return (!collElem || nss.ns() == collElem.str()) &&
                (!firstIdElem ||
                 (begin != end && firstIdElem.type() == mongo::String &&
                  begin->doc["_id"].str() == firstIdElem.str()));
        });

    return Status::OK();
}

Status insertDocument(OperationContext* opCtx,
                      const CollectionPtr& collection,
                      const InsertStatement& doc,
                      OpDebug* opDebug,
                      bool fromMigrate) {
    std::vector<InsertStatement> docs;
    docs.push_back(doc);
    return insertDocuments(opCtx, collection, docs.begin(), docs.end(), opDebug, fromMigrate);
}

Status checkFailCollectionInsertsFailPoint(const NamespaceString& ns, const BSONObj& firstDoc) {
    Status s = Status::OK();
    failCollectionInserts.executeIf(
        [&](const BSONObj& data) {
            const std::string msg = str::stream()
                << "Failpoint (failCollectionInserts) has been enabled (" << data
                << "), so rejecting insert (first doc): " << firstDoc;
            LOGV2(20287,
                  "Failpoint (failCollectionInserts) has been enabled, so rejecting insert",
                  "data"_attr = data,
                  "document"_attr = firstDoc);
            s = {ErrorCodes::FailPointEnabled, msg};
        },
        [&](const BSONObj& data) {
            // If the failpoint specifies no collection or matches the existing one, fail.
            const auto collElem = data["collectionNS"];
            return !collElem || ns.ns() == collElem.str();
        });
    return s;
}

RecordId updateDocument(OperationContext* opCtx,
                        const CollectionPtr& collection,
                        const RecordId& oldLocation,
                        const Snapshotted<BSONObj>& oldDoc,
                        const BSONObj& newDoc,
                        bool indexesAffected,
                        OpDebug* opDebug,
                        CollectionUpdateArgs* args) {
    {
        auto status = collection->checkValidationAndParseResult(opCtx, newDoc);
        if (!status.isOK()) {
            if (validationLevelOrDefault(collection->getCollectionOptions().validationLevel) ==
                ValidationLevelEnum::strict) {
                uassertStatusOK(status);
            }
            // moderate means we have to check the old doc
            auto oldDocStatus = collection->checkValidationAndParseResult(opCtx, oldDoc.value());
            if (oldDocStatus.isOK()) {
                // transitioning from good -> bad is not ok
                uassertStatusOK(status);
            }
            // bad -> bad is ok in moderate mode
        }
    }

    auto& validationSettings = DocumentValidationSettings::get(opCtx);
    if (collection->getCollectionOptions().encryptedFieldConfig &&
        !validationSettings.isSchemaValidationDisabled() &&
        !validationSettings.isSafeContentValidationDisabled()) {
        uassert(ErrorCodes::BadValue,
                str::stream() << "New document and old document both need to have " << kSafeContent
                              << " field.",
                compareSafeContentElem(oldDoc.value(), newDoc));
    }

    dassert(opCtx->lockState()->isCollectionLockedForMode(collection->ns(), MODE_IX));
    invariant(oldDoc.snapshotId() == opCtx->recoveryUnit()->getSnapshotId());
    invariant(newDoc.isOwned());

    if (collection->needsCappedLock()) {
        Lock::ResourceLock heldUntilEndOfWUOW{
            opCtx, ResourceId(RESOURCE_METADATA, collection->ns()), MODE_X};
    }

    SnapshotId sid = opCtx->recoveryUnit()->getSnapshotId();

    BSONElement oldId = oldDoc.value()["_id"];
    if (!oldId.eoo() && SimpleBSONElementComparator::kInstance.evaluate(oldId != newDoc["_id"]))
        uasserted(13596, "in Collection::updateDocument _id mismatch");

    // The preImageDoc may not be boost::none if this update was a retryable findAndModify or if
    // the update may have changed the shard key. For non-in-place updates we always set the
    // preImageDoc here to an owned copy of the pre-image.
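    // Added note: the getOwned() copy below keeps the pre-image valid after updateRecord()
    // rewrites the record, since oldDoc.value() may point into storage-engine-owned memory
    // that the update invalidates while the index update and OpObserver still need it.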
    if (!args->preImageDoc) {
        args->preImageDoc = oldDoc.value().getOwned();
    }

    args->changeStreamPreAndPostImagesEnabledForCollection =
        collection->isChangeStreamPreAndPostImagesEnabled();

    OplogUpdateEntryArgs onUpdateArgs(args, collection);
    const bool setNeedsRetryImageOplogField =
        args->storeDocOption != CollectionUpdateArgs::StoreDocOption::None;
    if (args->oplogSlots.empty() && setNeedsRetryImageOplogField && args->retryableWrite) {
        onUpdateArgs.retryableFindAndModifyLocation =
            RetryableFindAndModifyLocation::kSideCollection;
        // If the update is part of a retryable write and we expect to be storing the pre- or
        // post-image in a side collection, then we must reserve oplog slots in advance. We
        // expect to use the reserved oplog slots as follows, where TS is the greatest
        // timestamp of 'oplogSlots':
        // TS - 1: Tenant migrations and resharding will forge no-op image oplog entries and set
        //         the entry timestamps to TS - 1.
        // TS:     The timestamp given to the update oplog entry.
        args->oplogSlots = reserveOplogSlotsForRetryableFindAndModify(opCtx, collection);
    } else {
        // Retryable findAndModify commands should not reserve oplog slots before entering this
        // function since tenant migrations and resharding rely on always being able to set
        // timestamps of forged pre- and post- image entries to timestamp of findAndModify - 1.
        invariant(!(args->retryableWrite && setNeedsRetryImageOplogField));
    }

    uassertStatusOK(collection->getRecordStore()->updateRecord(
        opCtx, oldLocation, newDoc.objdata(), newDoc.objsize()));

    if (indexesAffected) {
        int64_t keysInserted = 0;
        int64_t keysDeleted = 0;

        uassertStatusOK(collection->getIndexCatalog()->updateRecord(opCtx,
                                                                    collection,
                                                                    *args->preImageDoc,
                                                                    newDoc,
                                                                    oldLocation,
                                                                    &keysInserted,
                                                                    &keysDeleted));

        if (opDebug) {
            opDebug->additiveMetrics.incrementKeysInserted(keysInserted);
            opDebug->additiveMetrics.incrementKeysDeleted(keysDeleted);
            // 'opDebug' may be deleted at rollback time in case of multi-document transaction.
            if (!opCtx->inMultiDocumentTransaction()) {
                opCtx->recoveryUnit()->onRollback([opDebug, keysInserted, keysDeleted]() {
                    opDebug->additiveMetrics.incrementKeysInserted(-keysInserted);
                    opDebug->additiveMetrics.incrementKeysDeleted(-keysDeleted);
                });
            }
        }
    }

    invariant(sid == opCtx->recoveryUnit()->getSnapshotId());
    args->updatedDoc = newDoc;

    opCtx->getServiceContext()->getOpObserver()->onUpdate(opCtx, onUpdateArgs);

    return oldLocation;
}

StatusWith<BSONObj> updateDocumentWithDamages(OperationContext* opCtx,
                                              const CollectionPtr& collection,
                                              const RecordId& loc,
                                              const Snapshotted<BSONObj>& oldDoc,
                                              const char* damageSource,
                                              const mutablebson::DamageVector& damages,
                                              bool indexesAffected,
                                              OpDebug* opDebug,
                                              CollectionUpdateArgs* args) {
    dassert(opCtx->lockState()->isCollectionLockedForMode(collection->ns(), MODE_IX));
    invariant(oldDoc.snapshotId() == opCtx->recoveryUnit()->getSnapshotId());
    invariant(collection->updateWithDamagesSupported());

    // For in-place updates we need to grab an owned copy of the pre-image doc if pre-image
    // recording is enabled and we haven't already set the pre-image due to this update being
    // a retryable findAndModify or a possible update to the shard key.
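    // Added note: unlike updateDocument() above, this path copies the pre-image only when
    // change stream pre-/post-images are enabled; otherwise the DamageVector alone describes
    // the in-place change and no full pre-image copy is needed here.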
    if (!args->preImageDoc && collection->isChangeStreamPreAndPostImagesEnabled()) {
        args->preImageDoc = oldDoc.value().getOwned();
    }

    OplogUpdateEntryArgs onUpdateArgs(args, collection);
    const bool setNeedsRetryImageOplogField =
        args->storeDocOption != CollectionUpdateArgs::StoreDocOption::None;
    if (args->oplogSlots.empty() && setNeedsRetryImageOplogField && args->retryableWrite) {
        onUpdateArgs.retryableFindAndModifyLocation =
            RetryableFindAndModifyLocation::kSideCollection;
        // If the update is part of a retryable write and we expect to be storing the pre- or
        // post-image in a side collection, then we must reserve oplog slots in advance. We
        // expect to use the reserved oplog slots as follows, where TS is the greatest
        // timestamp of 'oplogSlots':
        // TS - 1: Tenant migrations and resharding will forge no-op image oplog entries and set
        //         the entry timestamps to TS - 1.
        // TS:     The timestamp given to the update oplog entry.
        args->oplogSlots = reserveOplogSlotsForRetryableFindAndModify(opCtx, collection);
    } else {
        // Retryable findAndModify commands should not reserve oplog slots before entering this
        // function since tenant migrations and resharding rely on always being able to set
        // timestamps of forged pre- and post- image entries to timestamp of findAndModify - 1.
        invariant(!(args->retryableWrite && setNeedsRetryImageOplogField));
    }

    RecordData oldRecordData(oldDoc.value().objdata(), oldDoc.value().objsize());
    StatusWith<RecordData> recordData = collection->getRecordStore()->updateWithDamages(
        opCtx, loc, oldRecordData, damageSource, damages);
    if (!recordData.isOK())
        return recordData.getStatus();
    BSONObj newDoc = std::move(recordData.getValue()).releaseToBson().getOwned();

    args->updatedDoc = newDoc;
    args->changeStreamPreAndPostImagesEnabledForCollection =
        collection->isChangeStreamPreAndPostImagesEnabled();

    if (indexesAffected) {
        int64_t keysInserted = 0;
        int64_t keysDeleted = 0;

        uassertStatusOK(collection->getIndexCatalog()->updateRecord(
            opCtx, collection, oldDoc.value(), args->updatedDoc, loc, &keysInserted, &keysDeleted));

        if (opDebug) {
            opDebug->additiveMetrics.incrementKeysInserted(keysInserted);
            opDebug->additiveMetrics.incrementKeysDeleted(keysDeleted);
            // 'opDebug' may be deleted at rollback time in case of multi-document transaction.
            if (!opCtx->inMultiDocumentTransaction()) {
                opCtx->recoveryUnit()->onRollback([opDebug, keysInserted, keysDeleted]() {
                    opDebug->additiveMetrics.incrementKeysInserted(-keysInserted);
                    opDebug->additiveMetrics.incrementKeysDeleted(-keysDeleted);
                });
            }
        }
    }

    opCtx->getServiceContext()->getOpObserver()->onUpdate(opCtx, onUpdateArgs);

    return newDoc;
}

}  // namespace collection_internal
}  // namespace mongo