summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2020-08-28 20:51:45 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-09-29 22:03:30 +0000
commita8047f6509f9c2ef92d49847f698cb3c0e96277b (patch)
tree5445c8eea0291065a42ec56a8a7d8b4a2c836ec7
parentf66c116902666e5b4c08ed854627f79cfe163d7c (diff)
downloadmongo-a8047f6509f9c2ef92d49847f698cb3c0e96277b.tar.gz
SERVER-50118 Create oplog buffer iterator for resharding
-rw-r--r--src/mongo/db/repl/oplog_entry.idl4
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp159
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h85
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp193
-rw-r--r--src/mongo/db/s/resharding_util.cpp13
-rw-r--r--src/mongo/db/s/resharding_util.h6
-rw-r--r--src/mongo/db/s/resharding_util_test.cpp16
8 files changed, 466 insertions, 12 deletions
diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl
index 86fa9f039d3..e419b473ef5 100644
--- a/src/mongo/db/repl/oplog_entry.idl
+++ b/src/mongo/db/repl/oplog_entry.idl
@@ -147,3 +147,7 @@ structs:
optional: true
description: "The optime of another oplog entry that contains the document
after an update was applied."
+ prePostImageOps:
+ type: array<object>
+ optional: true
+ description: "Contains the oplog entries for the pre/post image related to this op"
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 0ce99516a1d..1f005757352 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -70,6 +70,7 @@ env.Library(
'resharding/resharding_op_observer.cpp',
'resharding/resharding_coordinator_observer.cpp',
'resharding/resharding_coordinator_service.cpp',
+ 'resharding/resharding_donor_oplog_iterator.cpp',
'resharding/resharding_donor_service.cpp',
'resharding/resharding_recipient_service.cpp',
'scoped_operation_completion_sharding_actions.cpp',
@@ -532,6 +533,7 @@ env.CppUnitTest(
'balancer/type_migration_test.cpp',
'config_server_op_observer_test.cpp',
'vector_clock_config_server_test.cpp',
+ 'resharding/resharding_donor_oplog_iterator_test.cpp',
'resharding_util_refresh_test.cpp',
'resharding_util_test.cpp',
'resharding/resharding_coordinator_observer_test.cpp',
diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp
new file mode 100644
index 00000000000..ab3b96a3688
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp
@@ -0,0 +1,159 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/resharding/resharding_donor_oplog_iterator.h"
+
+#include <fmt/format.h>
+
+#include "mongo/db/pipeline/pipeline_d.h"
+#include "mongo/db/s/resharding_util.h"
+#include "mongo/logv2/log.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+
+using namespace fmt::literals;
+
+namespace {
+
+/**
+ * Extracts the oplog id from the oplog.
+ */
+ReshardingDonorOplogId getId(const repl::OplogEntry& oplog) {
+ return ReshardingDonorOplogId::parse(
+ IDLParserErrorContext("ReshardingDonorOplogIterator::getOplogId"),
+ oplog.get_id()->getDocument().toBson());
+}
+
+} // anonymous namespace
+
+/**
+ * Sentinel oplog format:
+ * {
+ * op: "n",
+ * ns: "<database>.<collection>",
+ * ui: <existingUUID>,
+ * destinedRecipient: <recipientShardId>,
+ * o: {msg: "Writes to <database>.<collection> is temporarily blocked for resharding"},
+ * o2: {type: "reshardFinalOp", reshardingUUID: <reshardingUUID>},
+ * fromMigrate: true,
+ * }
+ */
+bool isFinalOplog(const repl::OplogEntry& oplog) {
+ if (oplog.getOpType() != repl::OpTypeEnum::kNoop) {
+ return false;
+ }
+
+ auto o2Field = oplog.getObject2();
+ if (!o2Field) {
+ return false;
+ }
+
+ return o2Field->getField("type").valueStringDataSafe() == "reshardFinalOp"_sd;
+}
+
+ReshardingDonorOplogIterator::ReshardingDonorOplogIterator(
+ NamespaceString donorOplogBufferNs, boost::optional<ReshardingDonorOplogId> resumeToken)
+ : _oplogBufferNs(std::move(donorOplogBufferNs)), _resumeToken(std::move(resumeToken)) {}
+
+Future<boost::optional<repl::OplogEntry>> ReshardingDonorOplogIterator::getNext(
+ OperationContext* opCtx) {
+ boost::optional<repl::OplogEntry> oplogToReturn;
+
+ if (_hasSeenFinalOplogEntry) {
+ return Future<boost::optional<repl::OplogEntry>>::makeReady(oplogToReturn);
+ }
+
+ if (!_pipeline) {
+ auto expCtx = _makeExpressionContext(opCtx);
+ _pipeline = createAggForReshardingOplogBuffer(std::move(expCtx), _resumeToken, true);
+ _pipeline->detachFromOperationContext();
+ }
+
+ _pipeline->reattachToOperationContext(opCtx);
+ ON_BLOCK_EXIT([this] {
+ if (_pipeline) {
+ _pipeline->detachFromOperationContext();
+ }
+ });
+
+ auto next = _pipeline->getNext();
+
+ if (!next) {
+ _pipeline.reset();
+ return _waitForNewOplog().then([this, opCtx] { return getNext(opCtx); });
+ }
+
+ auto nextOplog = uassertStatusOK(repl::OplogEntry::parse(next->toBson()));
+ if (isFinalOplog(nextOplog)) {
+ _hasSeenFinalOplogEntry = true;
+ _pipeline.reset();
+ return Future<boost::optional<repl::OplogEntry>>::makeReady(oplogToReturn);
+ }
+
+ _resumeToken = getId(nextOplog);
+
+ oplogToReturn = std::move(nextOplog);
+ return Future<boost::optional<repl::OplogEntry>>::makeReady(std::move(oplogToReturn));
+}
+
+bool ReshardingDonorOplogIterator::hasMore() const {
+ return !_hasSeenFinalOplogEntry;
+}
+
+Future<void> ReshardingDonorOplogIterator::_waitForNewOplog() {
+ return Future<void>::makeReady();
+}
+
+boost::intrusive_ptr<ExpressionContext> ReshardingDonorOplogIterator::_makeExpressionContext(
+ OperationContext* opCtx) {
+ StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
+ resolvedNamespaces.emplace(_oplogBufferNs.coll(),
+ ExpressionContext::ResolvedNamespace{_oplogBufferNs, {}});
+
+ return make_intrusive<ExpressionContext>(opCtx,
+ boost::none /* explain */,
+ false /* fromMongos */,
+ false /* needsMerge */,
+ false /* allowDiskUse */,
+ false /* bypassDocumentValidation */,
+ false /* isMapReduceCommand */,
+ _oplogBufferNs,
+ boost::none /* runtimeConstants */,
+ nullptr /* collator */,
+ MongoProcessInterface::create(opCtx),
+ std::move(resolvedNamespaces),
+ boost::none /* collUUID */);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h
new file mode 100644
index 00000000000..4bd97b8c0d6
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h
@@ -0,0 +1,85 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/s/resharding/donor_oplog_id_gen.h"
+#include "mongo/util/future.h"
+
+namespace mongo {
+
+class OperationContext;
+
+/**
+ * Iterator for extracting oplog entries from the resharding donor oplog buffer. This is not thread
+ * safe.
+ */
+class ReshardingDonorOplogIterator {
+public:
+ ReshardingDonorOplogIterator(NamespaceString donorOplogBufferNs,
+ boost::optional<ReshardingDonorOplogId> resumeToken);
+
+ /**
+ * Returns the next oplog entry. Returns boost::none when there are no more entries to return.
+ * Calling getNext() when the previously returned future is not ready is undefined.
+ */
+ Future<boost::optional<repl::OplogEntry>> getNext(OperationContext* opCtx);
+
+ /**
+ * Returns false if this iterator has seen the final oplog entry. Since this is not thread safe,
+ * should not be called while there is a pending future from getNext() that is not ready.
+ */
+ bool hasMore() const;
+
+private:
+ /**
+ * Returns a future to wait until a new oplog entry is inserted to the target oplog collection.
+ */
+ Future<void> _waitForNewOplog();
+
+ /**
+ * Creates a new expression context that can be used to make a new pipeline to query the target
+ * oplog collection.
+ */
+ boost::intrusive_ptr<ExpressionContext> _makeExpressionContext(OperationContext* opCtx);
+
+ const NamespaceString _oplogBufferNs;
+
+ boost::optional<ReshardingDonorOplogId> _resumeToken;
+
+ std::unique_ptr<Pipeline, PipelineDeleter> _pipeline;
+ bool _hasSeenFinalOplogEntry{false};
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
new file mode 100644
index 00000000000..b1f65fa884b
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
@@ -0,0 +1,193 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/s/resharding/resharding_donor_oplog_iterator.h"
+#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/s/sharding_mongod_test_fixture.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/unittest/unittest.h"
+
+#include "mongo/logv2/log.h"
+
+namespace mongo {
+namespace {
+
+repl::MutableOplogEntry makeOplog(const NamespaceString& nss,
+ const UUID& uuid,
+ const repl::OpTypeEnum& opType,
+ const BSONObj& oField,
+ const BSONObj& o2Field,
+ const ReshardingDonorOplogId& oplogId) {
+ repl::MutableOplogEntry oplogEntry;
+ oplogEntry.setNss(nss);
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setOpType(opType);
+ oplogEntry.setObject(oField);
+
+ if (!o2Field.isEmpty()) {
+ oplogEntry.setObject2(o2Field);
+ }
+
+ oplogEntry.setOpTimeAndWallTimeBase(repl::OpTimeAndWallTimeBase({}, {}));
+ oplogEntry.set_id(Value(oplogId.toBSON()));
+
+ return oplogEntry;
+}
+
+class ReshardingDonorOplogIterTest : public ShardingMongodTestFixture {
+public:
+ repl::MutableOplogEntry makeInsertOplog(const Timestamp& id, BSONObj doc) {
+ ReshardingDonorOplogId oplogId(id, id);
+ return makeOplog(_crudNss, _uuid, repl::OpTypeEnum::kInsert, BSON("x" << 1), {}, oplogId);
+ }
+
+ repl::MutableOplogEntry makeFinalOplog(const Timestamp& id) {
+ ReshardingDonorOplogId oplogId(id, id);
+ const BSONObj oField(BSON("msg"
+ << "Created temporary resharding collection"));
+ const BSONObj o2Field(BSON("type"
+ << "reshardFinalOp"
+ << "reshardingUUID" << UUID::gen()));
+ return makeOplog(_crudNss, _uuid, repl::OpTypeEnum::kNoop, oField, o2Field, oplogId);
+ }
+
+ const NamespaceString& oplogNss() const {
+ return _oplogNss;
+ }
+
+ BSONObj getId(const repl::MutableOplogEntry& oplog) {
+ return oplog.get_id()->getDocument().toBson();
+ }
+
+ BSONObj getId(const repl::OplogEntry& oplog) {
+ return oplog.get_id()->getDocument().toBson();
+ }
+
+private:
+ const NamespaceString _oplogNss{"config.localReshardingOplogBuffer.xxx.yyy"};
+ const NamespaceString _crudNss{"test.foo"};
+ const UUID _uuid{UUID::gen()};
+};
+
+TEST_F(ReshardingDonorOplogIterTest, BasicExhaust) {
+ const auto oplog1 = makeInsertOplog(Timestamp(2, 4), BSON("x" << 1));
+ const auto oplog2 = makeInsertOplog(Timestamp(33, 6), BSON("y" << 1));
+ const auto finalOplog = makeFinalOplog(Timestamp(43, 24));
+ const auto oplogBeyond = makeInsertOplog(Timestamp(123, 46), BSON("z" << 1));
+
+ DBDirectClient client(operationContext());
+ const auto ns = oplogNss().ns();
+ client.insert(ns, oplog1.toBSON());
+ client.insert(ns, oplog2.toBSON());
+ client.insert(ns, finalOplog.toBSON());
+ client.insert(ns, oplogBeyond.toBSON());
+
+ ReshardingDonorOplogIterator iter(oplogNss(), boost::none);
+ ASSERT_TRUE(iter.hasMore());
+ auto next = iter.getNext(operationContext()).get();
+
+ ASSERT_BSONOBJ_EQ(getId(oplog1), getId(*next));
+
+ ASSERT_TRUE(iter.hasMore());
+ next = iter.getNext(operationContext()).get();
+ ASSERT_BSONOBJ_EQ(getId(oplog2), getId(*next));
+
+ ASSERT_TRUE(iter.hasMore());
+ next = iter.getNext(operationContext()).get();
+ ASSERT_FALSE(next);
+
+ ASSERT_FALSE(iter.hasMore());
+ next = iter.getNext(operationContext()).get();
+ ASSERT_FALSE(next);
+}
+
+TEST_F(ReshardingDonorOplogIterTest, ResumeFromMiddle) {
+ const auto oplog1 = makeInsertOplog(Timestamp(2, 4), BSON("x" << 1));
+ const auto oplog2 = makeInsertOplog(Timestamp(33, 6), BSON("y" << 1));
+ const auto finalOplog = makeFinalOplog(Timestamp(43, 24));
+
+ DBDirectClient client(operationContext());
+ const auto ns = oplogNss().ns();
+ client.insert(ns, oplog1.toBSON());
+ client.insert(ns, oplog2.toBSON());
+ client.insert(ns, finalOplog.toBSON());
+
+ ReshardingDonorOplogId resumeToken(Timestamp(2, 4), Timestamp(2, 4));
+ ReshardingDonorOplogIterator iter(oplogNss(), resumeToken);
+ ASSERT_TRUE(iter.hasMore());
+ auto next = iter.getNext(operationContext()).get();
+ ASSERT_BSONOBJ_EQ(getId(oplog2), getId(*next));
+
+ ASSERT_TRUE(iter.hasMore());
+ next = iter.getNext(operationContext()).get();
+ ASSERT_FALSE(next);
+
+ ASSERT_FALSE(iter.hasMore());
+}
+
+TEST_F(ReshardingDonorOplogIterTest, ExhaustWithIncomingInserts) {
+ const auto oplog1 = makeInsertOplog(Timestamp(2, 4), BSON("x" << 1));
+ const auto oplog2 = makeInsertOplog(Timestamp(33, 6), BSON("y" << 1));
+ const auto finalOplog = makeFinalOplog(Timestamp(43, 24));
+ const auto oplogBeyond = makeInsertOplog(Timestamp(123, 46), BSON("z" << 1));
+
+ DBDirectClient client(operationContext());
+ const auto ns = oplogNss().ns();
+ client.insert(ns, oplog1.toBSON());
+
+ ReshardingDonorOplogIterator iter(oplogNss(), boost::none);
+ ASSERT_TRUE(iter.hasMore());
+ auto next = iter.getNext(operationContext()).get();
+ ASSERT_BSONOBJ_EQ(getId(oplog1), getId(*next));
+
+ ASSERT_TRUE(iter.hasMore());
+
+ client.insert(ns, oplog2.toBSON());
+ client.insert(ns, finalOplog.toBSON());
+ client.insert(ns, oplogBeyond.toBSON());
+
+ next = iter.getNext(operationContext()).get();
+ ASSERT_BSONOBJ_EQ(getId(oplog2), getId(*next));
+
+ ASSERT_TRUE(iter.hasMore());
+ next = iter.getNext(operationContext()).get();
+ ASSERT_FALSE(next);
+
+ ASSERT_FALSE(iter.hasMore());
+ next = iter.getNext(operationContext()).get();
+ ASSERT_FALSE(next);
+}
+
+} // anonymous namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp
index 849e99535d6..ccfc3caead7 100644
--- a/src/mongo/db/s/resharding_util.cpp
+++ b/src/mongo/db/s/resharding_util.cpp
@@ -302,8 +302,9 @@ void validateZones(const std::vector<mongo::BSONObj>& zones,
std::unique_ptr<Pipeline, PipelineDeleter> createAggForReshardingOplogBuffer(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const boost::optional<ReshardingDonorOplogId>& resumeToken) {
- std::list<boost::intrusive_ptr<DocumentSource>> stages;
+ const boost::optional<ReshardingDonorOplogId>& resumeToken,
+ bool doAttachDocumentCursor) {
+ Pipeline::SourceContainer stages;
if (resumeToken) {
stages.emplace_back(DocumentSourceMatch::create(
@@ -336,7 +337,13 @@ std::unique_ptr<Pipeline, PipelineDeleter> createAggForReshardingOplogBuffer(
BSONObj lookupBSON(BSON("" << lookupBuilder.obj()));
stages.emplace_back(DocumentSourceLookUp::createFromBson(lookupBSON.firstElement(), expCtx));
- return Pipeline::create(std::move(stages), expCtx);
+ auto pipeline = Pipeline::create(std::move(stages), expCtx);
+ if (doAttachDocumentCursor) {
+ pipeline = expCtx->mongoProcessInterface->attachCursorSourceToPipeline(
+ pipeline.release(), false /* allowTargetingShards */);
+ }
+
+ return pipeline;
}
std::unique_ptr<Pipeline, PipelineDeleter> createConfigTxnCloningPipelineForResharding(
diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h
index 8399e140a58..21e4d192682 100644
--- a/src/mongo/db/s/resharding_util.h
+++ b/src/mongo/db/s/resharding_util.h
@@ -128,10 +128,14 @@ void validateZones(const std::vector<mongo::BSONObj>& zones,
* Create pipeline stages for iterating the buffered copy of the donor oplog and link together the
* oplog entries with their preImage/postImage oplog. Note that caller is responsible for making
* sure that the donorOplogNS is properly resolved and ns is set in the expCtx.
+ *
+ * If doAttachDocumentCursor is false, the caller will need to manually set the initial stage of the
+ * pipeline with a source. This is mostly useful for testing.
*/
std::unique_ptr<Pipeline, PipelineDeleter> createAggForReshardingOplogBuffer(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const boost::optional<ReshardingDonorOplogId>& resumeToken);
+ const boost::optional<ReshardingDonorOplogId>& resumeToken,
+ bool doAttachDocumentCursor);
/**
* Create pipeline stages for iterating donor config.transactions. The pipeline has these stages:
diff --git a/src/mongo/db/s/resharding_util_test.cpp b/src/mongo/db/s/resharding_util_test.cpp
index f11da606b3f..658c474e21f 100644
--- a/src/mongo/db/s/resharding_util_test.cpp
+++ b/src/mongo/db/s/resharding_util_test.cpp
@@ -460,7 +460,7 @@ TEST_F(ReshardingAggTest, OplogPipelineBasicCRUDOnly) {
expCtx->ns = localOplogBufferNss();
expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults);
- auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none);
+ auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none, false);
// Mock non-lookup collection document source.
auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx);
@@ -496,7 +496,7 @@ TEST_F(ReshardingAggTest, OplogPipelineWithResumeToken) {
expCtx->ns = localOplogBufferNss();
expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults);
- auto pipeline = createAggForReshardingOplogBuffer(expCtx, getOplogId(insertOplog));
+ auto pipeline = createAggForReshardingOplogBuffer(expCtx, getOplogId(insertOplog), false);
// Mock non-lookup collection document source.
auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx);
@@ -538,7 +538,7 @@ TEST_F(ReshardingAggTest, OplogPipelineWithResumeTokenClusterTimeNotEqualTs) {
expCtx->ns = localOplogBufferNss();
expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults);
- auto pipeline = createAggForReshardingOplogBuffer(expCtx, getOplogId(insertOplog));
+ auto pipeline = createAggForReshardingOplogBuffer(expCtx, getOplogId(insertOplog), false);
// Mock non-lookup collection document source.
auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx);
@@ -569,7 +569,7 @@ TEST_F(ReshardingAggTest, OplogPipelineWithPostImage) {
expCtx->ns = localOplogBufferNss();
expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults);
- auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none);
+ auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none, false);
// Mock non-lookup collection document source.
auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx);
@@ -610,7 +610,7 @@ TEST_F(ReshardingAggTest, OplogPipelineWithLargeBSONPostImage) {
expCtx->ns = localOplogBufferNss();
expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults);
- auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none);
+ auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none, false);
// Mock non-lookup collection document source.
auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx);
@@ -651,7 +651,7 @@ TEST_F(ReshardingAggTest, OplogPipelineResumeAfterPostImage) {
expCtx->ns = localOplogBufferNss();
expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults);
- auto pipeline = createAggForReshardingOplogBuffer(expCtx, getOplogId(postImageOplog));
+ auto pipeline = createAggForReshardingOplogBuffer(expCtx, getOplogId(postImageOplog), false);
// Mock non-lookup collection document source.
auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx);
@@ -683,7 +683,7 @@ TEST_F(ReshardingAggTest, OplogPipelineWithPreImage) {
expCtx->ns = localOplogBufferNss();
expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults);
- auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none);
+ auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none, false);
// Mock non-lookup collection document source.
auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx);
@@ -724,7 +724,7 @@ TEST_F(ReshardingAggTest, OplogPipelineWithPreAndPostImage) {
expCtx->ns = localOplogBufferNss();
expCtx->mongoProcessInterface = std::make_shared<MockMongoInterface>(mockResults);
- auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none);
+ auto pipeline = createAggForReshardingOplogBuffer(expCtx, boost::none, false);
// Mock non-lookup collection document source.
auto mockSource = DocumentSourceMock::createForTest(mockResults, expCtx);