author     Randolph Tan <randolph@10gen.com>                 2020-08-17 14:22:44 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-08-26 17:14:55 +0000
commit     a60b31413da02569d9e6e682da3ebf0ae6cc4107 (patch)
tree       2a69e66e582e93df7c6cc11ae8d550b819786032 /src/mongo/db
parent     fff906108611c79956cd5bd7dc68aada59eff361 (diff)
download   mongo-a60b31413da02569d9e6e682da3ebf0ae6cc4107.tar.gz
SERVER-50117 Add resumeToken and pre/post image oplog tests for createAggForReshardingOplogBuffer
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/repl/SConscript                     1
-rw-r--r--  src/mongo/db/repl/oplog_entry.idl                7
-rw-r--r--  src/mongo/db/s/SConscript                        1
-rw-r--r--  src/mongo/db/s/resharding/donor_oplog_id.idl    49
-rw-r--r--  src/mongo/db/s/resharding_util.cpp               9
-rw-r--r--  src/mongo/db/s/resharding_util.h                 4
-rw-r--r--  src/mongo/db/s/resharding_util_test.cpp        408
7 files changed, 458 insertions, 21 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index d5a8d6a18f9..e8d98a08ecc 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -491,6 +491,7 @@ env.Library(
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/logical_session_id',
'$BUILD_DIR/mongo/db/catalog/collection_options',
+ '$BUILD_DIR/mongo/db/exec/document_value/document_value',
'$BUILD_DIR/mongo/idl/idl_parser',
],
LIBDEPS_PRIVATE=[
diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl
index 5147eb9dfe0..7a229b3dd07 100644
--- a/src/mongo/db/repl/oplog_entry.idl
+++ b/src/mongo/db/repl/oplog_entry.idl
@@ -34,6 +34,7 @@ global:
imports:
- "mongo/idl/basic_types.idl"
- "mongo/db/logical_session_id.idl"
+ - "mongo/db/pipeline/value.idl"
- "mongo/db/repl/optime_and_wall_time_base.idl"
- "mongo/db/repl/replication_types.idl"
- "mongo/s/sharding_types.idl"
@@ -125,9 +126,11 @@ structs:
description: "Contains the UUID of a tenant migration for an operation caused by
one."
_id:
- type: objectid
+ cpp_name: _id
+ type: Value
optional: true
- description: "An optional _id field for tests that manually insert oplog entries"
+ description: "Used by tests in replication and also by resharding to store
+ timestamps."
stmtId:
cpp_name: statementId
type: StmtId
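
Changing _id from objectid to the generic Value type lets the field hold an arbitrary BSON value, including the {clusterTime, ts} sub-document that resharding stores there (defined in donor_oplog_id.idl below). A minimal sketch of setting and reading that field, using only the set_id()/get_id() accessors that appear later in this diff; everything else in the snippet is illustrative:

    // Sketch: storing a resharding oplog id document in the Value-typed _id field.
    repl::MutableOplogEntry oplogEntry;
    oplogEntry.set_id(
        Value(BSON("clusterTime" << Timestamp(25, 345) << "ts" << Timestamp(25, 345))));

    // Reading it back as BSON, as the tests below do:
    BSONObj idBson = oplogEntry.get_id()->getDocument().toBson();
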
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index fe4bc629199..c6464a5a569 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -91,6 +91,7 @@ env.Library(
env.Idlc('range_deletion_task.idl')[0],
env.Idlc('resharding/coordinator_document.idl')[0],
env.Idlc('resharding/donor_document.idl')[0],
+ env.Idlc('resharding/donor_oplog_id.idl')[0],
env.Idlc('resharding/recipient_document.idl')[0],
],
LIBDEPS=[
diff --git a/src/mongo/db/s/resharding/donor_oplog_id.idl b/src/mongo/db/s/resharding/donor_oplog_id.idl
new file mode 100644
index 00000000000..e6e21079d78
--- /dev/null
+++ b/src/mongo/db/s/resharding/donor_oplog_id.idl
@@ -0,0 +1,49 @@
+# Copyright (C) 2020-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+# This file defines the _id format of the oplog entries stored in config.localReshardingOplogBuffer.*.
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+structs:
+ ReshardingDonorOplogId:
+ description: >-
+ Represents the set of timestamps that belong to an operation from the donor shard.
+ fields:
+ clusterTime:
+ type: timestamp
+ description: >-
+ The clusterTime of the final oplog entry of a transaction. If the operation is
+ not from a transaction, then it is the same value as the ts field.
+ ts:
+ type: timestamp
+ description: "The clusterTime of this operation."
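
The IDL compiler generates a ReshardingDonorOplogId C++ class from this definition (donor_oplog_id_gen.h, included by resharding_util.h below). A short sketch of the generated type in use, based only on the constructor, toBSON(), and parse() calls that appear elsewhere in this diff; any other generated accessors are not shown:

    // Sketch: building and round-tripping a ReshardingDonorOplogId.
    // For an operation outside a transaction, clusterTime and ts hold the same value.
    const Timestamp ts(25, 345);
    ReshardingDonorOplogId oplogId(ts /* clusterTime */, ts /* ts */);

    // Serializes to { clusterTime: Timestamp(25, 345), ts: Timestamp(25, 345) }.
    BSONObj idBson = oplogId.toBSON();

    // Parsing back from BSON, as getOplogId() does in the tests below:
    auto parsed = ReshardingDonorOplogId::parse(
        IDLParserErrorContext("donor_oplog_id_example"), idBson);
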
diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp
index 4cdb09b310a..5d81a724ce1 100644
--- a/src/mongo/db/s/resharding_util.cpp
+++ b/src/mongo/db/s/resharding_util.cpp
@@ -114,12 +114,13 @@ void validateZones(const std::vector<mongo::BSONObj>& zones,
}
std::unique_ptr<Pipeline, PipelineDeleter> createAggForReshardingOplogBuffer(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, const BSONObj& resumeToken) {
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const boost::optional<ReshardingDonorOplogId>& resumeToken) {
std::list<boost::intrusive_ptr<DocumentSource>> stages;
- if (!resumeToken.isEmpty()) {
- stages.emplace_back(
- DocumentSourceMatch::create(BSON("_id" << BSON("$gt" << resumeToken)), expCtx));
+ if (resumeToken) {
+ stages.emplace_back(DocumentSourceMatch::create(
+ BSON("_id" << BSON("$gt" << resumeToken->toBSON())), expCtx));
}
stages.emplace_back(DocumentSourceSort::create(expCtx, BSON("_id" << 1)));
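
Because the resume token is now a whole ReshardingDonorOplogId document rather than an ObjectId, the $gt filter and the $sort on _id use BSON document ordering, comparing clusterTime first and then ts. A sketch of the specs the two stages above would produce for a sample resume token (only the $match and $sort stages visible in this hunk are covered; later stages of the pipeline are unchanged by this commit):

    // Sketch: stage specs assembled above for a sample resume token.
    ReshardingDonorOplogId resumeToken(Timestamp(25, 345), Timestamp(25, 345));

    // Filter handed to DocumentSourceMatch::create() above:
    //   { _id: { $gt: { clusterTime: Timestamp(25, 345), ts: Timestamp(25, 345) } } }
    BSONObj matchFilter = BSON("_id" << BSON("$gt" << resumeToken.toBSON()));

    // Sort spec handed to DocumentSourceSort::create() above: { _id: 1 }
    BSONObj sortSpec = BSON("_id" << 1);
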
diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h
index dbb1c659b30..b6441cd16fe 100644
--- a/src/mongo/db/s/resharding_util.h
+++ b/src/mongo/db/s/resharding_util.h
@@ -34,6 +34,7 @@
#include "mongo/db/keypattern.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/s/resharding/donor_oplog_id_gen.h"
#include "mongo/s/catalog/type_tags.h"
#include "mongo/s/resharded_chunk_gen.h"
@@ -75,6 +76,7 @@ void validateZones(const std::vector<mongo::BSONObj>& zones,
* sure that the donorOplogNS is properly resolved and ns is set in the expCtx.
*/
std::unique_ptr<Pipeline, PipelineDeleter> createAggForReshardingOplogBuffer(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, const BSONObj& resumeToken);
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const boost::optional<ReshardingDonorOplogId>& resumeToken);
} // namespace mongo
\ No newline at end of file
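
With the new signature, a caller passes boost::none to read the buffer from the beginning, or the _id of the last entry it processed to resume strictly after it. A hedged usage sketch mirroring the tests below; the expCtx setup and the lastProcessedId value are placeholders, not part of this commit:

    // Sketch: fresh start vs. resuming after a previously processed oplog _id.
    auto fullPipeline = createAggForReshardingOplogBuffer(expCtx, boost::none);

    // Hypothetical resume point; the tests derive it from the last consumed entry
    // via ReshardingDonorOplogId::parse().
    ReshardingDonorOplogId lastProcessedId(Timestamp(25, 345), Timestamp(25, 345));
    auto resumedPipeline = createAggForReshardingOplogBuffer(expCtx, lastProcessedId);
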
diff --git a/src/mongo/db/s/resharding_util_test.cpp b/src/mongo/db/s/resharding_util_test.cpp
index bd9e4683d22..9c8a01198d3 100644
--- a/src/mongo/db/s/resharding_util_test.cpp
+++ b/src/mongo/db/s/resharding_util_test.cpp
@@ -31,6 +31,8 @@
#include "mongo/platform/basic.h"
+#include <vector>
+
#include "mongo/bson/bsonobj.h"
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/document_source_mock.h"
@@ -40,7 +42,6 @@
#include "mongo/db/s/resharding_util.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/unittest/unittest.h"
-#include <vector>
namespace mongo {
namespace {
@@ -66,6 +67,35 @@ private:
std::deque<DocumentSource::GetNextResult> _mockResults;
};
+repl::MutableOplogEntry makeOplog(const NamespaceString& nss,
+ const UUID& uuid,
+ const repl::OpTypeEnum& opType,
+ const BSONObj& oField,
+ const BSONObj& o2Field,
+ const boost::optional<ReshardingDonorOplogId>& _id) {
+ repl::MutableOplogEntry oplogEntry;
+ oplogEntry.setNss(nss);
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setOpType(opType);
+ oplogEntry.setObject(oField);
+
+ if (!o2Field.isEmpty()) {
+ oplogEntry.setObject2(o2Field);
+ }
+
+ oplogEntry.setOpTimeAndWallTimeBase(repl::OpTimeAndWallTimeBase({}, {}));
+ oplogEntry.set_id(Value(_id->toBSON()));
+
+ return oplogEntry;
+}
+
+repl::MutableOplogEntry makePrePostImageOplog(const NamespaceString& nss,
+ const UUID& uuid,
+ const ReshardingDonorOplogId& _id,
+ const BSONObj& prePostImage) {
+ return makeOplog(nss, uuid, repl::OpTypeEnum::kNoop, prePostImage, {}, _id);
+}
+
class ReshardingUtilTest : public ConfigServerTestFixture {
protected:
void setUp() override {
@@ -259,17 +289,100 @@ protected:
return expCtx;
}
+ /************************************************************************************
+ * These helper functions generate pre-made oplog entries with the following timestamps:
+ *
+ * deletePreImage: ts(7, 35)
+ * updatePostImage: ts(10, 5)
+ * insert: ts(25, 345)
+ * update: ts(30, 16)
+ * delete: ts(66, 86)
+ */
+
+ repl::MutableOplogEntry makeInsertOplog() {
+ const Timestamp insertTs(25, 345);
+ const ReshardingDonorOplogId insertId(insertTs, insertTs);
+ return makeOplog(_crudNss, _uuid, repl::OpTypeEnum::kInsert, BSON("x" << 1), {}, insertId);
+ }
+
+ repl::MutableOplogEntry makeUpdateOplog() {
+ const Timestamp updateWithPostOplogTs(30, 16);
+ const ReshardingDonorOplogId updateWithPostOplogId(updateWithPostOplogTs,
+ updateWithPostOplogTs);
+ return makeOplog(_crudNss,
+ _uuid,
+ repl::OpTypeEnum::kUpdate,
+ BSON("$set" << BSON("y" << 1)),
+ BSON("post" << 1),
+ updateWithPostOplogId);
+ }
+
+ repl::MutableOplogEntry makeDeleteOplog() {
+ const Timestamp deleteWithPreOplogTs(66, 86);
+ const ReshardingDonorOplogId deleteWithPreOplogId(deleteWithPreOplogTs,
+ deleteWithPreOplogTs);
+ return makeOplog(
+ _crudNss, _uuid, repl::OpTypeEnum::kDelete, BSON("pre" << 1), {}, deleteWithPreOplogId);
+ }
+
+ /**
+ * Returns (postImageOplog, updateOplog) pair.
+ */
+ std::pair<repl::MutableOplogEntry, repl::MutableOplogEntry> makeUpdateWithPostImage() {
+ const Timestamp postImageTs(10, 5);
+ const ReshardingDonorOplogId postImageId(postImageTs, postImageTs);
+ auto postImageOplog =
+ makePrePostImageOplog(_crudNss, _uuid, postImageId, BSON("post" << 1 << "y" << 4));
+
+ auto updateWithPostOplog = makeUpdateOplog();
+ updateWithPostOplog.setPostImageOpTime(repl::OpTime(postImageTs, _term));
+ return std::make_pair(postImageOplog, updateWithPostOplog);
+ }
+
+ /**
+ * Returns (preImageOplog, deleteOplog) pair.
+ */
+ std::pair<repl::MutableOplogEntry, repl::MutableOplogEntry> makeDeleteWithPreImage() {
+ const Timestamp preImageTs(7, 35);
+ const ReshardingDonorOplogId preImageId(preImageTs, preImageTs);
+ auto preImageOplog =
+ makePrePostImageOplog(_crudNss, _uuid, preImageId, BSON("pre" << 1 << "z" << 4));
+
+ auto deleteWithPreOplog = makeDeleteOplog();
+ deleteWithPreOplog.setPreImageOpTime(repl::OpTime(preImageTs, _term));
+
+ return std::make_pair(preImageOplog, deleteWithPreOplog);
+ }
+
+ ReshardingDonorOplogId getOplogId(const repl::MutableOplogEntry& oplog) {
+ return ReshardingDonorOplogId::parse(IDLParserErrorContext("ReshardingAggTest::getOplogId"),
+ oplog.get_id()->getDocument().toBson());
+ }
+
+ BSONObj addExpectedFields(const repl::MutableOplogEntry& oplog,
+ const boost::optional<repl::MutableOplogEntry>& chainedEntry) {
+ BSONObjBuilder builder(oplog.toBSON());
+
+ BSONArrayBuilder arrayBuilder(builder.subarrayStart(kReshardingOplogPrePostImageOps));
+ if (chainedEntry) {
+ arrayBuilder.append(chainedEntry->toBSON());
+ }
+ arrayBuilder.done();
+
+ return builder.obj();
+ }
+
private:
const NamespaceString _oplogNss{"config.localReshardingOplogBuffer.xxx.yyy"};
+ const NamespaceString _crudNss{"test.foo"};
+ const UUID _uuid{UUID::gen()};
+ const int _term{20};
};
TEST_F(ReshardingAggTest, OplogPipelineBasicCRUDOnly) {
- const NamespaceString crudNss("test.foo");
- const UUID uuid(UUID::gen());
- auto insertOplog = repl::MutableOplogEntry::makeInsertOperation(crudNss, uuid, BSON("x" << 1));
- auto updateOplog = repl::MutableOplogEntry::makeUpdateOperation(
- crudNss, uuid, BSON("x" << 1), BSON("$set" << BSON("y" << 1)));
- auto deleteOplog = repl::MutableOplogEntry::makeDeleteOperation(crudNss, uuid, BSON("x" << 1));
+ auto insertOplog = makeInsertOplog();
+ auto updateOplog = makeUpdateOplog();
+ auto deleteOplog = makeDeleteOplog();
std::deque<DocumentSource::GetNextResult> mockResults;
mockResults.emplace_back(Document(insertOplog.toBSON()));
@@ -281,24 +394,291 @@ TEST_F(ReshardingAggTest, OplogPipelineBasicCRUDOnly) {
expCtx->ns = reshardingOplogNss();
expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults);
- auto pipeline = createAggForReshardingOplogBuffer(expCtx, BSONObj());
+ auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none);
+
+ // Mock non-lookup collection document source.
+ auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx);
+ pipeline->addInitialSource(mockSource);
+
+ auto next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(insertOplog, boost::none), next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(updateOplog, boost::none), next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(deleteOplog, boost::none), next->toBson());
+
+ ASSERT(!pipeline->getNext());
+}
+
+/**
+ * Test with 3 oplog entries: insert -> update -> delete, then resume from the point after the insert.
+ */
+TEST_F(ReshardingAggTest, OplogPipelineWithResumeToken) {
+ auto insertOplog = makeInsertOplog();
+ auto updateOplog = makeUpdateOplog();
+ auto deleteOplog = makeDeleteOplog();
+
+ std::deque<DocumentSource::GetNextResult> mockResults;
+ mockResults.emplace_back(Document(insertOplog.toBSON()));
+ mockResults.emplace_back(Document(updateOplog.toBSON()));
+ mockResults.emplace_back(Document(deleteOplog.toBSON()));
+
+ // Mock lookup collection document source.
+ auto expCtx = createExpressionContext();
+ expCtx->ns = reshardingOplogNss();
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults);
+
+ auto pipeline = createAggForReshardingOplogBuffer(expCtx, getOplogId(insertOplog));
+
+ // Mock non-lookup collection document source.
+ auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx);
+ pipeline->addInitialSource(mockSource);
+
+ auto next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(updateOplog, boost::none), next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(deleteOplog, boost::none), next->toBson());
+
+ ASSERT(!pipeline->getNext());
+}
+
+/**
+ * Test with 3 oplog entries: insert -> update -> delete, where clusterTime differs from ts, then resume from the point after the insert.
+ */
+TEST_F(ReshardingAggTest, OplogPipelineWithResumeTokenClusterTimeNotEqualTs) {
+ auto modifyClusterTsTo = [&](repl::MutableOplogEntry& oplog, const Timestamp& ts) {
+ auto newId = getOplogId(oplog);
+ newId.setClusterTime(ts);
+ oplog.set_id(Value(newId.toBSON()));
+ };
+
+ auto insertOplog = makeInsertOplog();
+ modifyClusterTsTo(insertOplog, Timestamp(33, 46));
+ auto updateOplog = makeUpdateOplog();
+ modifyClusterTsTo(updateOplog, Timestamp(44, 55));
+ auto deleteOplog = makeDeleteOplog();
+ modifyClusterTsTo(deleteOplog, Timestamp(79, 80));
+
+ std::deque<DocumentSource::GetNextResult> mockResults;
+ mockResults.emplace_back(Document(insertOplog.toBSON()));
+ mockResults.emplace_back(Document(updateOplog.toBSON()));
+ mockResults.emplace_back(Document(deleteOplog.toBSON()));
+
+ // Mock lookup collection document source.
+ auto expCtx = createExpressionContext();
+ expCtx->ns = reshardingOplogNss();
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults);
+
+ auto pipeline = createAggForReshardingOplogBuffer(expCtx, getOplogId(insertOplog));
+
+ // Mock non-lookup collection document source.
+ auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx);
+ pipeline->addInitialSource(mockSource);
+
+ auto next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(updateOplog, boost::none), next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(deleteOplog, boost::none), next->toBson());
+
+ ASSERT(!pipeline->getNext());
+}
+
+TEST_F(ReshardingAggTest, OplogPipelineWithPostImage) {
+ auto insertOplog = makeInsertOplog();
+
+ repl::MutableOplogEntry postImageOplog, updateWithPostOplog;
+ std::tie(postImageOplog, updateWithPostOplog) = makeUpdateWithPostImage();
+
+ std::deque<DocumentSource::GetNextResult> mockResults;
+ mockResults.emplace_back(Document(insertOplog.toBSON()));
+ mockResults.emplace_back(Document(postImageOplog.toBSON()));
+ mockResults.emplace_back(Document(updateWithPostOplog.toBSON()));
+
+ // Mock lookup collection document source.
+ auto expCtx = createExpressionContext();
+ expCtx->ns = reshardingOplogNss();
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults);
+
+ auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none);
+
+ // Mock non-lookup collection document source.
+ auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx);
+ pipeline->addInitialSource(mockSource);
+
+ auto next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(postImageOplog, boost::none), next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(insertOplog, boost::none), next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(updateWithPostOplog, postImageOplog),
+ next->toBson());
+
+ ASSERT(!pipeline->getNext());
+}
+
+TEST_F(ReshardingAggTest, OplogPipelineWithLargeBSONPostImage) {
+ auto insertOplog = makeInsertOplog();
+
+ repl::MutableOplogEntry postImageOplog, updateWithPostOplog;
+ std::tie(postImageOplog, updateWithPostOplog) = makeUpdateWithPostImage();
+
+ // Modify default fixture docs with large BSON documents.
+ const std::string::size_type bigSize = 12 * 1024 * 1024;
+ std::string bigStr(bigSize, 'x');
+ postImageOplog.setObject(BSON("bigVal" << bigStr));
+ updateWithPostOplog.setObject2(BSON("bigVal" << bigStr));
+
+ std::deque<DocumentSource::GetNextResult> mockResults;
+ mockResults.emplace_back(Document(insertOplog.toBSON()));
+ mockResults.emplace_back(Document(postImageOplog.toBSON()));
+ mockResults.emplace_back(Document(updateWithPostOplog.toBSON()));
+
+ // Mock lookup collection document source.
+ auto expCtx = createExpressionContext();
+ expCtx->ns = reshardingOplogNss();
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults);
+
+ auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none);
+
+ // Mock non-lookup collection document source.
+ auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx);
+ pipeline->addInitialSource(mockSource);
+
+ // Check only the _id because calling toBson() would trigger the BSON-too-large assertion.
+ auto next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(postImageOplog.get_id()->getDocument().toBson(),
+ next->getField("_id").getDocument().toBson());
+
+ next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(insertOplog.get_id()->getDocument().toBson(),
+ next->getField("_id").getDocument().toBson());
+
+ next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(updateWithPostOplog.get_id()->getDocument().toBson(),
+ next->getField("_id").getDocument().toBson());
+
+ ASSERT(!pipeline->getNext());
+}
+
+/**
+ * Test with 3 oplog entries: postImage -> insert -> update, then resume from the point after the postImage.
+ */
+TEST_F(ReshardingAggTest, OplogPipelineResumeAfterPostImage) {
+ auto insertOplog = makeInsertOplog();
+
+ repl::MutableOplogEntry postImageOplog, updateWithPostOplog;
+ std::tie(postImageOplog, updateWithPostOplog) = makeUpdateWithPostImage();
+
+ std::deque<DocumentSource::GetNextResult> mockResults;
+ mockResults.emplace_back(Document(insertOplog.toBSON()));
+ mockResults.emplace_back(Document(postImageOplog.toBSON()));
+ mockResults.emplace_back(Document(updateWithPostOplog.toBSON()));
+
+ // Mock lookup collection document source.
+ auto expCtx = createExpressionContext();
+ expCtx->ns = reshardingOplogNss();
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults);
+
+ auto pipeline = createAggForReshardingOplogBuffer(expCtx, getOplogId(postImageOplog));
// Mock non-lookup collection document source.
auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx);
pipeline->addInitialSource(mockSource);
auto next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(insertOplog, boost::none), next->toBson());
- ASSERT_EQ("i", next->getField("op").getStringData());
- ASSERT_EQ(0, next->getField(kReshardingOplogPrePostImageOps).getArrayLength());
+ next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(updateWithPostOplog, postImageOplog),
+ next->toBson());
+
+ ASSERT(!pipeline->getNext());
+}
+
+TEST_F(ReshardingAggTest, OplogPipelineWithPreImage) {
+ auto insertOplog = makeInsertOplog();
+
+ repl::MutableOplogEntry preImageOplog, deleteWithPreOplog;
+ std::tie(preImageOplog, deleteWithPreOplog) = makeDeleteWithPreImage();
+
+ std::deque<DocumentSource::GetNextResult> mockResults;
+ mockResults.emplace_back(Document(insertOplog.toBSON()));
+ mockResults.emplace_back(Document(preImageOplog.toBSON()));
+ mockResults.emplace_back(Document(deleteWithPreOplog.toBSON()));
+
+ // Mock lookup collection document source.
+ auto expCtx = createExpressionContext();
+ expCtx->ns = reshardingOplogNss();
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults);
+
+ auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none);
+
+ // Mock non-lookup collection document source.
+ auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx);
+ pipeline->addInitialSource(mockSource);
+
+ auto next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(preImageOplog, boost::none), next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(insertOplog, boost::none), next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(deleteWithPreOplog, preImageOplog), next->toBson());
+
+ ASSERT(!pipeline->getNext());
+}
+
+/**
+ * Oplog _id order in this test is:
+ * delPreImage -> updatePostImage -> unrelatedInsert -> update -> delete
+ */
+TEST_F(ReshardingAggTest, OplogPipelineWithPreAndPostImage) {
+ auto insertOplog = makeInsertOplog();
+
+ repl::MutableOplogEntry postImageOplog, updateWithPostOplog, preImageOplog, deleteWithPreOplog;
+ std::tie(postImageOplog, updateWithPostOplog) = makeUpdateWithPostImage();
+ std::tie(preImageOplog, deleteWithPreOplog) = makeDeleteWithPreImage();
+
+ std::deque<DocumentSource::GetNextResult> mockResults;
+ mockResults.emplace_back(Document(insertOplog.toBSON()));
+ mockResults.emplace_back(Document(postImageOplog.toBSON()));
+ mockResults.emplace_back(Document(updateWithPostOplog.toBSON()));
+ mockResults.emplace_back(Document(preImageOplog.toBSON()));
+ mockResults.emplace_back(Document(deleteWithPreOplog.toBSON()));
+
+ // Mock lookup collection document source.
+ auto expCtx = createExpressionContext();
+ expCtx->ns = reshardingOplogNss();
+ expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults);
+
+ auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none);
+
+ // Mock non-lookup collection document source.
+ auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx);
+ pipeline->addInitialSource(mockSource);
+
+ auto next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(preImageOplog, boost::none), next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(postImageOplog, boost::none), next->toBson());
+
+ next = pipeline->getNext();
+ ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(insertOplog, boost::none), next->toBson());
next = pipeline->getNext();
- ASSERT_EQ("u", next->getField("op").getStringData());
- ASSERT_EQ(0, next->getField(kReshardingOplogPrePostImageOps).getArrayLength());
+ ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(updateWithPostOplog, postImageOplog),
+ next->toBson());
next = pipeline->getNext();
- ASSERT_EQ("d", next->getField("op").getStringData());
- ASSERT_EQ(0, next->getField(kReshardingOplogPrePostImageOps).getArrayLength());
+ ASSERT_BSONOBJ_BINARY_EQ(addExpectedFields(deleteWithPreOplog, preImageOplog), next->toBson());
ASSERT(!pipeline->getNext());
}