summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2022-05-27 14:47:58 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-05-27 15:29:59 +0000
commit2cbe2539dc4ae363686409c7ec8b67fa36eddc29 (patch)
treeceace3a266b102d2c3e113ba4f8659eb8cd96938
parent48c6f489560a56aad037d43878350f30b9e4a00a (diff)
downloadmongo-2cbe2539dc4ae363686409c7ec8b67fa36eddc29.tar.gz
SERVER-63437 Expose all events on 'system.resharding' collections when 'showSystemEvents' is true
Co-authored-by: Arun Banala <arun.banala@mongodb.com>
-rw-r--r--jstests/change_streams/show_resharding_system_events.js142
-rw-r--r--jstests/libs/change_stream_util.js2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h5
-rw-r--r--src/mongo/db/repl/SConscript14
-rw-r--r--src/mongo/db/repl/change_stream_oplog_notification.cpp73
-rw-r--r--src/mongo/db/repl/change_stream_oplog_notification.h47
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/s/create_collection_coordinator.cpp39
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp12
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service_test.cpp54
10 files changed, 348 insertions, 42 deletions
diff --git a/jstests/change_streams/show_resharding_system_events.js b/jstests/change_streams/show_resharding_system_events.js
new file mode 100644
index 00000000000..8463102419a
--- /dev/null
+++ b/jstests/change_streams/show_resharding_system_events.js
@@ -0,0 +1,142 @@
+/**
+ * Tests the behavior of change streams on a system.resharding.* namespace in the presence of
+ * 'showSystemEvents' flag. This is a separate test from 'show_system_events.js' because it can only
+ * operate in a sharded cluster.
+ *
+ * @tags: [
+ * requires_fcv_61,
+ * featureFlagChangeStreamsVisibility,
+ * featureFlagChangeStreamsFurtherEnrichedEvents,
+ * requires_sharding,
+ * uses_change_streams,
+ * change_stream_does_not_expect_txns,
+ * assumes_unsharded_collection,
+ * assumes_read_preference_unchanged,
+ * ]
+ */
+(function() {
+"use strict";
+
+load('jstests/libs/change_stream_util.js'); // For 'assertChangeStreamEventEq'.
+
+// Create a single-shard cluster for this test.
+const st = new ShardingTest({
+ shards: 1,
+ rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}
+});
+
+const testDB = st.s.getDB(jsTestName());
+const testColl = testDB[jsTestName()];
+
+// Shard the collection based on '_id'.
+st.shardColl(testColl, {_id: 1}, false);
+
+// Build an index on the collection to support the resharding operation.
+assert.commandWorked(testColl.createIndex({a: 1}));
+
+// Insert some documents that will be resharded.
+assert.commandWorked(testColl.insert({_id: 0, a: 0}));
+assert.commandWorked(testColl.insert({_id: 1, a: 1}));
+
+// Helper function to retrieve the UUID of the specified collection.
+function getCollectionUuid(coll) {
+ const collInfo = testDB.getCollectionInfos({name: coll.getName()})[0];
+ return collInfo.info.uuid;
+}
+
+// Obtain a resume token indicating the start point for the test.
+const startPoint = testDB.watch().getResumeToken();
+
+// Get the UUID of the collection before resharding.
+const oldUUID = getCollectionUuid(testColl);
+
+// Reshard the collection.
+assert.commandWorked(st.s.adminCommand({
+ reshardCollection: testColl.getFullName(),
+ key: {a: 1},
+}));
+
+// Get the UUID of the collection after resharding.
+const newUUID = getCollectionUuid(testColl);
+
+// Write one more sentinel document into the collection.
+assert.commandWorked(testColl.insert({_id: 2, a: 2}));
+
+// Now confirm the sequence of events that we expect to see in the change stream.
+const reshardingCollName = `system.resharding.${oldUUID.toString().match(/\"([^\"]+)\"/)[1]}`;
+const reshardingNs = {
+ db: testDB.getName(),
+ coll: reshardingCollName
+};
+const origNs = {
+ db: testDB.getName(),
+ coll: testColl.getName()
+};
+const expectedReshardingEvents = [
+ {ns: reshardingNs, collectionUUID: newUUID, operationType: "create"},
+ {
+ ns: reshardingNs,
+ collectionUUID: newUUID,
+ operationType: "createIndexes",
+ operationDescription: {indexes: [{v: 2, key: {a: 1}, name: "a_1"}]}
+ },
+ {
+ ns: reshardingNs,
+ collectionUUID: newUUID,
+ operationType: "shardCollection",
+ operationDescription: {shardKey: {a: 1}}
+ },
+ {
+ ns: reshardingNs,
+ collectionUUID: newUUID,
+ operationType: "insert",
+ fullDocument: {_id: 0, a: 0},
+ documentKey: {a: 0, _id: 0}
+ },
+ {
+ ns: reshardingNs,
+ collectionUUID: newUUID,
+ operationType: "insert",
+ fullDocument: {_id: 1, a: 1},
+ documentKey: {a: 1, _id: 1}
+ },
+ {
+ ns: origNs,
+ collectionUUID: oldUUID,
+ operationType: "reshardCollection",
+ operationDescription:
+ {reshardUUID: newUUID, shardKey: {a: 1}, oldShardKey: {_id: 1}, unique: false}
+ },
+ {
+ ns: origNs,
+ collectionUUID: newUUID,
+ operationType: "insert",
+ fullDocument: {_id: 2, a: 2},
+ documentKey: {a: 2, _id: 2}
+ },
+];
+
+// Helper to confirm the sequence of events observed in the change stream.
+function assertChangeStreamEventSequence(csConfig, expectedEvents) {
+ // Open a change stream on the test DB using the given configuration.
+ const finalConfig =
+ Object.assign({resumeAfter: startPoint, showExpandedEvents: true}, csConfig);
+ const csCursor = testDB.watch([], finalConfig);
+
+ // Confirm that we see the expected sequence of events.
+ expectedEvents.forEach((expectedEvent) => {
+ assert.soon(() => csCursor.hasNext());
+ assertChangeStreamEventEq(csCursor.next(), expectedEvent);
+ });
+}
+
+// With showSystemEvents set to true, we expect to see the full sequence of events.
+assertChangeStreamEventSequence({showSystemEvents: true}, expectedReshardingEvents);
+
+// With showSystemEvents set to false, we expect to only see events on the original namespace.
+const nonSystemEvents =
+ expectedReshardingEvents.filter((event) => (event.ns.coll === testColl.getName()));
+assertChangeStreamEventSequence({showSystemEvents: false}, nonSystemEvents);
+
+st.stop();
+}()); \ No newline at end of file
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js
index 15028c54af2..e5a00bf92c9 100644
--- a/jstests/libs/change_stream_util.js
+++ b/jstests/libs/change_stream_util.js
@@ -161,7 +161,7 @@ function isChangeStreamEventEq(actualEvent, expectedEvent) {
function assertChangeStreamEventEq(actualEvent, expectedEvent) {
assert(isChangeStreamEventEq(actualEvent, expectedEvent),
() => "Change events did not match. Expected: " + tojsonMaybeTruncate(expectedEvent) +
- ", Actual: " + tojsonMaybeTruncate(testEvent));
+ ", Actual: " + tojsonMaybeTruncate(actualEvent));
}
function ChangeStreamTest(_db, name = "ChangeStreamTest") {
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 1ecdde1d5b6..cb720da2515 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -246,9 +246,10 @@ public:
// Default regex for collections match which prohibits system collections.
static constexpr StringData kRegexAllCollections = R"((?!(\$|system\.)))"_sd;
- // Regex matching all regular collections plus certain system collections.
+
+ // Regex matching all user collections plus collections exposed when 'showSystemEvents' is set.
static constexpr StringData kRegexAllCollectionsShowSystemEvents =
- R"((?!(\$|system\.(?!(js$)))))"_sd;
+ R"((?!(\$|system\.(?!(js$|resharding\.)))))"_sd;
static constexpr StringData kRegexAllDBs = R"(^(?!(admin|config|local)\.)[^.]+)"_sd;
static constexpr StringData kRegexCmdColl = R"(\$cmd$)"_sd;
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 7f496a55383..3379a0d34d1 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -40,6 +40,20 @@ env.Library(
)
env.Library(
+ target='change_stream_oplog_notification',
+ source=[
+ 'change_stream_oplog_notification.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/concurrency/exception_util',
+ '$BUILD_DIR/mongo/db/concurrency/lock_manager',
+ '$BUILD_DIR/mongo/db/dbhelpers',
+ 'oplog',
+ 'oplog_entry',
+ ],
+)
+
+env.Library(
target='oplog',
source=[
'apply_ops.cpp',
diff --git a/src/mongo/db/repl/change_stream_oplog_notification.cpp b/src/mongo/db/repl/change_stream_oplog_notification.cpp
new file mode 100644
index 00000000000..531f7af2e57
--- /dev/null
+++ b/src/mongo/db/repl/change_stream_oplog_notification.cpp
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/repl/change_stream_oplog_notification.h"
+
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/concurrency/exception_util.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/logv2/redaction.h"
+
+namespace mongo {
+
+void notifyChangeStreamsOnShardCollection(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& uuid,
+ BSONObj cmd) {
+ BSONObjBuilder cmdBuilder;
+ cmdBuilder.append("shardCollection", nss.ns());
+ cmdBuilder.appendElements(cmd);
+
+ BSONObj fullCmd = cmdBuilder.obj();
+
+ repl::MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
+ oplogEntry.setNss(nss);
+ oplogEntry.setUuid(uuid);
+ oplogEntry.setObject(BSON("msg" << BSON("shardCollection" << nss.ns())));
+ oplogEntry.setObject2(fullCmd);
+ oplogEntry.setOpTime(repl::OpTime());
+ oplogEntry.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now());
+
+ writeConflictRetry(
+ opCtx, "ShardCollectionWritesOplog", NamespaceString::kRsOplogNamespace.ns(), [&] {
+ AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite);
+ WriteUnitOfWork wunit(opCtx);
+ const auto& oplogOpTime = repl::logOp(opCtx, &oplogEntry);
+ uassert(8423339,
+ str::stream() << "Failed to create new oplog entry for oplog with opTime: "
+ << oplogEntry.getOpTime().toString() << ": "
+ << redact(oplogEntry.toBSON()),
+ !oplogOpTime.isNull());
+ wunit.commit();
+ });
+}
+} // namespace mongo
diff --git a/src/mongo/db/repl/change_stream_oplog_notification.h b/src/mongo/db/repl/change_stream_oplog_notification.h
new file mode 100644
index 00000000000..ee746b96167
--- /dev/null
+++ b/src/mongo/db/repl/change_stream_oplog_notification.h
@@ -0,0 +1,47 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/util/uuid.h"
+
+namespace mongo {
+
+/*
+ * This function writes a no-op oplog entry on shardCollection event.
+ * TODO SERVER-66333: move all other notifyChangeStreams* functions here.
+ */
+void notifyChangeStreamsOnShardCollection(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& uuid,
+ BSONObj cmd);
+} // namespace mongo
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 07b08105b8b..e0cf8313b31 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -146,6 +146,7 @@ env.Library(
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
'$BUILD_DIR/mongo/db/pipeline/aggregation_request_helper',
'$BUILD_DIR/mongo/db/repl/abstract_async_component',
+ '$BUILD_DIR/mongo/db/repl/change_stream_oplog_notification',
'$BUILD_DIR/mongo/db/repl/oplog',
'$BUILD_DIR/mongo/db/repl/primary_only_service',
'$BUILD_DIR/mongo/db/repl/wait_for_majority_service',
@@ -458,6 +459,7 @@ env.Library(
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
'$BUILD_DIR/mongo/db/internal_transactions_feature_flag',
'$BUILD_DIR/mongo/db/multitenancy',
+ '$BUILD_DIR/mongo/db/repl/change_stream_oplog_notification',
'$BUILD_DIR/mongo/db/repl/primary_only_service',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
'$BUILD_DIR/mongo/db/repl/replica_set_messages',
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp
index cea380bd7c7..9d4806761ee 100644
--- a/src/mongo/db/s/create_collection_coordinator.cpp
+++ b/src/mongo/db/s/create_collection_coordinator.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/persistent_task_store.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
+#include "mongo/db/repl/change_stream_oplog_notification.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/create_collection_coordinator.h"
#include "mongo/db/s/recoverable_critical_section_service.h"
@@ -356,42 +357,6 @@ void broadcastDropCollection(OperationContext* opCtx,
opCtx, nss, participants, executor, osi);
}
-/**
- * This function writes a no-op oplog entry on shardCollection event.
- */
-void _writeOplogMessage(OperationContext* opCtx,
- const NamespaceString& nss,
- const UUID& uuid,
- const BSONObj cmd) {
- BSONObjBuilder cmdBuilder;
- cmdBuilder.append("shardCollection", nss.ns());
- cmdBuilder.appendElements(cmd);
-
- BSONObj fullCmd = cmdBuilder.obj();
-
- repl::MutableOplogEntry oplogEntry;
- oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
- oplogEntry.setNss(nss);
- oplogEntry.setUuid(uuid);
- oplogEntry.setObject(BSON("msg" << BSON("shardCollection" << nss.ns())));
- oplogEntry.setObject2(fullCmd);
- oplogEntry.setOpTime(repl::OpTime());
- oplogEntry.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now());
-
- writeConflictRetry(
- opCtx, "ShardCollectionWritesOplog", NamespaceString::kRsOplogNamespace.ns(), [&] {
- AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite);
- WriteUnitOfWork wunit(opCtx);
- const auto& oplogOpTime = repl::logOp(opCtx, &oplogEntry);
- uassert(8423339,
- str::stream() << "Failed to create new oplog entry for oplog with opTime: "
- << oplogEntry.getOpTime().toString() << ": "
- << redact(oplogEntry.toBSON()),
- !oplogOpTime.isNull());
- wunit.commit();
- });
-}
-
} // namespace
CreateCollectionCoordinator::CreateCollectionCoordinator(ShardingDDLCoordinatorService* service,
@@ -904,7 +869,7 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
try {
insertCollectionEntry(opCtx, nss(), coll, getCurrentSession(_doc));
- _writeOplogMessage(opCtx, nss(), *_collectionUUID, _request.toBSON());
+ notifyChangeStreamsOnShardCollection(opCtx, nss(), *_collectionUUID, _request.toBSON());
LOGV2_DEBUG(5277907, 2, "Collection successfully committed", "namespace"_attr = nss());
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 4713e9d63cd..fc225f6f801 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/ops/delete.h"
#include "mongo/db/persistent_task_store.h"
#include "mongo/db/query/collation/collation_spec.h"
+#include "mongo/db/repl/change_stream_oplog_notification.h"
#include "mongo/db/repl/oplog_applier.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/wait_for_majority_service.h"
@@ -647,6 +648,17 @@ void ReshardingRecipientService::RecipientStateMachine::
true /* enforceUniquenessCheck */,
shardkeyutil::ValidationBehaviorsShardCollection(opCtx.get()));
});
+
+ // We add a fake 'shardCollection' notification here so that the C2C replicator can sync the
+ // resharding operation to the target cluster. The only information we have is the shard
+ // key, but all other fields must either be default-valued or are ignored by C2C.
+ // TODO SERVER-66671: The 'createCollRequest' should include the full contents of the
+ // CreateCollectionRequest rather than just the 'shardKey' field.
+ const auto createCollRequest = BSON("shardKey" << _metadata.getReshardingKey().toBSON());
+ notifyChangeStreamsOnShardCollection(opCtx.get(),
+ _metadata.getTempReshardingNss(),
+ _metadata.getReshardingUUID(),
+ createCollRequest);
}
_transitionToCloning(factory);
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
index b8516da1e18..50d635016f7 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
@@ -240,6 +240,10 @@ public:
return ReshardingMetrics::get(serviceContext);
}
+ BSONObj newShardKeyPattern() {
+ return BSON("newKey" << 1);
+ }
+
ReshardingRecipientDocument makeStateDocument(bool isAlsoDonor) {
RecipientShardContext recipientCtx;
recipientCtx.setState(RecipientStateEnum::kAwaitingFetchTimestamp);
@@ -257,7 +261,7 @@ public:
sourceNss,
sourceUUID,
constructTemporaryReshardingNss(sourceNss.db(), sourceUUID),
- BSON("newKey" << 1));
+ newShardKeyPattern());
doc.setCommonReshardingMetadata(std::move(commonMetadata));
return doc;
@@ -635,7 +639,8 @@ TEST_F(ReshardingRecipientServiceTest, WritesNoopOplogEntryOnReshardDoneCatchUp)
NamespaceString sourceNss = constructTemporaryReshardingNss("sourcedb", doc.getSourceUUID());
FindCommandRequest findRequest{NamespaceString::kRsOplogNamespace};
- findRequest.setFilter(BSON("ns" << sourceNss.toString()));
+ findRequest.setFilter(
+ BSON("ns" << sourceNss.toString() << "o2.reshardDoneCatchUp" << BSON("$exists" << true)));
auto cursor = client.find(std::move(findRequest));
ASSERT_TRUE(cursor->more()) << "Found no oplog entries for source collection";
@@ -656,6 +661,51 @@ TEST_F(ReshardingRecipientServiceTest, WritesNoopOplogEntryOnReshardDoneCatchUp)
ASSERT_FALSE(bool(op.getDestinedRecipient())) << op.getEntry();
}
+TEST_F(ReshardingRecipientServiceTest, WritesNoopOplogEntryForImplicitShardCollection) {
+ boost::optional<PauseDuringStateTransitions> doneTransitionGuard;
+ doneTransitionGuard.emplace(controller(), RecipientStateEnum::kDone);
+
+ auto doc = makeStateDocument(false /* isAlsoDonor */);
+ auto opCtx = makeOperationContext();
+ auto rawOpCtx = opCtx.get();
+ RecipientStateMachine::insertStateDocument(rawOpCtx, doc);
+ auto recipient = RecipientStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON());
+
+ notifyToStartCloning(rawOpCtx, *recipient, doc);
+ notifyReshardingCommitting(opCtx.get(), *recipient, doc);
+
+ doneTransitionGuard->wait(RecipientStateEnum::kDone);
+
+ stepDown();
+ doneTransitionGuard.reset();
+ ASSERT_EQ(recipient->getCompletionFuture().getNoThrow(),
+ ErrorCodes::InterruptedDueToReplStateChange);
+
+ DBDirectClient client(opCtx.get());
+ NamespaceString sourceNss = constructTemporaryReshardingNss("sourcedb", doc.getSourceUUID());
+
+ FindCommandRequest findRequest{NamespaceString::kRsOplogNamespace};
+ findRequest.setFilter(
+ BSON("ns" << sourceNss.toString() << "o2.shardCollection" << BSON("$exists" << true)));
+ auto cursor = client.find(std::move(findRequest));
+
+ ASSERT_TRUE(cursor->more()) << "Found no oplog entries for source collection";
+ repl::OplogEntry shardCollectionOp(cursor->next());
+
+ ASSERT_EQ(OpType_serializer(shardCollectionOp.getOpType()),
+ OpType_serializer(repl::OpTypeEnum::kNoop))
+ << shardCollectionOp.getEntry();
+ ASSERT_EQ(*shardCollectionOp.getUuid(), doc.getReshardingUUID())
+ << shardCollectionOp.getEntry();
+ ASSERT_EQ(shardCollectionOp.getObject()["msg"].type(), BSONType::Object)
+ << shardCollectionOp.getEntry();
+ ASSERT_FALSE(shardCollectionOp.getFromMigrate());
+
+ auto shardCollEventExpected =
+ BSON("shardCollection" << sourceNss.toString() << "shardKey" << newShardKeyPattern());
+ ASSERT_BSONOBJ_EQ(*shardCollectionOp.getObject2(), shardCollEventExpected);
+}
+
TEST_F(ReshardingRecipientServiceTest, TruncatesXLErrorOnRecipientDocument) {
auto metrics = ReshardingRecipientServiceTest::metrics();