diff options
-rw-r--r-- | src/mongo/db/repl/oplog_entry.idl | 4 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp | 159 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h | 85 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp | 193 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_util.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_util.h | 6 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_util_test.cpp | 16 |
8 files changed, 466 insertions, 12 deletions
diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl index 86fa9f039d3..e419b473ef5 100644 --- a/src/mongo/db/repl/oplog_entry.idl +++ b/src/mongo/db/repl/oplog_entry.idl @@ -147,3 +147,7 @@ structs: optional: true description: "The optime of another oplog entry that contains the document after an update was applied." + prePostImageOps: + type: array<object> + optional: true + description: "Contains the oplog entries for the pre/post image related to this op" diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 0ce99516a1d..1f005757352 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -70,6 +70,7 @@ env.Library( 'resharding/resharding_op_observer.cpp', 'resharding/resharding_coordinator_observer.cpp', 'resharding/resharding_coordinator_service.cpp', + 'resharding/resharding_donor_oplog_iterator.cpp', 'resharding/resharding_donor_service.cpp', 'resharding/resharding_recipient_service.cpp', 'scoped_operation_completion_sharding_actions.cpp', @@ -532,6 +533,7 @@ env.CppUnitTest( 'balancer/type_migration_test.cpp', 'config_server_op_observer_test.cpp', 'vector_clock_config_server_test.cpp', + 'resharding/resharding_donor_oplog_iterator_test.cpp', 'resharding_util_refresh_test.cpp', 'resharding_util_test.cpp', 'resharding/resharding_coordinator_observer_test.cpp', diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp new file mode 100644 index 00000000000..ab3b96a3688 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp @@ -0,0 +1,159 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/resharding/resharding_donor_oplog_iterator.h" + +#include <fmt/format.h> + +#include "mongo/db/pipeline/pipeline_d.h" +#include "mongo/db/s/resharding_util.h" +#include "mongo/logv2/log.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { + +using namespace fmt::literals; + +namespace { + +/** + * Extracts the oplog id from the oplog. + */ +ReshardingDonorOplogId getId(const repl::OplogEntry& oplog) { + return ReshardingDonorOplogId::parse( + IDLParserErrorContext("ReshardingDonorOplogIterator::getOplogId"), + oplog.get_id()->getDocument().toBson()); +} + +} // anonymous namespace + +/** + * Sentinel oplog format: + * { + * op: "n", + * ns: "<database>.<collection>", + * ui: <existingUUID>, + * destinedRecipient: <recipientShardId>, + * o: {msg: "Writes to <database>.<collection> is temporarily blocked for resharding"}, + * o2: {type: "reshardFinalOp", reshardingUUID: <reshardingUUID>}, + * fromMigrate: true, + * } + */ +bool isFinalOplog(const repl::OplogEntry& oplog) { + if (oplog.getOpType() != repl::OpTypeEnum::kNoop) { + return false; + } + + auto o2Field = oplog.getObject2(); + if (!o2Field) { + return false; + } + + return o2Field->getField("type").valueStringDataSafe() == "reshardFinalOp"_sd; +} + +ReshardingDonorOplogIterator::ReshardingDonorOplogIterator( + NamespaceString donorOplogBufferNs, boost::optional<ReshardingDonorOplogId> resumeToken) + : _oplogBufferNs(std::move(donorOplogBufferNs)), _resumeToken(std::move(resumeToken)) {} + +Future<boost::optional<repl::OplogEntry>> ReshardingDonorOplogIterator::getNext( + OperationContext* opCtx) { + boost::optional<repl::OplogEntry> oplogToReturn; + + if (_hasSeenFinalOplogEntry) { + return Future<boost::optional<repl::OplogEntry>>::makeReady(oplogToReturn); + } + + if (!_pipeline) { + auto expCtx = _makeExpressionContext(opCtx); + _pipeline = createAggForReshardingOplogBuffer(std::move(expCtx), _resumeToken, true); + _pipeline->detachFromOperationContext(); + } + + _pipeline->reattachToOperationContext(opCtx); + ON_BLOCK_EXIT([this] { + if (_pipeline) { + _pipeline->detachFromOperationContext(); + } + }); + + auto next = _pipeline->getNext(); + + if (!next) { + _pipeline.reset(); + return _waitForNewOplog().then([this, opCtx] { return getNext(opCtx); }); + } + + auto nextOplog = uassertStatusOK(repl::OplogEntry::parse(next->toBson())); + if (isFinalOplog(nextOplog)) { + _hasSeenFinalOplogEntry = true; + _pipeline.reset(); + return Future<boost::optional<repl::OplogEntry>>::makeReady(oplogToReturn); + } + + _resumeToken = getId(nextOplog); + + oplogToReturn = std::move(nextOplog); + return Future<boost::optional<repl::OplogEntry>>::makeReady(std::move(oplogToReturn)); +} + +bool ReshardingDonorOplogIterator::hasMore() const { + return !_hasSeenFinalOplogEntry; +} + +Future<void> ReshardingDonorOplogIterator::_waitForNewOplog() { + return Future<void>::makeReady(); +} + +boost::intrusive_ptr<ExpressionContext> ReshardingDonorOplogIterator::_makeExpressionContext( + OperationContext* opCtx) { + StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; + resolvedNamespaces.emplace(_oplogBufferNs.coll(), + ExpressionContext::ResolvedNamespace{_oplogBufferNs, {}}); + + return make_intrusive<ExpressionContext>(opCtx, + boost::none /* explain */, + false /* fromMongos */, + false /* needsMerge */, + false /* allowDiskUse */, + false /* bypassDocumentValidation */, + false /* isMapReduceCommand */, + _oplogBufferNs, + boost::none /* runtimeConstants */, + nullptr /* collator */, + MongoProcessInterface::create(opCtx), + std::move(resolvedNamespaces), + boost::none /* collUUID */); +} + +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h new file mode 100644 index 00000000000..4bd97b8c0d6 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/db/namespace_string.h" +#include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/s/resharding/donor_oplog_id_gen.h" +#include "mongo/util/future.h" + +namespace mongo { + +class OperationContext; + +/** + * Iterator for extracting oplog entries from the resharding donor oplog buffer. This is not thread + * safe. + */ +class ReshardingDonorOplogIterator { +public: + ReshardingDonorOplogIterator(NamespaceString donorOplogBufferNs, + boost::optional<ReshardingDonorOplogId> resumeToken); + + /** + * Returns the next oplog entry. Returns boost::none when there are no more entries to return. + * Calling getNext() when the previously returned future is not ready is undefined. + */ + Future<boost::optional<repl::OplogEntry>> getNext(OperationContext* opCtx); + + /** + * Returns false if this iterator has seen the final oplog entry. Since this is not thread safe, + * should not be called while there is a pending future from getNext() that is not ready. + */ + bool hasMore() const; + +private: + /** + * Returns a future to wait until a new oplog entry is inserted to the target oplog collection. + */ + Future<void> _waitForNewOplog(); + + /** + * Creates a new expression context that can be used to make a new pipeline to query the target + * oplog collection. + */ + boost::intrusive_ptr<ExpressionContext> _makeExpressionContext(OperationContext* opCtx); + + const NamespaceString _oplogBufferNs; + + boost::optional<ReshardingDonorOplogId> _resumeToken; + + std::unique_ptr<Pipeline, PipelineDeleter> _pipeline; + bool _hasSeenFinalOplogEntry{false}; +}; + +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp new file mode 100644 index 00000000000..b1f65fa884b --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp @@ -0,0 +1,193 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +#include "mongo/platform/basic.h" + +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/s/resharding/resharding_donor_oplog_iterator.h" +#include "mongo/db/s/resharding_util.h" +#include "mongo/db/s/sharding_mongod_test_fixture.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/unittest/unittest.h" + +#include "mongo/logv2/log.h" + +namespace mongo { +namespace { + +repl::MutableOplogEntry makeOplog(const NamespaceString& nss, + const UUID& uuid, + const repl::OpTypeEnum& opType, + const BSONObj& oField, + const BSONObj& o2Field, + const ReshardingDonorOplogId& oplogId) { + repl::MutableOplogEntry oplogEntry; + oplogEntry.setNss(nss); + oplogEntry.setUuid(uuid); + oplogEntry.setOpType(opType); + oplogEntry.setObject(oField); + + if (!o2Field.isEmpty()) { + oplogEntry.setObject2(o2Field); + } + + oplogEntry.setOpTimeAndWallTimeBase(repl::OpTimeAndWallTimeBase({}, {})); + oplogEntry.set_id(Value(oplogId.toBSON())); + + return oplogEntry; +} + +class ReshardingDonorOplogIterTest : public ShardingMongodTestFixture { +public: + repl::MutableOplogEntry makeInsertOplog(const Timestamp& id, BSONObj doc) { + ReshardingDonorOplogId oplogId(id, id); + return makeOplog(_crudNss, _uuid, repl::OpTypeEnum::kInsert, BSON("x" << 1), {}, oplogId); + } + + repl::MutableOplogEntry makeFinalOplog(const Timestamp& id) { + ReshardingDonorOplogId oplogId(id, id); + const BSONObj oField(BSON("msg" + << "Created temporary resharding collection")); + const BSONObj o2Field(BSON("type" + << "reshardFinalOp" + << "reshardingUUID" << UUID::gen())); + return makeOplog(_crudNss, _uuid, repl::OpTypeEnum::kNoop, oField, o2Field, oplogId); + } + + const NamespaceString& oplogNss() const { + return _oplogNss; + } + + BSONObj getId(const repl::MutableOplogEntry& oplog) { + return oplog.get_id()->getDocument().toBson(); + } + + BSONObj getId(const repl::OplogEntry& oplog) { + return oplog.get_id()->getDocument().toBson(); + } + +private: + const NamespaceString _oplogNss{"config.localReshardingOplogBuffer.xxx.yyy"}; + const NamespaceString _crudNss{"test.foo"}; + const UUID _uuid{UUID::gen()}; +}; + +TEST_F(ReshardingDonorOplogIterTest, BasicExhaust) { + const auto oplog1 = makeInsertOplog(Timestamp(2, 4), BSON("x" << 1)); + const auto oplog2 = makeInsertOplog(Timestamp(33, 6), BSON("y" << 1)); + const auto finalOplog = makeFinalOplog(Timestamp(43, 24)); + const auto oplogBeyond = makeInsertOplog(Timestamp(123, 46), BSON("z" << 1)); + + DBDirectClient client(operationContext()); + const auto ns = oplogNss().ns(); + client.insert(ns, oplog1.toBSON()); + client.insert(ns, oplog2.toBSON()); + client.insert(ns, finalOplog.toBSON()); + client.insert(ns, oplogBeyond.toBSON()); + + ReshardingDonorOplogIterator iter(oplogNss(), boost::none); + ASSERT_TRUE(iter.hasMore()); + auto next = iter.getNext(operationContext()).get(); + + ASSERT_BSONOBJ_EQ(getId(oplog1), getId(*next)); + + ASSERT_TRUE(iter.hasMore()); + next = iter.getNext(operationContext()).get(); + ASSERT_BSONOBJ_EQ(getId(oplog2), getId(*next)); + + ASSERT_TRUE(iter.hasMore()); + next = iter.getNext(operationContext()).get(); + ASSERT_FALSE(next); + + ASSERT_FALSE(iter.hasMore()); + next = iter.getNext(operationContext()).get(); + ASSERT_FALSE(next); +} + +TEST_F(ReshardingDonorOplogIterTest, ResumeFromMiddle) { + const auto oplog1 = makeInsertOplog(Timestamp(2, 4), BSON("x" << 1)); + const auto oplog2 = makeInsertOplog(Timestamp(33, 6), BSON("y" << 1)); + const auto finalOplog = makeFinalOplog(Timestamp(43, 24)); + + DBDirectClient client(operationContext()); + const auto ns = oplogNss().ns(); + client.insert(ns, oplog1.toBSON()); + client.insert(ns, oplog2.toBSON()); + client.insert(ns, finalOplog.toBSON()); + + ReshardingDonorOplogId resumeToken(Timestamp(2, 4), Timestamp(2, 4)); + ReshardingDonorOplogIterator iter(oplogNss(), resumeToken); + ASSERT_TRUE(iter.hasMore()); + auto next = iter.getNext(operationContext()).get(); + ASSERT_BSONOBJ_EQ(getId(oplog2), getId(*next)); + + ASSERT_TRUE(iter.hasMore()); + next = iter.getNext(operationContext()).get(); + ASSERT_FALSE(next); + + ASSERT_FALSE(iter.hasMore()); +} + +TEST_F(ReshardingDonorOplogIterTest, ExhaustWithIncomingInserts) { + const auto oplog1 = makeInsertOplog(Timestamp(2, 4), BSON("x" << 1)); + const auto oplog2 = makeInsertOplog(Timestamp(33, 6), BSON("y" << 1)); + const auto finalOplog = makeFinalOplog(Timestamp(43, 24)); + const auto oplogBeyond = makeInsertOplog(Timestamp(123, 46), BSON("z" << 1)); + + DBDirectClient client(operationContext()); + const auto ns = oplogNss().ns(); + client.insert(ns, oplog1.toBSON()); + + ReshardingDonorOplogIterator iter(oplogNss(), boost::none); + ASSERT_TRUE(iter.hasMore()); + auto next = iter.getNext(operationContext()).get(); + ASSERT_BSONOBJ_EQ(getId(oplog1), getId(*next)); + + ASSERT_TRUE(iter.hasMore()); + + client.insert(ns, oplog2.toBSON()); + client.insert(ns, finalOplog.toBSON()); + client.insert(ns, oplogBeyond.toBSON()); + + next = iter.getNext(operationContext()).get(); + ASSERT_BSONOBJ_EQ(getId(oplog2), getId(*next)); + + ASSERT_TRUE(iter.hasMore()); + next = iter.getNext(operationContext()).get(); + ASSERT_FALSE(next); + + ASSERT_FALSE(iter.hasMore()); + next = iter.getNext(operationContext()).get(); + ASSERT_FALSE(next); +} + +} // anonymous namespace +} // namespace mongo diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp index 849e99535d6..ccfc3caead7 100644 --- a/src/mongo/db/s/resharding_util.cpp +++ b/src/mongo/db/s/resharding_util.cpp @@ -302,8 +302,9 @@ void validateZones(const std::vector<mongo::BSONObj>& zones, std::unique_ptr<Pipeline, PipelineDeleter> createAggForReshardingOplogBuffer( const boost::intrusive_ptr<ExpressionContext>& expCtx, - const boost::optional<ReshardingDonorOplogId>& resumeToken) { - std::list<boost::intrusive_ptr<DocumentSource>> stages; + const boost::optional<ReshardingDonorOplogId>& resumeToken, + bool doAttachDocumentCursor) { + Pipeline::SourceContainer stages; if (resumeToken) { stages.emplace_back(DocumentSourceMatch::create( @@ -336,7 +337,13 @@ std::unique_ptr<Pipeline, PipelineDeleter> createAggForReshardingOplogBuffer( BSONObj lookupBSON(BSON("" << lookupBuilder.obj())); stages.emplace_back(DocumentSourceLookUp::createFromBson(lookupBSON.firstElement(), expCtx)); - return Pipeline::create(std::move(stages), expCtx); + auto pipeline = Pipeline::create(std::move(stages), expCtx); + if (doAttachDocumentCursor) { + pipeline = expCtx->mongoProcessInterface->attachCursorSourceToPipeline( + pipeline.release(), false /* allowTargetingShards */); + } + + return pipeline; } std::unique_ptr<Pipeline, PipelineDeleter> createConfigTxnCloningPipelineForResharding( diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h index 8399e140a58..21e4d192682 100644 --- a/src/mongo/db/s/resharding_util.h +++ b/src/mongo/db/s/resharding_util.h @@ -128,10 +128,14 @@ void validateZones(const std::vector<mongo::BSONObj>& zones, * Create pipeline stages for iterating the buffered copy of the donor oplog and link together the * oplog entries with their preImage/postImage oplog. Note that caller is responsible for making * sure that the donorOplogNS is properly resolved and ns is set in the expCtx. + * + * If doAttachDocumentCursor is false, the caller will need to manually set the initial stage of the + * pipeline with a source. This is mostly useful for testing. */ std::unique_ptr<Pipeline, PipelineDeleter> createAggForReshardingOplogBuffer( const boost::intrusive_ptr<ExpressionContext>& expCtx, - const boost::optional<ReshardingDonorOplogId>& resumeToken); + const boost::optional<ReshardingDonorOplogId>& resumeToken, + bool doAttachDocumentCursor); /** * Create pipeline stages for iterating donor config.transactions. The pipeline has these stages: diff --git a/src/mongo/db/s/resharding_util_test.cpp b/src/mongo/db/s/resharding_util_test.cpp index f11da606b3f..658c474e21f 100644 --- a/src/mongo/db/s/resharding_util_test.cpp +++ b/src/mongo/db/s/resharding_util_test.cpp @@ -460,7 +460,7 @@ TEST_F(ReshardingAggTest, OplogPipelineBasicCRUDOnly) { expCtx->ns = localOplogBufferNss(); expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults); - auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none); + auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none, false); // Mock non-lookup collection document source. auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx); @@ -496,7 +496,7 @@ TEST_F(ReshardingAggTest, OplogPipelineWithResumeToken) { expCtx->ns = localOplogBufferNss(); expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults); - auto pipeline = createAggForReshardingOplogBuffer(expCtx, getOplogId(insertOplog)); + auto pipeline = createAggForReshardingOplogBuffer(expCtx, getOplogId(insertOplog), false); // Mock non-lookup collection document source. auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx); @@ -538,7 +538,7 @@ TEST_F(ReshardingAggTest, OplogPipelineWithResumeTokenClusterTimeNotEqualTs) { expCtx->ns = localOplogBufferNss(); expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults); - auto pipeline = createAggForReshardingOplogBuffer(expCtx, getOplogId(insertOplog)); + auto pipeline = createAggForReshardingOplogBuffer(expCtx, getOplogId(insertOplog), false); // Mock non-lookup collection document source. auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx); @@ -569,7 +569,7 @@ TEST_F(ReshardingAggTest, OplogPipelineWithPostImage) { expCtx->ns = localOplogBufferNss(); expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults); - auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none); + auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none, false); // Mock non-lookup collection document source. auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx); @@ -610,7 +610,7 @@ TEST_F(ReshardingAggTest, OplogPipelineWithLargeBSONPostImage) { expCtx->ns = localOplogBufferNss(); expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults); - auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none); + auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none, false); // Mock non-lookup collection document source. auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx); @@ -651,7 +651,7 @@ TEST_F(ReshardingAggTest, OplogPipelineResumeAfterPostImage) { expCtx->ns = localOplogBufferNss(); expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults); - auto pipeline = createAggForReshardingOplogBuffer(expCtx, getOplogId(postImageOplog)); + auto pipeline = createAggForReshardingOplogBuffer(expCtx, getOplogId(postImageOplog), false); // Mock non-lookup collection document source. auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx); @@ -683,7 +683,7 @@ TEST_F(ReshardingAggTest, OplogPipelineWithPreImage) { expCtx->ns = localOplogBufferNss(); expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults); - auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none); + auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none, false); // Mock non-lookup collection document source. auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx); @@ -724,7 +724,7 @@ TEST_F(ReshardingAggTest, OplogPipelineWithPreAndPostImage) { expCtx->ns = localOplogBufferNss(); expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults); - auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none); + auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none, false); // Mock non-lookup collection document source. auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx); |