diff options
author | Jason Chan <jason.chan@mongodb.com> | 2021-07-15 14:30:42 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-07-23 20:30:54 +0000 |
commit | e7efcb334b6593cdce5cc9765ae702e076827a20 (patch) | |
tree | 0e18dcd066e67ee6ca17d12690ce267e1a9e8bc0 | |
parent | a83aed652d99d3e5c21f2ab83c99b39afd2600c3 (diff) | |
download | mongo-e7efcb334b6593cdce5cc9765ae702e076827a20.tar.gz |
SERVER-58060 Add new internal FindAndModifyImageLookup aggregation stage
4 files changed, 606 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index b4ff7d76fcb..47439f941ec 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -243,6 +243,7 @@ pipelineEnv.Library( 'document_source_densify.cpp', 'document_source_exchange.cpp', 'document_source_facet.cpp', + 'document_source_find_and_modify_image_lookup.cpp', 'document_source_geo_near.cpp', 'document_source_graph_lookup.cpp', 'document_source_group.cpp', @@ -312,6 +313,7 @@ pipelineEnv.Library( '$BUILD_DIR/mongo/db/query/datetime/date_time_support', '$BUILD_DIR/mongo/db/query/query_knobs', '$BUILD_DIR/mongo/db/query/sort_pattern', + '$BUILD_DIR/mongo/db/repl/image_collection_entry', '$BUILD_DIR/mongo/db/repl/oplog_entry', '$BUILD_DIR/mongo/db/repl/read_concern_args', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', @@ -424,6 +426,7 @@ env.CppUnitTest( 'document_source_densify_test.cpp', 'document_source_exchange_test.cpp', 'document_source_facet_test.cpp', + 'document_source_find_and_modify_image_lookup_test.cpp', 'document_source_geo_near_test.cpp', 'document_source_graph_lookup_test.cpp', 'document_source_group_test.cpp', diff --git a/src/mongo/db/pipeline/document_source_find_and_modify_image_lookup.cpp b/src/mongo/db/pipeline/document_source_find_and_modify_image_lookup.cpp new file mode 100644 index 00000000000..84dfd7558b2 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_find_and_modify_image_lookup.cpp @@ -0,0 +1,219 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand + +#include "mongo/platform/basic.h" + +#include "mongo/db/exec/document_value/document.h" +#include "mongo/db/pipeline/document_source_find_and_modify_image_lookup.h" +#include "mongo/db/repl/image_collection_entry_gen.h" +#include "mongo/logv2/log.h" + +namespace mongo { +namespace { +// Downconverts a 'findAndModify' entry by stripping the 'needsRetryImage' field and appending +// the appropriate 'preImageOpTime' or 'postImageOpTime' field. +Document downConvertFindAndModifyEntry(Document inputDoc, + repl::OpTime imageOpTime, + repl::RetryImageEnum imageType) { + MutableDocument doc{inputDoc}; + const auto imageOpTimeFieldName = imageType == repl::RetryImageEnum::kPreImage + ? repl::OplogEntry::kPreImageOpTimeFieldName + : repl::OplogEntry::kPostImageOpTimeFieldName; + doc.setField( + imageOpTimeFieldName, + Value{Document{{repl::OpTime::kTimestampFieldName.toString(), imageOpTime.getTimestamp()}, + {repl::OpTime::kTermFieldName.toString(), imageOpTime.getTerm()}}}); + doc.remove(repl::OplogEntryBase::kNeedsRetryImageFieldName); + return doc.freeze(); +} +} // namespace + +using OplogEntry = repl::OplogEntryBase; + +REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalFindAndModifyImageLookup, + LiteParsedDocumentSourceDefault::parse, + DocumentSourceFindAndModifyImageLookup::createFromBson, + true); + +boost::intrusive_ptr<DocumentSourceFindAndModifyImageLookup> +DocumentSourceFindAndModifyImageLookup::create( + const boost::intrusive_ptr<ExpressionContext>& expCtx) { + return new DocumentSourceFindAndModifyImageLookup(expCtx); +} + +boost::intrusive_ptr<DocumentSourceFindAndModifyImageLookup> +DocumentSourceFindAndModifyImageLookup::createFromBson( + const BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { + uassert(5806003, + str::stream() << "the '" << kStageName << "' spec must be an empty object", + elem.type() == BSONType::Object && elem.Obj().isEmpty()); + return DocumentSourceFindAndModifyImageLookup::create(expCtx); +} + +DocumentSourceFindAndModifyImageLookup::DocumentSourceFindAndModifyImageLookup( + const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSource(kStageName, expCtx) {} + +StageConstraints DocumentSourceFindAndModifyImageLookup::constraints( + Pipeline::SplitState pipeState) const { + return StageConstraints(StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kAnyShard, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, + LookupRequirement::kNotAllowed, + UnionRequirement::kNotAllowed, + ChangeStreamRequirement::kDenylist); +} + +Value DocumentSourceFindAndModifyImageLookup::serialize( + boost::optional<ExplainOptions::Verbosity> explain) const { + return Value(Document{{kStageName, Value(Document{})}}); +} + +DepsTracker::State DocumentSourceFindAndModifyImageLookup::getDependencies( + DepsTracker* deps) const { + deps->fields.insert(OplogEntry::kSessionIdFieldName.toString()); + deps->fields.insert(OplogEntry::kTxnNumberFieldName.toString()); + deps->fields.insert(OplogEntry::kNeedsRetryImageFieldName.toString()); + deps->fields.insert(OplogEntry::kWallClockTimeFieldName.toString()); + deps->fields.insert(OplogEntry::kNssFieldName.toString()); + deps->fields.insert(OplogEntry::kTimestampFieldName.toString()); + deps->fields.insert(OplogEntry::kTermFieldName.toString()); + deps->fields.insert(OplogEntry::kUuidFieldName.toString()); + return DepsTracker::State::SEE_NEXT; +} + +DocumentSource::GetModPathsReturn DocumentSourceFindAndModifyImageLookup::getModifiedPaths() const { + return {DocumentSource::GetModPathsReturn::Type::kAllPaths, std::set<std::string>{}, {}}; +} + +DocumentSource::GetNextResult DocumentSourceFindAndModifyImageLookup::doGetNext() { + uassert(5806001, + str::stream() << kStageName << " cannot be executed from mongos", + !pExpCtx->inMongos); + if (_stashedFindAndModifyDoc) { + // Return the stashed findAndModify document. This indicates that the previous document + // returned was a forged noop image document. + auto doc = *_stashedFindAndModifyDoc; + _stashedFindAndModifyDoc = boost::none; + return doc; + } + + auto input = pSource->getNext(); + if (!input.isAdvanced()) { + return input; + } + auto doc = input.releaseDocument(); + if (auto imageEntry = _forgeNoopImageDoc(doc, pExpCtx->opCtx)) { + return std::move(*imageEntry); + } + return doc; +} + +boost::optional<Document> DocumentSourceFindAndModifyImageLookup::_forgeNoopImageDoc( + Document inputDoc, OperationContext* opCtx) { + const auto needsRetryImageVal = + inputDoc.getField(repl::OplogEntryBase::kNeedsRetryImageFieldName); + if (needsRetryImageVal.missing()) { + return boost::none; + } + + const auto inputDocBson = inputDoc.toBson(); + const auto sessionIdBson = inputDocBson.getObjectField(OplogEntry::kSessionIdFieldName); + auto localImageCollInfo = pExpCtx->mongoProcessInterface->getCollectionOptions( + pExpCtx->opCtx, NamespaceString::kConfigImagesNamespace); + + // Extract the UUID from the collection information. We should always have a valid uuid + // here. + auto imageCollUUID = invariantStatusOK(UUID::parse(localImageCollInfo["uuid"])); + const auto& readConcernBson = repl::ReadConcernArgs::get(opCtx).toBSON(); + auto imageDoc = pExpCtx->mongoProcessInterface->lookupSingleDocument( + pExpCtx, + NamespaceString::kConfigImagesNamespace, + imageCollUUID, + Document{BSON("_id" << sessionIdBson)}, + std::move(readConcernBson)); + + if (!imageDoc) { + // If no image document with the corresponding 'sessionId' is found, we skip forging the + // no-op and rely on the retryable write mechanism to catch that no pre- or post- image + // exists. + LOGV2_DEBUG( + 580602, + 2, + "Not forging no-op image oplog entry because no image document found with sessionId", + "sessionId"_attr = sessionIdBson); + return boost::none; + } + + auto image = repl::ImageEntry::parse(IDLParserErrorContext("image entry"), imageDoc->toBson()); + const auto inputOplog = uassertStatusOK(repl::OplogEntry::parse(inputDocBson)); + if (image.getTxnNumber() != inputOplog.getTxnNumber()) { + // In our snapshot, fetch the current transaction number for a session. If that + // transaction number doesn't match what's found on the image lookup, it implies that + // the image is not the correct version for this oplog entry. We will not forge a noop + // from it. + LOGV2_DEBUG( + 580603, + 2, + "Not forging no-op image oplog entry because image document has a different txnNum", + "sessionId"_attr = sessionIdBson, + "expectedTxnNum"_attr = inputOplog.getTxnNumber(), + "actualTxnNum"_attr = image.getTxnNumber()); + return boost::none; + } + + // Stash the 'findAndModify' document to return after downconverting it. + repl::OpTime imageOpTime(inputOplog.getTimestamp(), *inputOplog.getTerm()); + const auto docToStash = + downConvertFindAndModifyEntry(inputDoc, + imageOpTime, + repl::RetryImage_parse(IDLParserErrorContext("retry image"), + needsRetryImageVal.getStringData())); + _stashedFindAndModifyDoc = docToStash; + + // Forge a no-op image document to be returned. + repl::MutableOplogEntry forgedNoop; + forgedNoop.setSessionId(image.get_id()); + forgedNoop.setTxnNumber(image.getTxnNumber()); + forgedNoop.setObject(image.getImage()); + forgedNoop.setOpType(repl::OpTypeEnum::kNoop); + forgedNoop.setWallClockTime(inputOplog.getWallClockTime()); + forgedNoop.setNss(inputOplog.getNss()); + forgedNoop.setUuid(*inputOplog.getUuid()); + + forgedNoop.setOpTime(imageOpTime); + forgedNoop.setStatementIds({0}); + return Document{forgedNoop.toBSON()}; +} +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_find_and_modify_image_lookup.h b/src/mongo/db/pipeline/document_source_find_and_modify_image_lookup.h new file mode 100644 index 00000000000..3452f1deb9c --- /dev/null +++ b/src/mongo/db/pipeline/document_source_find_and_modify_image_lookup.h @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" + +namespace mongo { + +/** + * This stage will take a list of oplog entries as input and forge a no-op pre- or post-image to be + * returned before each 'findAndModify' oplog entry that has the 'needsRetryImage' field. This stage + * also downconverts 'findAndModify' entries by stripping the 'needsRetryImage' field and appending + * the appropriate 'preImageOpTime' or 'postImageOpTime' field. + */ +class DocumentSourceFindAndModifyImageLookup : public DocumentSource { +public: + static constexpr StringData kStageName = "$_internalFindAndModifyImageLookup"_sd; + + static boost::intrusive_ptr<DocumentSourceFindAndModifyImageLookup> create( + const boost::intrusive_ptr<ExpressionContext>&); + + static boost::intrusive_ptr<DocumentSourceFindAndModifyImageLookup> createFromBson( + const BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); + + DepsTracker::State getDependencies(DepsTracker* deps) const final; + + DocumentSource::GetModPathsReturn getModifiedPaths() const final; + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const; + + StageConstraints constraints(Pipeline::SplitState pipeState) const final; + + boost::optional<DistributedPlanLogic> distributedPlanLogic() final { + return boost::none; + } + + const char* getSourceName() const { + return DocumentSourceFindAndModifyImageLookup::kStageName.rawData(); + } + +protected: + DocumentSource::GetNextResult doGetNext() override; + +private: + DocumentSourceFindAndModifyImageLookup(const boost::intrusive_ptr<ExpressionContext>& expCtx); + + // Forges the no-op pre- or post-image document to be returned. Also downconverts the original + // 'findAndModify' oplog entry and stashes it. + boost::optional<Document> _forgeNoopImageDoc(Document inputDoc, OperationContext* opCtx); + + // Represents the stashed 'findAndModify' document. This indicates that the previous document + // emitted was a forged pre- or post-image. + boost::optional<Document> _stashedFindAndModifyDoc; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_find_and_modify_image_lookup_test.cpp b/src/mongo/db/pipeline/document_source_find_and_modify_image_lookup_test.cpp new file mode 100644 index 00000000000..5ec22e51a4c --- /dev/null +++ b/src/mongo/db/pipeline/document_source_find_and_modify_image_lookup_test.cpp @@ -0,0 +1,301 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +#include "mongo/platform/basic.h" + +#include <vector> + +#include "mongo/db/exec/document_value/document.h" +#include "mongo/db/exec/document_value/document_value_test_util.h" +#include "mongo/db/logical_session_id_helpers.h" +#include "mongo/db/matcher/matcher.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_find_and_modify_image_lookup.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h" +#include "mongo/db/repl/image_collection_entry_gen.h" +#include "mongo/logv2/log.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/assert_util.h" + +namespace mongo { +namespace { + +/** + * Creates OplogEntry with given field values. + */ +repl::OplogEntry makeOplogEntry( + repl::OpTime opTime, + repl::OpTypeEnum opType, + NamespaceString nss, + BSONObj oField, + OperationSessionInfo sessionInfo, + boost::optional<repl::OpTime> preImageOpTime = boost::none, + boost::optional<repl::OpTime> postImageOpTime = boost::none, + boost::optional<repl::RetryImageEnum> needsRetryImage = boost::none) { + static const UUID* uuid = new UUID(UUID::gen()); + return { + repl::DurableOplogEntry(opTime, // optime + boost::none, // hash + opType, // opType + nss, // namespace + *uuid, // uuid + boost::none, // fromMigrate + repl::OplogEntry::kOplogVersion, // version + oField, // o + boost::none, // o2 + sessionInfo, // sessionInfo + boost::none, // upsert + Date_t(), // wall clock time + {1}, // statement ids + boost::none, // optime of previous write within same transaction + preImageOpTime, // pre-image optime + postImageOpTime, // post-image optime + boost::none, // ShardId of resharding recipient + boost::none, // _id + needsRetryImage)}; // needsRetryImage +} + +struct MockMongoInterface final : public StubMongoProcessInterface { + MockMongoInterface(std::vector<Document> documentsForLookup = {}) + : _documentsForLookup{std::move(documentsForLookup)} {} + + BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) { + static const UUID* oplog_uuid = new UUID(UUID::gen()); + return BSON("uuid" << *oplog_uuid); + } + + boost::optional<Document> lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + UUID collectionUUID, + const Document& documentKey, + boost::optional<BSONObj> readConcern) final { + Matcher matcher(documentKey.toBson(), expCtx); + auto it = std::find_if(_documentsForLookup.begin(), + _documentsForLookup.end(), + [&](const Document& lookedUpDoc) { + return matcher.matches(lookedUpDoc.toBson(), nullptr); + }); + return (it != _documentsForLookup.end() ? *it : boost::optional<Document>{}); + } + + // These documents are used to feed the 'lookupSingleDocument' method. + std::vector<Document> _documentsForLookup; +}; + +// This provides access to getExpCtx(), but we'll use a different name for this test suite. +using FindAndModifyImageLookupTest = AggregationContextFixture; + +TEST_F(FindAndModifyImageLookupTest, NoopWhenEntryDoesNotHaveNeedsRetryImageField) { + auto imageLookup = DocumentSourceFindAndModifyImageLookup::create(getExpCtx()); + const auto sessionId = makeLogicalSessionIdForTest(); + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(1); + const auto opTime = repl::OpTime(Timestamp(2, 1), 1); + const auto preImageOpTime = repl::OpTime(Timestamp(1, 1), 1); + const auto oplogEntryBson = makeOplogEntry(opTime, + repl::OpTypeEnum::kNoop, + NamespaceString("test.foo"), + BSON("a" << 1), + sessionInfo, + preImageOpTime) + .getEntry() + .toBSON(); + auto mock = DocumentSourceMock::createForTest(Document(oplogEntryBson), getExpCtx()); + imageLookup->setSource(mock.get()); + // Mock out the foreign collection. + getExpCtx()->mongoProcessInterface = + std::make_unique<MockMongoInterface>(std::vector<Document>{}); + + auto next = imageLookup->getNext(); + ASSERT_TRUE(next.isAdvanced()); + Document expected = Document(oplogEntryBson); + ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected); + + ASSERT_TRUE(imageLookup->getNext().isEOF()); + ASSERT_TRUE(imageLookup->getNext().isEOF()); + ASSERT_TRUE(imageLookup->getNext().isEOF()); +} + +TEST_F(FindAndModifyImageLookupTest, ShouldNotForgeImageEntryWhenImageDocMissing) { + auto imageLookup = DocumentSourceFindAndModifyImageLookup::create(getExpCtx()); + const auto sessionId = makeLogicalSessionIdForTest(); + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(1); + const auto opTime = repl::OpTime(Timestamp(2, 1), 1); + const auto oplogEntryBson = makeOplogEntry(opTime, + repl::OpTypeEnum::kNoop, + NamespaceString("test.foo"), + BSON("a" << 1), + sessionInfo, + boost::none /* preImageOpTime */, + boost::none /* postImageOpTime */, + repl::RetryImageEnum::kPreImage) + .getEntry() + .toBSON(); + auto mock = DocumentSourceMock::createForTest(Document(oplogEntryBson), getExpCtx()); + imageLookup->setSource(mock.get()); + + // Mock out the foreign collection. + getExpCtx()->mongoProcessInterface = + std::make_unique<MockMongoInterface>(std::vector<Document>{}); + + auto next = imageLookup->getNext(); + ASSERT_TRUE(next.isAdvanced()); + Document expected = Document(oplogEntryBson); + ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected); + + ASSERT_TRUE(imageLookup->getNext().isEOF()); + ASSERT_TRUE(imageLookup->getNext().isEOF()); + ASSERT_TRUE(imageLookup->getNext().isEOF()); +} + +TEST_F(FindAndModifyImageLookupTest, ShouldNotForgeImageEntryWhenImageDocHasDifferentTxnNumber) { + auto imageLookup = DocumentSourceFindAndModifyImageLookup::create(getExpCtx()); + const auto sessionId = makeLogicalSessionIdForTest(); + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(1); + const auto ts = Timestamp(2, 1); + const auto opTime = repl::OpTime(ts, 1); + const auto oplogEntryBson = makeOplogEntry(opTime, + repl::OpTypeEnum::kNoop, + NamespaceString("test.foo"), + BSON("a" << 1), + sessionInfo, + boost::none /* preImageOpTime */, + boost::none /* postImageOpTime */, + repl::RetryImageEnum::kPreImage) + .getEntry() + .toBSON(); + auto mock = DocumentSourceMock::createForTest(Document(oplogEntryBson), getExpCtx()); + imageLookup->setSource(mock.get()); + + // Create an 'ImageEntry' with a higher 'txnNumber'. + const auto preImage = BSON("a" << 2); + repl::ImageEntry imageEntry; + imageEntry.set_id(sessionId); + imageEntry.setTxnNumber(2); + imageEntry.setTs(ts); + imageEntry.setImageKind(repl::RetryImageEnum::kPreImage); + imageEntry.setImage(preImage); + // Mock out the foreign collection. + getExpCtx()->mongoProcessInterface = + std::make_unique<MockMongoInterface>(std::vector<Document>{Document{imageEntry.toBSON()}}); + + auto next = imageLookup->getNext(); + ASSERT_TRUE(next.isAdvanced()); + Document expected = Document(oplogEntryBson); + ASSERT_DOCUMENT_EQ(next.releaseDocument(), expected); + + ASSERT_TRUE(imageLookup->getNext().isEOF()); + ASSERT_TRUE(imageLookup->getNext().isEOF()); + ASSERT_TRUE(imageLookup->getNext().isEOF()); +} + + +TEST_F(FindAndModifyImageLookupTest, ShouldForgeImageEntryWhenMatchingImageDocIsFound) { + std::vector<repl::RetryImageEnum> cases{repl::RetryImageEnum::kPreImage, + repl::RetryImageEnum::kPostImage}; + for (auto imageType : cases) { + LOGV2(5806002, + "ForgeImageEntryTestCase", + "imageType"_attr = repl::RetryImage_serializer(imageType)); + auto imageLookup = DocumentSourceFindAndModifyImageLookup::create(getExpCtx()); + const auto sessionId = makeLogicalSessionIdForTest(); + const auto txnNum = 1LL; + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(txnNum); + const auto ts = Timestamp(2, 1); + const auto opTime = repl::OpTime(ts, 1); + const auto oplogEntryBson = makeOplogEntry(opTime, + repl::OpTypeEnum::kUpdate, + NamespaceString("test.foo"), + BSON("a" << 1), + sessionInfo, + boost::none /* preImageOpTime */, + boost::none /* postImageOpTime */, + imageType) + .getEntry() + .toBSON(); + auto mock = DocumentSourceMock::createForTest(Document(oplogEntryBson), getExpCtx()); + imageLookup->setSource(mock.get()); + + const auto prePostImage = BSON("a" << 2); + repl::ImageEntry imageEntry; + imageEntry.set_id(sessionId); + imageEntry.setTxnNumber(txnNum); + imageEntry.setTs(ts); + imageEntry.setImageKind(imageType); + imageEntry.setImage(prePostImage); + // Mock out the foreign collection. + getExpCtx()->mongoProcessInterface = std::make_unique<MockMongoInterface>( + std::vector<Document>{Document{imageEntry.toBSON()}}); + + // The forged image oplog entry should be returned before the findAndModify oplog entry. + auto next = imageLookup->getNext(); + ASSERT_TRUE(next.isAdvanced()); + const auto forgedImageEntry = + repl::OplogEntry::parse(next.releaseDocument().toBson()).getValue(); + ASSERT_BSONOBJ_EQ(prePostImage, forgedImageEntry.getObject()); + ASSERT_EQUALS(txnNum, forgedImageEntry.getTxnNumber().get()); + ASSERT_EQUALS(sessionId, forgedImageEntry.getSessionId().get()); + ASSERT_EQUALS("n", repl::OpType_serializer(forgedImageEntry.getOpType())); + ASSERT_EQUALS(0LL, forgedImageEntry.getStatementIds().front()); + ASSERT_EQUALS(ts, forgedImageEntry.getTimestamp()); + ASSERT_EQUALS(1, forgedImageEntry.getTerm().get()); + + // The next doc should be the original findAndModify oplog entry with the 'needsRetryImage' + // field removed and 'preImageOpTime'/'postImageOpTime' field appended. + next = imageLookup->getNext(); + MutableDocument expectedDownConvertedDoc{Document{oplogEntryBson}}; + expectedDownConvertedDoc.remove(repl::OplogEntryBase::kNeedsRetryImageFieldName); + const auto expectedImageOpTimeFieldName = imageType == repl::RetryImageEnum::kPreImage + ? repl::OplogEntry::kPreImageOpTimeFieldName + : repl::OplogEntry::kPostImageOpTimeFieldName; + expectedDownConvertedDoc.setField( + expectedImageOpTimeFieldName, + Value{Document{{repl::OpTime::kTimestampFieldName.toString(), opTime.getTimestamp()}, + {repl::OpTime::kTermFieldName.toString(), opTime.getTerm()}}}); + ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedDownConvertedDoc.freeze()); + + ASSERT_TRUE(imageLookup->getNext().isEOF()); + ASSERT_TRUE(imageLookup->getNext().isEOF()); + ASSERT_TRUE(imageLookup->getNext().isEOF()); + } +} +} // namespace +} // namespace mongo |