diff options
-rw-r--r-- | src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp | 41 |
1 files changed, 28 insertions, 13 deletions
diff --git a/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp b/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp index 3bdbf3954c5..c63b74721b2 100644 --- a/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp +++ b/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp @@ -27,38 +27,53 @@ * it in the license file. */ -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery - #include "mongo/platform/basic.h" #include "mongo/db/pipeline/change_stream_pre_image_helpers.h" +#include "mongo/base/error_codes.h" +#include "mongo/db/catalog/collection.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/concurrency/lock_manager_defs.h" #include "mongo/db/concurrency/locker.h" -#include "mongo/db/dbhelpers.h" +#include "mongo/db/curop.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" #include "mongo/util/assert_util.h" +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery + namespace mongo { void writeToChangeStreamPreImagesCollection(OperationContext* opCtx, const ChangeStreamPreImage& preImage) { - const auto collectionNamespace = NamespaceString::kChangeStreamPreImagesNamespace; + tassert(6646200, + "Expected to be executed in a write unit of work", + opCtx->lockState()->inAWriteUnitOfWork()); tassert(5869404, str::stream() << "Invalid pre-image document applyOpsIndex: " << preImage.getId().getApplyOpsIndex(), preImage.getId().getApplyOpsIndex() >= 0); - // This lock acquisition can block on a stronger lock held by another operation modifying the - // pre-images collection. There are no known cases where an operation holding an exclusive lock - // on the pre-images collection also waits for oplog visibility. + // This lock acquisition can block on a stronger lock held by another operation modifying + // the pre-images collection. There are no known cases where an operation holding an + // exclusive lock on the pre-images collection also waits for oplog visibility. AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState()); - AutoGetCollection preimagesCollectionRaii(opCtx, collectionNamespace, LockMode::MODE_IX); - UpdateResult res = Helpers::upsert(opCtx, collectionNamespace.toString(), preImage.toBSON()); + AutoGetCollection preImagesCollectionRaii( + opCtx, NamespaceString::kChangeStreamPreImagesNamespace, LockMode::MODE_IX); + auto& changeStreamPreImagesCollection = preImagesCollectionRaii.getCollection(); + tassert(6646201, + "The change stream pre-images collection is not present", + changeStreamPreImagesCollection); + + // Inserts into the change stream pre-images collection are not replicated. + repl::UnreplicatedWritesBlock unreplicatedWritesBlock{opCtx}; + const auto insertionStatus = changeStreamPreImagesCollection->insertDocument( + opCtx, InsertStatement{preImage.toBSON()}, &CurOp::get(opCtx)->debug()); tassert(5868601, - str::stream() << "Failed to insert a new document into the pre-images collection: ts: " - << preImage.getId().getTs().toString() - << ", applyOpsIndex: " << preImage.getId().getApplyOpsIndex(), - !res.existing && !res.upsertedId.isEmpty()); + str::stream() << "Attempted to insert a duplicate document into the pre-images " + "collection. Pre-image id: " + << preImage.getId().toBSON().toString(), + insertionStatus != ErrorCodes::DuplicateKey); + uassertStatusOK(insertionStatus); } } // namespace mongo |