diff options
-rw-r--r-- | jstests/change_streams/show_resharding_system_events.js | 142 | ||||
-rw-r--r-- | jstests/libs/change_stream_util.js | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 14 | ||||
-rw-r--r-- | src/mongo/db/repl/change_stream_oplog_notification.cpp | 73 | ||||
-rw-r--r-- | src/mongo/db/repl/change_stream_oplog_notification.h | 47 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/s/create_collection_coordinator.cpp | 39 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_recipient_service.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_recipient_service_test.cpp | 54 |
10 files changed, 348 insertions, 42 deletions
diff --git a/jstests/change_streams/show_resharding_system_events.js b/jstests/change_streams/show_resharding_system_events.js new file mode 100644 index 00000000000..8463102419a --- /dev/null +++ b/jstests/change_streams/show_resharding_system_events.js @@ -0,0 +1,142 @@ +/** + * Tests the behavior of change streams on a system.resharding.* namespace in the presence of + * 'showSystemEvents' flag. This is a separate test from 'show_system_events.js' because it can only + * operate in a sharded cluster. + * + * @tags: [ + * requires_fcv_61, + * featureFlagChangeStreamsVisibility, + * featureFlagChangeStreamsFurtherEnrichedEvents, + * requires_sharding, + * uses_change_streams, + * change_stream_does_not_expect_txns, + * assumes_unsharded_collection, + * assumes_read_preference_unchanged, + * ] + */ +(function() { +"use strict"; + +load('jstests/libs/change_stream_util.js'); // For 'assertChangeStreamEventEq'. + +// Create a single-shard cluster for this test. +const st = new ShardingTest({ + shards: 1, + rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}} +}); + +const testDB = st.s.getDB(jsTestName()); +const testColl = testDB[jsTestName()]; + +// Shard the collection based on '_id'. +st.shardColl(testColl, {_id: 1}, false); + +// Build an index on the collection to support the resharding operation. +assert.commandWorked(testColl.createIndex({a: 1})); + +// Insert some documents that will be resharded. +assert.commandWorked(testColl.insert({_id: 0, a: 0})); +assert.commandWorked(testColl.insert({_id: 1, a: 1})); + +// Helper function to retrieve the UUID of the specified collection. +function getCollectionUuid(coll) { + const collInfo = testDB.getCollectionInfos({name: coll.getName()})[0]; + return collInfo.info.uuid; +} + +// Obtain a resume token indicating the start point for the test. +const startPoint = testDB.watch().getResumeToken(); + +// Get the UUID of the collection before resharding. +const oldUUID = getCollectionUuid(testColl); + +// Reshard the collection. +assert.commandWorked(st.s.adminCommand({ + reshardCollection: testColl.getFullName(), + key: {a: 1}, +})); + +// Get the UUID of the collection after resharding. +const newUUID = getCollectionUuid(testColl); + +// Write one more sentinel document into the collection. +assert.commandWorked(testColl.insert({_id: 2, a: 2})); + +// Now confirm the sequence of events that we expect to see in the change stream. +const reshardingCollName = `system.resharding.${oldUUID.toString().match(/\"([^\"]+)\"/)[1]}`; +const reshardingNs = { + db: testDB.getName(), + coll: reshardingCollName +}; +const origNs = { + db: testDB.getName(), + coll: testColl.getName() +}; +const expectedReshardingEvents = [ + {ns: reshardingNs, collectionUUID: newUUID, operationType: "create"}, + { + ns: reshardingNs, + collectionUUID: newUUID, + operationType: "createIndexes", + operationDescription: {indexes: [{v: 2, key: {a: 1}, name: "a_1"}]} + }, + { + ns: reshardingNs, + collectionUUID: newUUID, + operationType: "shardCollection", + operationDescription: {shardKey: {a: 1}} + }, + { + ns: reshardingNs, + collectionUUID: newUUID, + operationType: "insert", + fullDocument: {_id: 0, a: 0}, + documentKey: {a: 0, _id: 0} + }, + { + ns: reshardingNs, + collectionUUID: newUUID, + operationType: "insert", + fullDocument: {_id: 1, a: 1}, + documentKey: {a: 1, _id: 1} + }, + { + ns: origNs, + collectionUUID: oldUUID, + operationType: "reshardCollection", + operationDescription: + {reshardUUID: newUUID, shardKey: {a: 1}, oldShardKey: {_id: 1}, unique: false} + }, + { + ns: origNs, + collectionUUID: newUUID, + operationType: "insert", + fullDocument: {_id: 2, a: 2}, + documentKey: {a: 2, _id: 2} + }, +]; + +// Helper to confirm the sequence of events observed in the change stream. +function assertChangeStreamEventSequence(csConfig, expectedEvents) { + // Open a change stream on the test DB using the given configuration. + const finalConfig = + Object.assign({resumeAfter: startPoint, showExpandedEvents: true}, csConfig); + const csCursor = testDB.watch([], finalConfig); + + // Confirm that we see the expected sequence of events. + expectedEvents.forEach((expectedEvent) => { + assert.soon(() => csCursor.hasNext()); + assertChangeStreamEventEq(csCursor.next(), expectedEvent); + }); +} + +// With showSystemEvents set to true, we expect to see the full sequence of events. +assertChangeStreamEventSequence({showSystemEvents: true}, expectedReshardingEvents); + +// With showSystemEvents set to false, we expect to only see events on the original namespace. +const nonSystemEvents = + expectedReshardingEvents.filter((event) => (event.ns.coll === testColl.getName())); +assertChangeStreamEventSequence({showSystemEvents: false}, nonSystemEvents); + +st.stop(); +}());
\ No newline at end of file diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js index 15028c54af2..e5a00bf92c9 100644 --- a/jstests/libs/change_stream_util.js +++ b/jstests/libs/change_stream_util.js @@ -161,7 +161,7 @@ function isChangeStreamEventEq(actualEvent, expectedEvent) { function assertChangeStreamEventEq(actualEvent, expectedEvent) { assert(isChangeStreamEventEq(actualEvent, expectedEvent), () => "Change events did not match. Expected: " + tojsonMaybeTruncate(expectedEvent) + - ", Actual: " + tojsonMaybeTruncate(testEvent)); + ", Actual: " + tojsonMaybeTruncate(actualEvent)); } function ChangeStreamTest(_db, name = "ChangeStreamTest") { diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 1ecdde1d5b6..cb720da2515 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -246,9 +246,10 @@ public: // Default regex for collections match which prohibits system collections. static constexpr StringData kRegexAllCollections = R"((?!(\$|system\.)))"_sd; - // Regex matching all regular collections plus certain system collections. + + // Regex matching all user collections plus collections exposed when 'showSystemEvents' is set. static constexpr StringData kRegexAllCollectionsShowSystemEvents = - R"((?!(\$|system\.(?!(js$)))))"_sd; + R"((?!(\$|system\.(?!(js$|resharding\.)))))"_sd; static constexpr StringData kRegexAllDBs = R"(^(?!(admin|config|local)\.)[^.]+)"_sd; static constexpr StringData kRegexCmdColl = R"(\$cmd$)"_sd; diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 7f496a55383..3379a0d34d1 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -40,6 +40,20 @@ env.Library( ) env.Library( + target='change_stream_oplog_notification', + source=[ + 'change_stream_oplog_notification.cpp', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/concurrency/exception_util', + '$BUILD_DIR/mongo/db/concurrency/lock_manager', + '$BUILD_DIR/mongo/db/dbhelpers', + 'oplog', + 'oplog_entry', + ], +) + +env.Library( target='oplog', source=[ 'apply_ops.cpp', diff --git a/src/mongo/db/repl/change_stream_oplog_notification.cpp b/src/mongo/db/repl/change_stream_oplog_notification.cpp new file mode 100644 index 00000000000..531f7af2e57 --- /dev/null +++ b/src/mongo/db/repl/change_stream_oplog_notification.cpp @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/repl/change_stream_oplog_notification.h" + +#include "mongo/db/catalog_raii.h" +#include "mongo/db/concurrency/exception_util.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/oplog_entry.h" +#include "mongo/logv2/redaction.h" + +namespace mongo { + +void notifyChangeStreamsOnShardCollection(OperationContext* opCtx, + const NamespaceString& nss, + const UUID& uuid, + BSONObj cmd) { + BSONObjBuilder cmdBuilder; + cmdBuilder.append("shardCollection", nss.ns()); + cmdBuilder.appendElements(cmd); + + BSONObj fullCmd = cmdBuilder.obj(); + + repl::MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kNoop); + oplogEntry.setNss(nss); + oplogEntry.setUuid(uuid); + oplogEntry.setObject(BSON("msg" << BSON("shardCollection" << nss.ns()))); + oplogEntry.setObject2(fullCmd); + oplogEntry.setOpTime(repl::OpTime()); + oplogEntry.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now()); + + writeConflictRetry( + opCtx, "ShardCollectionWritesOplog", NamespaceString::kRsOplogNamespace.ns(), [&] { + AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); + WriteUnitOfWork wunit(opCtx); + const auto& oplogOpTime = repl::logOp(opCtx, &oplogEntry); + uassert(8423339, + str::stream() << "Failed to create new oplog entry for oplog with opTime: " + << oplogEntry.getOpTime().toString() << ": " + << redact(oplogEntry.toBSON()), + !oplogOpTime.isNull()); + wunit.commit(); + }); +} +} // namespace mongo diff --git a/src/mongo/db/repl/change_stream_oplog_notification.h b/src/mongo/db/repl/change_stream_oplog_notification.h new file mode 100644 index 00000000000..ee746b96167 --- /dev/null +++ b/src/mongo/db/repl/change_stream_oplog_notification.h @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/bson/bsonobj.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/util/uuid.h" + +namespace mongo { + +/* + * This function writes a no-op oplog entry on shardCollection event. + * TODO SERVER-66333: move all other notifyChangeStreams* functions here. + */ +void notifyChangeStreamsOnShardCollection(OperationContext* opCtx, + const NamespaceString& nss, + const UUID& uuid, + BSONObj cmd); +} // namespace mongo diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 07b08105b8b..e0cf8313b31 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -146,6 +146,7 @@ env.Library( '$BUILD_DIR/mongo/db/ops/write_ops_exec', '$BUILD_DIR/mongo/db/pipeline/aggregation_request_helper', '$BUILD_DIR/mongo/db/repl/abstract_async_component', + '$BUILD_DIR/mongo/db/repl/change_stream_oplog_notification', '$BUILD_DIR/mongo/db/repl/oplog', '$BUILD_DIR/mongo/db/repl/primary_only_service', '$BUILD_DIR/mongo/db/repl/wait_for_majority_service', @@ -458,6 +459,7 @@ env.Library( '$BUILD_DIR/mongo/db/index_builds_coordinator_interface', '$BUILD_DIR/mongo/db/internal_transactions_feature_flag', '$BUILD_DIR/mongo/db/multitenancy', + '$BUILD_DIR/mongo/db/repl/change_stream_oplog_notification', '$BUILD_DIR/mongo/db/repl/primary_only_service', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', '$BUILD_DIR/mongo/db/repl/replica_set_messages', diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp index cea380bd7c7..9d4806761ee 100644 --- a/src/mongo/db/s/create_collection_coordinator.cpp +++ b/src/mongo/db/s/create_collection_coordinator.cpp @@ -41,6 +41,7 @@ #include "mongo/db/concurrency/exception_util.h" #include "mongo/db/persistent_task_store.h" #include "mongo/db/query/collation/collator_factory_interface.h" +#include "mongo/db/repl/change_stream_oplog_notification.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/create_collection_coordinator.h" #include "mongo/db/s/recoverable_critical_section_service.h" @@ -356,42 +357,6 @@ void broadcastDropCollection(OperationContext* opCtx, opCtx, nss, participants, executor, osi); } -/** - * This function writes a no-op oplog entry on shardCollection event. - */ -void _writeOplogMessage(OperationContext* opCtx, - const NamespaceString& nss, - const UUID& uuid, - const BSONObj cmd) { - BSONObjBuilder cmdBuilder; - cmdBuilder.append("shardCollection", nss.ns()); - cmdBuilder.appendElements(cmd); - - BSONObj fullCmd = cmdBuilder.obj(); - - repl::MutableOplogEntry oplogEntry; - oplogEntry.setOpType(repl::OpTypeEnum::kNoop); - oplogEntry.setNss(nss); - oplogEntry.setUuid(uuid); - oplogEntry.setObject(BSON("msg" << BSON("shardCollection" << nss.ns()))); - oplogEntry.setObject2(fullCmd); - oplogEntry.setOpTime(repl::OpTime()); - oplogEntry.setWallClockTime(opCtx->getServiceContext()->getFastClockSource()->now()); - - writeConflictRetry( - opCtx, "ShardCollectionWritesOplog", NamespaceString::kRsOplogNamespace.ns(), [&] { - AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); - WriteUnitOfWork wunit(opCtx); - const auto& oplogOpTime = repl::logOp(opCtx, &oplogEntry); - uassert(8423339, - str::stream() << "Failed to create new oplog entry for oplog with opTime: " - << oplogEntry.getOpTime().toString() << ": " - << redact(oplogEntry.toBSON()), - !oplogOpTime.isNull()); - wunit.commit(); - }); -} - } // namespace CreateCollectionCoordinator::CreateCollectionCoordinator(ShardingDDLCoordinatorService* service, @@ -904,7 +869,7 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) { try { insertCollectionEntry(opCtx, nss(), coll, getCurrentSession(_doc)); - _writeOplogMessage(opCtx, nss(), *_collectionUUID, _request.toBSON()); + notifyChangeStreamsOnShardCollection(opCtx, nss(), *_collectionUUID, _request.toBSON()); LOGV2_DEBUG(5277907, 2, "Collection successfully committed", "namespace"_attr = nss()); diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index 4713e9d63cd..fc225f6f801 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -41,6 +41,7 @@ #include "mongo/db/ops/delete.h" #include "mongo/db/persistent_task_store.h" #include "mongo/db/query/collation/collation_spec.h" +#include "mongo/db/repl/change_stream_oplog_notification.h" #include "mongo/db/repl/oplog_applier.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/wait_for_majority_service.h" @@ -647,6 +648,17 @@ void ReshardingRecipientService::RecipientStateMachine:: true /* enforceUniquenessCheck */, shardkeyutil::ValidationBehaviorsShardCollection(opCtx.get())); }); + + // We add a fake 'shardCollection' notification here so that the C2C replicator can sync the + // resharding operation to the target cluster. The only information we have is the shard + // key, but all other fields must either be default-valued or are ignored by C2C. + // TODO SERVER-66671: The 'createCollRequest' should include the full contents of the + // CreateCollectionRequest rather than just the 'shardKey' field. + const auto createCollRequest = BSON("shardKey" << _metadata.getReshardingKey().toBSON()); + notifyChangeStreamsOnShardCollection(opCtx.get(), + _metadata.getTempReshardingNss(), + _metadata.getReshardingUUID(), + createCollRequest); } _transitionToCloning(factory); diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp index b8516da1e18..50d635016f7 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp @@ -240,6 +240,10 @@ public: return ReshardingMetrics::get(serviceContext); } + BSONObj newShardKeyPattern() { + return BSON("newKey" << 1); + } + ReshardingRecipientDocument makeStateDocument(bool isAlsoDonor) { RecipientShardContext recipientCtx; recipientCtx.setState(RecipientStateEnum::kAwaitingFetchTimestamp); @@ -257,7 +261,7 @@ public: sourceNss, sourceUUID, constructTemporaryReshardingNss(sourceNss.db(), sourceUUID), - BSON("newKey" << 1)); + newShardKeyPattern()); doc.setCommonReshardingMetadata(std::move(commonMetadata)); return doc; @@ -635,7 +639,8 @@ TEST_F(ReshardingRecipientServiceTest, WritesNoopOplogEntryOnReshardDoneCatchUp) NamespaceString sourceNss = constructTemporaryReshardingNss("sourcedb", doc.getSourceUUID()); FindCommandRequest findRequest{NamespaceString::kRsOplogNamespace}; - findRequest.setFilter(BSON("ns" << sourceNss.toString())); + findRequest.setFilter( + BSON("ns" << sourceNss.toString() << "o2.reshardDoneCatchUp" << BSON("$exists" << true))); auto cursor = client.find(std::move(findRequest)); ASSERT_TRUE(cursor->more()) << "Found no oplog entries for source collection"; @@ -656,6 +661,51 @@ TEST_F(ReshardingRecipientServiceTest, WritesNoopOplogEntryOnReshardDoneCatchUp) ASSERT_FALSE(bool(op.getDestinedRecipient())) << op.getEntry(); } +TEST_F(ReshardingRecipientServiceTest, WritesNoopOplogEntryForImplicitShardCollection) { + boost::optional<PauseDuringStateTransitions> doneTransitionGuard; + doneTransitionGuard.emplace(controller(), RecipientStateEnum::kDone); + + auto doc = makeStateDocument(false /* isAlsoDonor */); + auto opCtx = makeOperationContext(); + auto rawOpCtx = opCtx.get(); + RecipientStateMachine::insertStateDocument(rawOpCtx, doc); + auto recipient = RecipientStateMachine::getOrCreate(rawOpCtx, _service, doc.toBSON()); + + notifyToStartCloning(rawOpCtx, *recipient, doc); + notifyReshardingCommitting(opCtx.get(), *recipient, doc); + + doneTransitionGuard->wait(RecipientStateEnum::kDone); + + stepDown(); + doneTransitionGuard.reset(); + ASSERT_EQ(recipient->getCompletionFuture().getNoThrow(), + ErrorCodes::InterruptedDueToReplStateChange); + + DBDirectClient client(opCtx.get()); + NamespaceString sourceNss = constructTemporaryReshardingNss("sourcedb", doc.getSourceUUID()); + + FindCommandRequest findRequest{NamespaceString::kRsOplogNamespace}; + findRequest.setFilter( + BSON("ns" << sourceNss.toString() << "o2.shardCollection" << BSON("$exists" << true))); + auto cursor = client.find(std::move(findRequest)); + + ASSERT_TRUE(cursor->more()) << "Found no oplog entries for source collection"; + repl::OplogEntry shardCollectionOp(cursor->next()); + + ASSERT_EQ(OpType_serializer(shardCollectionOp.getOpType()), + OpType_serializer(repl::OpTypeEnum::kNoop)) + << shardCollectionOp.getEntry(); + ASSERT_EQ(*shardCollectionOp.getUuid(), doc.getReshardingUUID()) + << shardCollectionOp.getEntry(); + ASSERT_EQ(shardCollectionOp.getObject()["msg"].type(), BSONType::Object) + << shardCollectionOp.getEntry(); + ASSERT_FALSE(shardCollectionOp.getFromMigrate()); + + auto shardCollEventExpected = + BSON("shardCollection" << sourceNss.toString() << "shardKey" << newShardKeyPattern()); + ASSERT_BSONOBJ_EQ(*shardCollectionOp.getObject2(), shardCollEventExpected); +} + TEST_F(ReshardingRecipientServiceTest, TruncatesXLErrorOnRecipientDocument) { auto metrics = ReshardingRecipientServiceTest::metrics(); |