diff options
author | Misha Tyulenev <misha.tyulenev@mongodb.com> | 2022-03-16 21:52:09 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-16 22:55:45 +0000 |
commit | cccb587590252e1b2bf2e199e783e52c65141825 (patch) | |
tree | 89a819d7e9f541b4ce45ac9cc36f0b6f53e68821 | |
parent | 75c7f1ed4f99d3fa3bfbf942cf086b68c9a47286 (diff) | |
download | mongo-cccb587590252e1b2bf2e199e783e52c65141825.tar.gz |
SERVER-62591 Add change streams event for migrate last chunk off shard
-rw-r--r-- | jstests/change_streams/migrate_last_chunk_from_shard_event.js | 156 | ||||
-rw-r--r-- | src/mongo/db/pipeline/change_stream_filter_helpers.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream.h | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_transform.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 56 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.h | 7 | ||||
-rw-r--r-- | src/mongo/db/s/migration_util.cpp | 73 | ||||
-rw-r--r-- | src/mongo/db/s/migration_util.h | 19 |
8 files changed, 280 insertions, 46 deletions
diff --git a/jstests/change_streams/migrate_last_chunk_from_shard_event.js b/jstests/change_streams/migrate_last_chunk_from_shard_event.js new file mode 100644 index 00000000000..8ae2c423eaa --- /dev/null +++ b/jstests/change_streams/migrate_last_chunk_from_shard_event.js @@ -0,0 +1,156 @@ +/** + * Test that change streams returns migrateLastChunkFromShard events. + * + * @tags: [ + * requires_fcv_60, + * requires_sharding, + * uses_change_streams, + * change_stream_does_not_expect_txns, + * assumes_unsharded_collection, + * assumes_read_preference_unchanged, + * featureFlagChangeStreamsVisibility + * ] + */ + +(function() { +"use strict"; + +load("jstests/libs/collection_drop_recreate.js"); // For assertDropCollection. +load('jstests/libs/change_stream_util.js'); // For 'ChangeStreamTest' and + // 'assertChangeStreamEventEq'. +const dbName = jsTestName(); +const collName = "test"; +const collNS = dbName + "." + collName; +const ns = { + db: dbName, + coll: collName +}; +const numDocs = 1; + +const st = new ShardingTest({ + shards: 2, + rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}} +}); + +const mongosConn = st.s; +const db = mongosConn.getDB(dbName); +const test = new ChangeStreamTest(db); + +function getCollectionUuid(coll) { + const collInfo = db.getCollectionInfos({name: coll})[0]; + return collInfo.info.uuid; +} + +function assertMigrateEventObserved(cursor, expectedEvent) { + let events = test.getNextChanges(cursor, 1); + let event = events[0]; + // Check the presence and the type of 'wallTime' field. We have no way to check the correctness + // of 'wallTime' value, so we delete it afterwards. + assert(event.wallTime instanceof Date); + delete event.wallTime; + expectedEvent.collectionUUID = getCollectionUuid(collName); + assertChangeStreamEventEq(event, expectedEvent); + return event._id; +} + +function prepareCollection() { + assertDropCollection(db, collName); + assert.commandWorked(db[collName].insert({_id: 1})); + assert.commandWorked(st.s.adminCommand({shardCollection: collNS, key: {_id: 1}})); +} + +// Test that if showSystemEvents is false, we do not see the migrateLastChunkFromShard event. +function validateShowSystemEventsFalse() { + prepareCollection(); + let pipeline = [ + {$changeStream: {showExpandedEvents: true, showSystemEvents: false}}, + {$match: {operationType: {$nin: ["create", "createIndexes"]}}} + ]; + let cursor = test.startWatchingChanges( + {pipeline, collection: collName, aggregateOptions: {cursor: {batchSize: 0}}}); + + // Migrate a chunk, then insert a new document. + assert.commandWorked( + db.adminCommand({moveChunk: collNS, find: {_id: 0}, to: st.shard1.shardName})); + assert.commandWorked(db[collName].insert({_id: numDocs + 1})); + + // Confirm that we don't observe the migrateLastChunkFromShard event in the stream, but only see + // the subsequent insert. + test.assertNextChangesEqual({ + cursor: cursor, + expectedChanges: { + operationType: "insert", + ns: ns, + fullDocument: {_id: numDocs + 1}, + documentKey: {_id: numDocs + 1}, + } + }); +} + +// Test that if showSystemEvents is true, we see the migrateLastChunkFromShard event and can resume +// after it. +function validateExpectedEventAndConfirmResumability(collParam, expectedOutput) { + prepareCollection(); + + let pipeline = [ + {$changeStream: {showExpandedEvents: true, showSystemEvents: true}}, + {$match: {operationType: {$nin: ["create", "createIndexes"]}}} + ]; + let cursor = test.startWatchingChanges( + {pipeline: pipeline, collection: collParam, aggregateOptions: {cursor: {batchSize: 0}}}); + + // Migrate a chunk from one shard to another. + assert.commandWorked( + db.adminCommand({moveChunk: collNS, find: {_id: 0}, to: st.shard1.shardName})); + + // Confirm that we observe the migrateLastChunkFromShard event, and obtain its resume token. + const migrateResumeToken = assertMigrateEventObserved(cursor, expectedOutput); + + // Insert a document before starting the next change stream so that we can validate the + // resuming behavior. + assert.commandWorked(db[collName].insert({_id: numDocs + 1})); + + // Resume after the migrate event and confirm we see the subsequent insert. + pipeline = [{ + $changeStream: + {showExpandedEvents: true, showSystemEvents: true, resumeAfter: migrateResumeToken} + }]; + cursor = test.startWatchingChanges({pipeline: pipeline, collection: collParam}); + + test.assertNextChangesEqual({ + cursor: cursor, + expectedChanges: { + operationType: "insert", + ns: ns, + fullDocument: {_id: numDocs + 1}, + documentKey: {_id: numDocs + 1}, + } + }); +} + +assert.commandWorked(db.adminCommand({enableSharding: dbName})); +assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: st.shard0.shardName})); + +// Test the behaviour of migrateLastChunkFromShard for a single-collection stream +validateExpectedEventAndConfirmResumability(collName, { + operationType: "migrateLastChunkFromShard", + ns: ns, + operationDescription: { + "shardId": st.shard0.shardName, + } +}); + +// Test the behaviour of migrateLastChunkFromShard for a whole-DB stream. +validateExpectedEventAndConfirmResumability(1, { + operationType: "migrateLastChunkFromShard", + ns: ns, + operationDescription: { + "shardId": st.shard0.shardName, + } +}); + +// Test the behaviour of migrateLastChunkFromShard when showSystemEvents is false. +validateShowSystemEventsFalse(); + +st.stop(); +}()); diff --git a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp index e0016fa00ad..377c2bf2eb2 100644 --- a/src/mongo/db/pipeline/change_stream_filter_helpers.cpp +++ b/src/mongo/db/pipeline/change_stream_filter_helpers.cpp @@ -256,6 +256,11 @@ std::unique_ptr<MatchExpression> buildInternalOpFilter( auto nsRegex = DocumentSourceChangeStream::getNsRegexForChangeStream(expCtx); internalOpTypeOrBuilder.append(BSON("o2.shardCollection" << BSONRegEx(nsRegex))); + // Only return the 'migrateLastChunkFromShard' event if 'showSystemEvents' is set. + if (expCtx->changeStreamSpec->getShowSystemEvents()) { + internalOpTypeOrBuilder.append(BSON("o2.migrateLastChunkFromShard" << BSONRegEx(nsRegex))); + } + // Finalize the array of $or filter predicates. internalOpTypeOrBuilder.done(); diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index ae281596679..05e6d623dcf 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -234,6 +234,7 @@ public: static constexpr StringData kCreateIndexesOpType = "createIndexes"_sd; static constexpr StringData kDropIndexesOpType = "dropIndexes"_sd; static constexpr StringData kShardCollectionOpType = "shardCollection"_sd; + static constexpr StringData kMigrateLastChunkFromShardOpType = "migrateLastChunkFromShard"_sd; // Default regex for collections match which prohibits system collections. static constexpr StringData kRegexAllCollections = R"((?!(\$|system\.)))"_sd; diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 17f35589a54..68893c56738 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -377,6 +377,15 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document break; } + // Check if this is a migration of the last chunk off a shard. + if (!input.getNestedField("o2.migrateLastChunkFromShard").missing()) { + const auto o2Field = input[repl::OplogEntry::kObject2FieldName].getDocument(); + operationType = DocumentSourceChangeStream::kMigrateLastChunkFromShardOpType; + operationDescription = + Value(copyDocExceptFields(o2Field, {"migrateLastChunkFromShard"_sd})); + break; + } + // Otherwise, o2.type determines the message type. auto o2Type = input.getNestedField("o2.type"); tassert(5052200, "o2.type is missing from noop oplog event", !o2Type.missing()); diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 84598e4e1a2..180f75b78ea 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -390,13 +390,20 @@ void MigrationSourceManager::enterCriticalSection() { _stats.totalDonorChunkCloneTimeMillis.addAndFetch(_cloneAndCommitTimer.millis()); _cloneAndCommitTimer.reset(); - _notifyChangeStreamsOnRecipientFirstChunk(_getCurrentMetadataAndCheckEpoch()); + const auto& metadata = _getCurrentMetadataAndCheckEpoch(); + + // Check that there are no chunks on the recepient shard. Write an oplog event for change + // streams if this is the first migration to the recipient. + if (!metadata.getChunkManager()->getVersion(_args.getToShardId()).isSet()) { + migrationutil::notifyChangeStreamsOnRecipientFirstChunk( + _opCtx, _args.getNss(), _args.getFromShardId(), _args.getToShardId(), _collectionUUID); + } // Mark the shard as running critical operation, which requires recovery on crash. // // NOTE: The 'migrateChunkToNewShard' oplog message written by the above call to - // '_notifyChangeStreamsOnRecipientFirstChunk' depends on this majority write to carry its local - // write to majority committed. + // 'notifyChangeStreamsOnRecipientFirstChunk' depends on this majority write to carry its + // local write to majority committed. uassertStatusOK(ShardingStateRecovery::startMetadataOp(_opCtx)); LOGV2_DEBUG_OPTIONS(4817402, @@ -571,6 +578,13 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() { // Migration succeeded const auto refreshedMetadata = _getCurrentMetadataAndCheckEpoch(); + // Check if there are no chunks left on donor shard. Write an oplog event for change streams if + // the last chunk migrated off the donor. + if (!refreshedMetadata.getChunkManager()->getVersion(_args.getFromShardId()).isSet()) { + migrationutil::notifyChangeStreamsOnDonorLastChunk( + _opCtx, _args.getNss(), _args.getFromShardId(), _collectionUUID); + } + LOGV2(22018, "Migration succeeded and updated collection version to {updatedCollectionVersion}", @@ -691,42 +705,6 @@ CollectionMetadata MigrationSourceManager::_getCurrentMetadataAndCheckEpoch() { return metadata; } -void MigrationSourceManager::_notifyChangeStreamsOnRecipientFirstChunk( - const CollectionMetadata& metadata) { - // If this is not the first donation, there is nothing to be done - if (metadata.getChunkManager()->getVersion(_args.getToShardId()).isSet()) - return; - - const std::string dbgMessage = str::stream() - << "Migrating chunk from shard " << _args.getFromShardId() << " to shard " - << _args.getToShardId() << " with no chunks for this collection"; - - // The message expected by change streams - const auto o2Message = BSON("type" - << "migrateChunkToNewShard" - << "from" << _args.getFromShardId() << "to" - << _args.getToShardId()); - - auto const serviceContext = _opCtx->getClient()->getServiceContext(); - - UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); - AutoGetOplog oplogWrite(_opCtx, OplogAccessMode::kWrite); - writeConflictRetry( - _opCtx, "migrateChunkToNewShard", NamespaceString::kRsOplogNamespace.ns(), [&] { - WriteUnitOfWork uow(_opCtx); - serviceContext->getOpObserver()->onInternalOpMessage(_opCtx, - _args.getNss(), - *_collectionUUID, - BSON("msg" << dbgMessage), - o2Message, - boost::none, - boost::none, - boost::none, - boost::none); - uow.commit(); - }); -} - void MigrationSourceManager::_cleanup(bool completeMigration) noexcept { invariant(_state != kDone); diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index d51d6265867..9b1b64dd87f 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -187,13 +187,6 @@ private: CollectionMetadata _getCurrentMetadataAndCheckEpoch(); /** - * If this donation moves the first chunk to the recipient (i.e., the recipient didn't have any - * chunks), this function writes a no-op message to the oplog, so that change stream will notice - * that and close the cursor in order to notify mongos to target the new shard as well. - */ - void _notifyChangeStreamsOnRecipientFirstChunk(const CollectionMetadata& metadata); - - /** * Called when any of the states fails. May only be called once and will put the migration * manager into the kDone state. */ diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index 364f5813f0a..5bde34b3591 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -42,9 +42,11 @@ #include "mongo/db/catalog/collection_catalog_helper.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/commands.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/logical_session_cache.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/op_observer.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator.h" @@ -706,6 +708,73 @@ int retrieveNumOrphansFromRecipient(OperationContext* opCtx, return rangeDeletionResponse.docs[0].getIntField("numOrphanDocs"); } +void notifyChangeStreamsOnRecipientFirstChunk(OperationContext* opCtx, + const NamespaceString& collNss, + const ShardId& fromShardId, + const ShardId& toShardId, + boost::optional<UUID> collUUID) { + + const std::string dbgMessage = str::stream() + << "Migrating chunk from shard " << fromShardId << " to shard " << toShardId + << " with no chunks for this collection"; + + // The message expected by change streams + const auto o2Message = BSON("type" + << "migrateChunkToNewShard" + << "from" << fromShardId << "to" << toShardId); + + auto const serviceContext = opCtx->getClient()->getServiceContext(); + + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); + writeConflictRetry( + opCtx, "migrateChunkToNewShard", NamespaceString::kRsOplogNamespace.ns(), [&] { + WriteUnitOfWork uow(opCtx); + serviceContext->getOpObserver()->onInternalOpMessage(opCtx, + collNss, + *collUUID, + BSON("msg" << dbgMessage), + o2Message, + boost::none, + boost::none, + boost::none, + boost::none); + uow.commit(); + }); +} + +void notifyChangeStreamsOnDonorLastChunk(OperationContext* opCtx, + const NamespaceString& collNss, + const ShardId& donorShardId, + boost::optional<UUID> collUUID) { + + const std::string oMessage = str::stream() + << "Migrate the last chunk for " << collNss << " off shard " << donorShardId; + + // The message expected by change streams + const auto o2Message = + BSON("migrateLastChunkFromShard" << collNss.toString() << "shardId" << donorShardId); + + auto const serviceContext = opCtx->getClient()->getServiceContext(); + + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); + writeConflictRetry( + opCtx, "migrateLastChunkFromShard", NamespaceString::kRsOplogNamespace.ns(), [&] { + WriteUnitOfWork uow(opCtx); + serviceContext->getOpObserver()->onInternalOpMessage(opCtx, + collNss, + *collUUID, + BSON("msg" << oMessage), + o2Message, + boost::none, + boost::none, + boost::none, + boost::none); + uow.commit(); + }); +} + void persistCommitDecision(OperationContext* opCtx, const MigrationCoordinatorDocument& migrationDoc) { invariant(migrationDoc.getDecision() && @@ -1068,6 +1137,10 @@ void recoverMigrationCoordinations(OperationContext* opCtx, coordinator.setMigrationDecision(DecisionEnum::kAborted); } else { coordinator.setMigrationDecision(DecisionEnum::kCommitted); + if (!currentMetadata.getChunkManager()->getVersion(doc.getDonorShardId()).isSet()) { + notifyChangeStreamsOnDonorLastChunk( + opCtx, doc.getNss(), doc.getDonorShardId(), doc.getCollectionUuid()); + } } coordinator.completeMigration(opCtx, acquireCSOnRecipient); diff --git a/src/mongo/db/s/migration_util.h b/src/mongo/db/s/migration_util.h index 73012403600..b5af6673729 100644 --- a/src/mongo/db/s/migration_util.h +++ b/src/mongo/db/s/migration_util.h @@ -282,5 +282,24 @@ void drainMigrationsPendingRecovery(OperationContext* opCtx); void asyncRecoverMigrationUntilSuccessOrStepDown(OperationContext* opCtx, const NamespaceString& nss) noexcept; +/** + * This function writes a no-op message to the oplog when migrating a first chunk to the recipient + * (i.e., the recipient didn't have any * chunks), so that change stream will notice that and close + * the cursor in order to notify mongos to target the new shard as well. + */ +void notifyChangeStreamsOnRecipientFirstChunk(OperationContext* opCtx, + const NamespaceString& collNss, + const ShardId& fromShardId, + const ShardId& toShardId, + boost::optional<UUID> collUUID); + +/** + * This function writes a no-op message to the oplog when during migration the last chunk of the + * collection collNss is migrated off the off the donor and hence the donor has no more chunks. + */ +void notifyChangeStreamsOnDonorLastChunk(OperationContext* opCtx, + const NamespaceString& collNss, + const ShardId& donorShardId, + boost::optional<UUID> collUUID); } // namespace migrationutil } // namespace mongo |