diff options
author | Randolph Tan <randolph@10gen.com> | 2020-08-17 14:22:44 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-08-26 17:14:55 +0000 |
commit | a60b31413da02569d9e6e682da3ebf0ae6cc4107 (patch) | |
tree | 2a69e66e582e93df7c6cc11ae8d550b819786032 /src/mongo/db | |
parent | fff906108611c79956cd5bd7dc68aada59eff361 (diff) | |
download | mongo-a60b31413da02569d9e6e682da3ebf0ae6cc4107.tar.gz |
SERVER-50117 Add resumeToken and pre/post image oplog tests for createAggForReshardingOplogBuffer
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/repl/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.idl | 7 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/donor_oplog_id.idl | 49 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_util.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_util.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_util_test.cpp | 408 |
7 files changed, 458 insertions, 21 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index d5a8d6a18f9..e8d98a08ecc 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -491,6 +491,7 @@ env.Library( '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/logical_session_id', '$BUILD_DIR/mongo/db/catalog/collection_options', + '$BUILD_DIR/mongo/db/exec/document_value/document_value', '$BUILD_DIR/mongo/idl/idl_parser', ], LIBDEPS_PRIVATE=[ diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl index 5147eb9dfe0..7a229b3dd07 100644 --- a/src/mongo/db/repl/oplog_entry.idl +++ b/src/mongo/db/repl/oplog_entry.idl @@ -34,6 +34,7 @@ global: imports: - "mongo/idl/basic_types.idl" - "mongo/db/logical_session_id.idl" + - "mongo/db/pipeline/value.idl" - "mongo/db/repl/optime_and_wall_time_base.idl" - "mongo/db/repl/replication_types.idl" - "mongo/s/sharding_types.idl" @@ -125,9 +126,11 @@ structs: description: "Contains the UUID of a tenant migration for an operation caused by one." _id: - type: objectid + cpp_name: _id + type: Value optional: true - description: "An optional _id field for tests that manually insert oplog entries" + description: "Used by tests in replication and also by resharding to store + timestamps." stmtId: cpp_name: statementId type: StmtId diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index fe4bc629199..c6464a5a569 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -91,6 +91,7 @@ env.Library( env.Idlc('range_deletion_task.idl')[0], env.Idlc('resharding/coordinator_document.idl')[0], env.Idlc('resharding/donor_document.idl')[0], + env.Idlc('resharding/donor_oplog_id.idl')[0], env.Idlc('resharding/recipient_document.idl')[0], ], LIBDEPS=[ diff --git a/src/mongo/db/s/resharding/donor_oplog_id.idl b/src/mongo/db/s/resharding/donor_oplog_id.idl new file mode 100644 index 00000000000..e6e21079d78 --- /dev/null +++ b/src/mongo/db/s/resharding/donor_oplog_id.idl @@ -0,0 +1,49 @@ +# Copyright (C) 2020-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +# This file defines the _id format of oplog entry stored in config.localReshardingOplogBuffer.*. + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + +structs: + ReshardingDonorOplogId: + description: >- + Represents the set of timestamps that belong to an operation from the donor shard. + fields: + clusterTime: + type: timestamp + description: >- + The clusterTime of the final oplog entry of a transaction. If the operation is + not from a transaction, then it is the same value as the ts field. + ts: + type: timestamp + description: "The clusterTime of this operation." diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp index 4cdb09b310a..5d81a724ce1 100644 --- a/src/mongo/db/s/resharding_util.cpp +++ b/src/mongo/db/s/resharding_util.cpp @@ -114,12 +114,13 @@ void validateZones(const std::vector<mongo::BSONObj>& zones, } std::unique_ptr<Pipeline, PipelineDeleter> createAggForReshardingOplogBuffer( - const boost::intrusive_ptr<ExpressionContext>& expCtx, const BSONObj& resumeToken) { + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const boost::optional<ReshardingDonorOplogId>& resumeToken) { std::list<boost::intrusive_ptr<DocumentSource>> stages; - if (!resumeToken.isEmpty()) { - stages.emplace_back( - DocumentSourceMatch::create(BSON("_id" << BSON("$gt" << resumeToken)), expCtx)); + if (resumeToken) { + stages.emplace_back(DocumentSourceMatch::create( + BSON("_id" << BSON("$gt" << resumeToken->toBSON())), expCtx)); } stages.emplace_back(DocumentSourceSort::create(expCtx, BSON("_id" << 1))); diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h index dbb1c659b30..b6441cd16fe 100644 --- a/src/mongo/db/s/resharding_util.h +++ b/src/mongo/db/s/resharding_util.h @@ -34,6 +34,7 @@ #include "mongo/db/keypattern.h" #include "mongo/db/operation_context.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/s/resharding/donor_oplog_id_gen.h" #include "mongo/s/catalog/type_tags.h" #include "mongo/s/resharded_chunk_gen.h" @@ -75,6 +76,7 @@ void validateZones(const std::vector<mongo::BSONObj>& zones, * sure that the donorOplogNS is properly resolved and ns is set in the expCtx. */ std::unique_ptr<Pipeline, PipelineDeleter> createAggForReshardingOplogBuffer( - const boost::intrusive_ptr<ExpressionContext>& expCtx, const BSONObj& resumeToken); + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const boost::optional<ReshardingDonorOplogId>& resumeToken); } // namespace mongo
\ No newline at end of file diff --git a/src/mongo/db/s/resharding_util_test.cpp b/src/mongo/db/s/resharding_util_test.cpp index bd9e4683d22..9c8a01198d3 100644 --- a/src/mongo/db/s/resharding_util_test.cpp +++ b/src/mongo/db/s/resharding_util_test.cpp @@ -31,6 +31,8 @@ #include "mongo/platform/basic.h" +#include <vector> + #include "mongo/bson/bsonobj.h" #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/document_source_mock.h" @@ -40,7 +42,6 @@ #include "mongo/db/s/resharding_util.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/unittest/unittest.h" -#include <vector> namespace mongo { namespace { @@ -66,6 +67,35 @@ private: std::deque<DocumentSource::GetNextResult> _mockResults; }; +repl::MutableOplogEntry makeOplog(const NamespaceString& nss, + const UUID& uuid, + const repl::OpTypeEnum& opType, + const BSONObj& oField, + const BSONObj& o2Field, + const boost::optional<ReshardingDonorOplogId>& _id) { + repl::MutableOplogEntry oplogEntry; + oplogEntry.setNss(nss); + oplogEntry.setUuid(uuid); + oplogEntry.setOpType(opType); + oplogEntry.setObject(oField); + + if (!o2Field.isEmpty()) { + oplogEntry.setObject2(o2Field); + } + + oplogEntry.setOpTimeAndWallTimeBase(repl::OpTimeAndWallTimeBase({}, {})); + oplogEntry.set_id(Value(_id->toBSON())); + + return oplogEntry; +} + +repl::MutableOplogEntry makePrePostImageOplog(const NamespaceString& nss, + const UUID& uuid, + const ReshardingDonorOplogId& _id, + const BSONObj& prePostImage) { + return makeOplog(nss, uuid, repl::OpTypeEnum::kNoop, prePostImage, {}, _id); +} + class ReshardingUtilTest : public ConfigServerTestFixture { protected: void setUp() override { @@ -259,17 +289,100 @@ protected: return expCtx; } + /************************************************************************************ + * These set of helper function generate pre-made oplogs with the following timestamps: + * + * deletePreImage: ts(7, 35) + * updatePostImage: ts(10, 15) + * insert: ts(25, 345) + * update: ts(30, 16) + * delete: ts(66, 86) + */ + + repl::MutableOplogEntry makeInsertOplog() { + const Timestamp insertTs(25, 345); + const ReshardingDonorOplogId insertId(insertTs, insertTs); + return makeOplog(_crudNss, _uuid, repl::OpTypeEnum::kInsert, BSON("x" << 1), {}, insertId); + } + + repl::MutableOplogEntry makeUpdateOplog() { + const Timestamp updateWithPostOplogTs(30, 16); + const ReshardingDonorOplogId updateWithPostOplogId(updateWithPostOplogTs, + updateWithPostOplogTs); + return makeOplog(_crudNss, + _uuid, + repl::OpTypeEnum::kUpdate, + BSON("$set" << BSON("y" << 1)), + BSON("post" << 1), + updateWithPostOplogId); + } + + repl::MutableOplogEntry makeDeleteOplog() { + const Timestamp deleteWithPreOplogTs(66, 86); + const ReshardingDonorOplogId deleteWithPreOplogId(deleteWithPreOplogTs, + deleteWithPreOplogTs); + return makeOplog( + _crudNss, _uuid, repl::OpTypeEnum::kDelete, BSON("pre" << 1), {}, deleteWithPreOplogId); + } + + /** + * Returns (postImageOplog, updateOplog) pair. + */ + std::pair<repl::MutableOplogEntry, repl::MutableOplogEntry> makeUpdateWithPostImage() { + const Timestamp postImageTs(10, 5); + const ReshardingDonorOplogId postImageId(postImageTs, postImageTs); + auto postImageOplog = + makePrePostImageOplog(_crudNss, _uuid, postImageId, BSON("post" << 1 << "y" << 4)); + + auto updateWithPostOplog = makeUpdateOplog(); + updateWithPostOplog.setPostImageOpTime(repl::OpTime(postImageTs, _term)); + return std::make_pair(postImageOplog, updateWithPostOplog); + } + + /** + * Returns (preImageOplog, deleteOplog) pair. + */ + std::pair<repl::MutableOplogEntry, repl::MutableOplogEntry> makeDeleteWithPreImage() { + const Timestamp preImageTs(7, 35); + const ReshardingDonorOplogId preImageId(preImageTs, preImageTs); + auto preImageOplog = + makePrePostImageOplog(_crudNss, _uuid, preImageId, BSON("pre" << 1 << "z" << 4)); + + auto deleteWithPreOplog = makeDeleteOplog(); + deleteWithPreOplog.setPreImageOpTime(repl::OpTime(preImageTs, _term)); + + return std::make_pair(preImageOplog, deleteWithPreOplog); + } + + ReshardingDonorOplogId getOplogId(const repl::MutableOplogEntry& oplog) { + return ReshardingDonorOplogId::parse(IDLParserErrorContext("ReshardingAggTest::getOplogId"), + oplog.get_id()->getDocument().toBson()); + } + + BSONObj addExpectedFields(const repl::MutableOplogEntry& oplog, + const boost::optional<repl::MutableOplogEntry>& chainedEntry) { + BSONObjBuilder builder(oplog.toBSON()); + + BSONArrayBuilder arrayBuilder(builder.subarrayStart(kReshardingOplogPrePostImageOps)); + if (chainedEntry) { + arrayBuilder.append(chainedEntry->toBSON()); + } + arrayBuilder.done(); + + return builder.obj(); + } + private: const NamespaceString _oplogNss{"config.localReshardingOplogBuffer.xxx.yyy"}; + const NamespaceString _crudNss{"test.foo"}; + const UUID _uuid{UUID::gen()}; + const int _term{20}; }; TEST_F(ReshardingAggTest, OplogPipelineBasicCRUDOnly) { - const NamespaceString crudNss("test.foo"); - const UUID uuid(UUID::gen()); - auto insertOplog = repl::MutableOplogEntry::makeInsertOperation(crudNss, uuid, BSON("x" << 1)); - auto updateOplog = repl::MutableOplogEntry::makeUpdateOperation( - crudNss, uuid, BSON("x" << 1), BSON("$set" << BSON("y" << 1))); - auto deleteOplog = repl::MutableOplogEntry::makeDeleteOperation(crudNss, uuid, BSON("x" << 1)); + auto insertOplog = makeInsertOplog(); + auto updateOplog = makeUpdateOplog(); + auto deleteOplog = makeDeleteOplog(); std::deque<DocumentSource::GetNextResult> mockResults; mockResults.emplace_back(Document(insertOplog.toBSON())); @@ -281,24 +394,291 @@ TEST_F(ReshardingAggTest, OplogPipelineBasicCRUDOnly) { expCtx->ns = reshardingOplogNss(); expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults); - auto pipeline = createAggForReshardingOplogBuffer(expCtx, BSONObj()); + auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none); + + // Mock non-lookup collection document source. + auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx); + pipeline->addInitialSource(mockSource); + + auto next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(insertOplog, boost::none), next->toBson()); + + next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(updateOplog, boost::none), next->toBson()); + + next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(deleteOplog, boost::none), next->toBson()); + + ASSERT(!pipeline->getNext()); +} + +/** + * Test with 3 oplog: insert -> update -> delete, then resume from point after insert. + */ +TEST_F(ReshardingAggTest, OplogPipelineWithResumeToken) { + auto insertOplog = makeInsertOplog(); + auto updateOplog = makeUpdateOplog(); + auto deleteOplog = makeDeleteOplog(); + + std::deque<DocumentSource::GetNextResult> mockResults; + mockResults.emplace_back(Document(insertOplog.toBSON())); + mockResults.emplace_back(Document(updateOplog.toBSON())); + mockResults.emplace_back(Document(deleteOplog.toBSON())); + + // Mock lookup collection document souce. + auto expCtx = createExpressionContext(); + expCtx->ns = reshardingOplogNss(); + expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults); + + auto pipeline = createAggForReshardingOplogBuffer(expCtx, getOplogId(insertOplog)); + + // Mock non-lookup collection document source. + auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx); + pipeline->addInitialSource(mockSource); + + auto next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(updateOplog, boost::none), next->toBson()); + + next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(deleteOplog, boost::none), next->toBson()); + + ASSERT(!pipeline->getNext()); +} + +/** + * Test with 3 oplog: insert -> update -> delete, then resume from point after insert. + */ +TEST_F(ReshardingAggTest, OplogPipelineWithResumeTokenClusterTimeNotEqualTs) { + auto modifyClusterTsTo = [&](repl::MutableOplogEntry& oplog, const Timestamp& ts) { + auto newId = getOplogId(oplog); + newId.setClusterTime(ts); + oplog.set_id(Value(newId.toBSON())); + }; + + auto insertOplog = makeInsertOplog(); + modifyClusterTsTo(insertOplog, Timestamp(33, 46)); + auto updateOplog = makeUpdateOplog(); + modifyClusterTsTo(updateOplog, Timestamp(44, 55)); + auto deleteOplog = makeDeleteOplog(); + modifyClusterTsTo(deleteOplog, Timestamp(79, 80)); + + std::deque<DocumentSource::GetNextResult> mockResults; + mockResults.emplace_back(Document(insertOplog.toBSON())); + mockResults.emplace_back(Document(updateOplog.toBSON())); + mockResults.emplace_back(Document(deleteOplog.toBSON())); + + // Mock lookup collection document souce. + auto expCtx = createExpressionContext(); + expCtx->ns = reshardingOplogNss(); + expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults); + + auto pipeline = createAggForReshardingOplogBuffer(expCtx, getOplogId(insertOplog)); + + // Mock non-lookup collection document source. + auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx); + pipeline->addInitialSource(mockSource); + + auto next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(updateOplog, boost::none), next->toBson()); + + next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(deleteOplog, boost::none), next->toBson()); + + ASSERT(!pipeline->getNext()); +} + +TEST_F(ReshardingAggTest, OplogPipelineWithPostImage) { + auto insertOplog = makeInsertOplog(); + + repl::MutableOplogEntry postImageOplog, updateWithPostOplog; + std::tie(postImageOplog, updateWithPostOplog) = makeUpdateWithPostImage(); + + std::deque<DocumentSource::GetNextResult> mockResults; + mockResults.emplace_back(Document(insertOplog.toBSON())); + mockResults.emplace_back(Document(postImageOplog.toBSON())); + mockResults.emplace_back(Document(updateWithPostOplog.toBSON())); + + // Mock lookup collection document souce. + auto expCtx = createExpressionContext(); + expCtx->ns = reshardingOplogNss(); + expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults); + + auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none); + + // Mock non-lookup collection document source. + auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx); + pipeline->addInitialSource(mockSource); + + auto next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(postImageOplog, boost::none), next->toBson()); + + next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(insertOplog, boost::none), next->toBson()); + + next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(updateWithPostOplog, postImageOplog), + next->toBson()); + + ASSERT(!pipeline->getNext()); +} + +TEST_F(ReshardingAggTest, OplogPipelineWithLargeBSONPostImage) { + auto insertOplog = makeInsertOplog(); + + repl::MutableOplogEntry postImageOplog, updateWithPostOplog; + std::tie(postImageOplog, updateWithPostOplog) = makeUpdateWithPostImage(); + + // Modify default fixture docs with large BSON documents. + const std::string::size_type bigSize = 12 * 1024 * 1024; + std::string bigStr(bigSize, 'x'); + postImageOplog.setObject(BSON("bigVal" << bigStr)); + updateWithPostOplog.setObject2(BSON("bigVal" << bigStr)); + + std::deque<DocumentSource::GetNextResult> mockResults; + mockResults.emplace_back(Document(insertOplog.toBSON())); + mockResults.emplace_back(Document(postImageOplog.toBSON())); + mockResults.emplace_back(Document(updateWithPostOplog.toBSON())); + + // Mock lookup collection document souce. + auto expCtx = createExpressionContext(); + expCtx->ns = reshardingOplogNss(); + expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults); + + auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none); + + // Mock non-lookup collection document source. + auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx); + pipeline->addInitialSource(mockSource); + + // Check only _id because attempting to call toBson will trigger BSON too large assertion. + auto next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(postImageOplog.get_id()->getDocument().toBson(), + next->getField("_id").getDocument().toBson()); + + next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(insertOplog.get_id()->getDocument().toBson(), + next->getField("_id").getDocument().toBson()); + + next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(updateWithPostOplog.get_id()->getDocument().toBson(), + next->getField("_id").getDocument().toBson()); + + ASSERT(!pipeline->getNext()); +} + +/** + * Test with 3 oplog: postImage -> insert -> update, then resume from point after postImage. + */ +TEST_F(ReshardingAggTest, OplogPipelineResumeAfterPostImage) { + auto insertOplog = makeInsertOplog(); + + repl::MutableOplogEntry postImageOplog, updateWithPostOplog; + std::tie(postImageOplog, updateWithPostOplog) = makeUpdateWithPostImage(); + + std::deque<DocumentSource::GetNextResult> mockResults; + mockResults.emplace_back(Document(insertOplog.toBSON())); + mockResults.emplace_back(Document(postImageOplog.toBSON())); + mockResults.emplace_back(Document(updateWithPostOplog.toBSON())); + + // Mock lookup collection document souce. + auto expCtx = createExpressionContext(); + expCtx->ns = reshardingOplogNss(); + expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults); + + auto pipeline = createAggForReshardingOplogBuffer(expCtx, getOplogId(postImageOplog)); // Mock non-lookup collection document source. auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx); pipeline->addInitialSource(mockSource); auto next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(insertOplog, boost::none), next->toBson()); - ASSERT_EQ("i", next->getField("op").getStringData()); - ASSERT_EQ(0, next->getField(kReshardingOplogPrePostImageOps).getArrayLength()); + next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(updateWithPostOplog, postImageOplog), + next->toBson()); + + ASSERT(!pipeline->getNext()); +} + +TEST_F(ReshardingAggTest, OplogPipelineWithPreImage) { + auto insertOplog = makeInsertOplog(); + + repl::MutableOplogEntry preImageOplog, deleteWithPreOplog; + std::tie(preImageOplog, deleteWithPreOplog) = makeDeleteWithPreImage(); + + std::deque<DocumentSource::GetNextResult> mockResults; + mockResults.emplace_back(Document(insertOplog.toBSON())); + mockResults.emplace_back(Document(preImageOplog.toBSON())); + mockResults.emplace_back(Document(deleteWithPreOplog.toBSON())); + + // Mock lookup collection document souce. + auto expCtx = createExpressionContext(); + expCtx->ns = reshardingOplogNss(); + expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults); + + auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none); + + // Mock non-lookup collection document source. + auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx); + pipeline->addInitialSource(mockSource); + + auto next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(preImageOplog, boost::none), next->toBson()); + + next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(insertOplog, boost::none), next->toBson()); + + next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(deleteWithPreOplog, preImageOplog), next->toBson()); + + ASSERT(!pipeline->getNext()); +} + +/** + * Oplog _id order in this test is: + * delPreImage -> updatePostImage -> unrelatedInsert -> update -> delete + */ +TEST_F(ReshardingAggTest, OplogPipelineWithPreAndPostImage) { + auto insertOplog = makeInsertOplog(); + + repl::MutableOplogEntry postImageOplog, updateWithPostOplog, preImageOplog, deleteWithPreOplog; + std::tie(postImageOplog, updateWithPostOplog) = makeUpdateWithPostImage(); + std::tie(preImageOplog, deleteWithPreOplog) = makeDeleteWithPreImage(); + + std::deque<DocumentSource::GetNextResult> mockResults; + mockResults.emplace_back(Document(insertOplog.toBSON())); + mockResults.emplace_back(Document(postImageOplog.toBSON())); + mockResults.emplace_back(Document(updateWithPostOplog.toBSON())); + mockResults.emplace_back(Document(preImageOplog.toBSON())); + mockResults.emplace_back(Document(deleteWithPreOplog.toBSON())); + + // Mock lookup collection document souce. + auto expCtx = createExpressionContext(); + expCtx->ns = reshardingOplogNss(); + expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults); + + auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none); + + // Mock non-lookup collection document source. + auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx); + pipeline->addInitialSource(mockSource); + + auto next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(preImageOplog, boost::none), next->toBson()); + + next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(postImageOplog, boost::none), next->toBson()); + + next = pipeline->getNext(); + ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(insertOplog, boost::none), next->toBson()); next = pipeline->getNext(); - ASSERT_EQ("u", next->getField("op").getStringData()); - ASSERT_EQ(0, next->getField(kReshardingOplogPrePostImageOps).getArrayLength()); + ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(updateWithPostOplog, postImageOplog), + next->toBson()); next = pipeline->getNext(); - ASSERT_EQ("d", next->getField("op").getStringData()); - ASSERT_EQ(0, next->getField(kReshardingOplogPrePostImageOps).getArrayLength()); + ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(deleteWithPreOplog, preImageOplog), next->toBson()); ASSERT(!pipeline->getNext()); } |