diff options
54 files changed, 1042 insertions, 660 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml index fd05367ac35..ec41092246c 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml @@ -7,10 +7,6 @@ selector: # This test exercises an internal detail of mongos<->mongod communication and is not expected to # work against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js - # TODO: SERVER-32088 should fix resuming a change stream when not all shards have data. - - jstests/change_streams/change_stream_shell_helper.js - - jstests/change_streams/include_cluster_time.js - - jstests/change_streams/lookup_post_image.js exclude_with_any_tags: ## # The next three tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml index 17992685444..5664164c149 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml @@ -6,9 +6,11 @@ selector: exclude_files: # Exercises an internal detail of mongos<->mongod communication. Not expected to work on mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js - # Renames a collection, which is not allowed for sharded collections. - - jstests/change_streams/change_stream_rename_target.js - - jstests/change_streams/change_stream_rename_resumability.js + # TODO SERVER-35280: Update change stream tests to handle drops of sharded collections and databases. + - jstests/change_streams/whole_cluster.js + - jstests/change_streams/whole_cluster_metadata_notifications.js + - jstests/change_streams/whole_db.js + - jstests/change_streams/whole_db_metadata_notifications.js exclude_with_any_tags: ## # The next three tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml index 086f96021c3..740ad95685a 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml @@ -8,9 +8,12 @@ selector: # to work against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js # TODO: SERVER-32088 should fix resuming a change stream when not all shards have data. - - jstests/change_streams/change_stream_shell_helper.js + - jstests/change_streams/shell_helper.js - jstests/change_streams/lookup_post_image.js - jstests/change_streams/include_cluster_time.js + # Not relevant for whole-cluster change streams. + - jstests/change_streams/metadata_notifications.js + - jstests/change_streams/whole_db_metadata_notifications.js exclude_with_any_tags: ## # The next three tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_passthrough.yml index 0ed49f26ae2..b5d3e8688af 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_passthrough.yml @@ -3,6 +3,10 @@ test_kind: js_test selector: roots: - jstests/change_streams/**/*.js + exclude_files: + # Not relevant for whole-cluster change streams. + - jstests/change_streams/metadata_notifications.js + - jstests/change_streams/whole_db_metadata_notifications.js exclude_with_any_tags: ## # The next three tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml index 651fad01bed..78fc224c071 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml @@ -9,6 +9,9 @@ selector: - jstests/change_streams/only_wake_getmore_for_relevant_changes.js # This test is not expected to work when run against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js + # Not relevant for whole-cluster change streams. + - jstests/change_streams/metadata_notifications.js + - jstests/change_streams/whole_db_metadata_notifications.js exclude_with_any_tags: ## # The next three tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml index c6c10ecb937..a60da2f3d0e 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml @@ -6,9 +6,13 @@ selector: exclude_files: # Exercises an internal detail of mongos<->mongod communication. Not expected to work on mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js - # These tests rename a collection, which is not allowed for sharded collections. - - jstests/change_streams/change_stream_rename_target.js - - jstests/change_streams/change_stream_rename_resumability.js + # Not relevant for whole-cluster change streams. + - jstests/change_streams/metadata_notifications.js + - jstests/change_streams/whole_db_metadata_notifications.js + # TODO SERVER-35280: Update change stream tests to handle drops of sharded collections and databases. + - jstests/change_streams/whole_cluster.js + - jstests/change_streams/whole_cluster_metadata_notifications.js + - jstests/change_streams/whole_db.js exclude_with_any_tags: ## # The next three tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml index d2452f5c739..0c08e630f4c 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml @@ -8,9 +8,11 @@ selector: # to work against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js # TODO: SERVER-32088 should fix resuming a change stream when not all shards have data. - - jstests/change_streams/change_stream_shell_helper.js + - jstests/change_streams/shell_helper.js - jstests/change_streams/lookup_post_image.js - jstests/change_streams/include_cluster_time.js + # Only relevant for single-collection change streams. + - jstests/change_streams/metadata_notifications.js exclude_with_any_tags: ## # The next three tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_passthrough.yml index 42233a8d81b..639179c90b9 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_passthrough.yml @@ -3,6 +3,9 @@ test_kind: js_test selector: roots: - jstests/change_streams/**/*.js + exclude_files: + # Only relevant for single-collection change streams. + - jstests/change_streams/metadata_notifications.js exclude_with_any_tags: ## # The next three tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml index 96332cfe463..838dd2ffce7 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml @@ -9,6 +9,9 @@ selector: - jstests/change_streams/only_wake_getmore_for_relevant_changes.js # This test is not expected to work when run against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js + # Only relevant for single-collection change streams. + - jstests/change_streams/metadata_notifications.js + exclude_with_any_tags: ## # The next three tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml index d5b1d538c71..06a30be4ee6 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml @@ -6,9 +6,13 @@ selector: exclude_files: # Exercises an internal detail of mongos<->mongod communication. Not expected to work on mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js - # These tests rename a collection, which is not allowed for sharded collections. - - jstests/change_streams/change_stream_rename_target.js - - jstests/change_streams/change_stream_rename_resumability.js + # Only relevant for single-collection change streams. + - jstests/change_streams/metadata_notifications.js + # TODO SERVER-35280: Update change stream tests to handle drops of sharded collections and databases. + - jstests/change_streams/whole_cluster.js + - jstests/change_streams/whole_cluster_metadata_notifications.js + - jstests/change_streams/whole_db.js + - jstests/change_streams/whole_db_metadata_notifications.js exclude_with_any_tags: ## # The next three tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/sharding_11.yml b/buildscripts/resmokeconfig/suites/sharding_11.yml index f13cca6d959..e49a191d72c 100644 --- a/buildscripts/resmokeconfig/suites/sharding_11.yml +++ b/buildscripts/resmokeconfig/suites/sharding_11.yml @@ -36,7 +36,7 @@ selector: - jstests/sharding/cleanup_orphaned_cmd_during_movechunk_hashed.js - jstests/sharding/sharded_limit_batchsize.js - jstests/sharding/migration_sets_fromMigrate_flag.js - - jstests/sharding/change_stream_invalidation.js + - jstests/sharding/change_stream_metadata_notifications.js - jstests/sharding/bulk_insert.js - jstests/sharding/multi_mongos2a.js - jstests/sharding/movechunk_commit_changelog_stats.js diff --git a/buildscripts/resmokeconfig/suites/sharding_auth_15.yml b/buildscripts/resmokeconfig/suites/sharding_auth_15.yml index 257edd3ebdd..3053ef9e52a 100644 --- a/buildscripts/resmokeconfig/suites/sharding_auth_15.yml +++ b/buildscripts/resmokeconfig/suites/sharding_auth_15.yml @@ -24,7 +24,7 @@ selector: - jstests/sharding/change_streams_establishment_finds_new_shards.js - jstests/sharding/inserts_consistent.js - jstests/sharding/not_allowed_on_sharded_collection_cmd.js - - jstests/sharding/change_stream_invalidation.js + - jstests/sharding/change_stream_metadata_notifications.js - jstests/sharding/mapReduce_outSharded.js - jstests/sharding/autosplit.js - jstests/sharding/accurate_count_with_predicate.js diff --git a/buildscripts/resmokeconfig/suites/sharding_auth_audit_14.yml b/buildscripts/resmokeconfig/suites/sharding_auth_audit_14.yml index 0bfc47532e5..fc092eb55f2 100644 --- a/buildscripts/resmokeconfig/suites/sharding_auth_audit_14.yml +++ b/buildscripts/resmokeconfig/suites/sharding_auth_audit_14.yml @@ -35,7 +35,7 @@ selector: - jstests/sharding/change_streams_establishment_finds_new_shards.js - jstests/sharding/inserts_consistent.js - jstests/sharding/not_allowed_on_sharded_collection_cmd.js - - jstests/sharding/change_stream_invalidation.js + - jstests/sharding/change_stream_metadata_notifications.js - jstests/sharding/autosplit.js - jstests/sharding/tag_auto_split.js - jstests/sharding/covered_shard_key_indexes.js diff --git a/buildscripts/resmokeconfig/suites/sharding_auth_audit_misc.yml b/buildscripts/resmokeconfig/suites/sharding_auth_audit_misc.yml index afa10efa647..18a72f302d7 100644 --- a/buildscripts/resmokeconfig/suites/sharding_auth_audit_misc.yml +++ b/buildscripts/resmokeconfig/suites/sharding_auth_audit_misc.yml @@ -247,7 +247,7 @@ selector: - jstests/sharding/change_streams_establishment_finds_new_shards.js - jstests/sharding/inserts_consistent.js - jstests/sharding/not_allowed_on_sharded_collection_cmd.js - - jstests/sharding/change_stream_invalidation.js + - jstests/sharding/change_stream_metadata_notifications.js - jstests/sharding/autosplit.js - jstests/sharding/tag_auto_split.js - jstests/sharding/covered_shard_key_indexes.js diff --git a/buildscripts/resmokeconfig/suites/sharding_auth_misc.yml b/buildscripts/resmokeconfig/suites/sharding_auth_misc.yml index c1e591d81d1..cec1ad5dfb9 100644 --- a/buildscripts/resmokeconfig/suites/sharding_auth_misc.yml +++ b/buildscripts/resmokeconfig/suites/sharding_auth_misc.yml @@ -245,7 +245,7 @@ selector: - jstests/sharding/change_streams_establishment_finds_new_shards.js - jstests/sharding/inserts_consistent.js - jstests/sharding/not_allowed_on_sharded_collection_cmd.js - - jstests/sharding/change_stream_invalidation.js + - jstests/sharding/change_stream_metadata_notifications.js - jstests/sharding/mapReduce_outSharded.js - jstests/sharding/autosplit.js - jstests/sharding/accurate_count_with_predicate.js diff --git a/buildscripts/resmokeconfig/suites/sharding_ese_16.yml b/buildscripts/resmokeconfig/suites/sharding_ese_16.yml index f43990f5304..1d07c54874c 100644 --- a/buildscripts/resmokeconfig/suites/sharding_ese_16.yml +++ b/buildscripts/resmokeconfig/suites/sharding_ese_16.yml @@ -27,7 +27,7 @@ selector: - jstests/sharding/accurate_count_with_predicate.js - jstests/sharding/migration_sets_fromMigrate_flag.js - jstests/sharding/remove3.js - - jstests/sharding/change_stream_invalidation.js + - jstests/sharding/change_stream_metadata_notifications.js - jstests/sharding/autosplit.js - jstests/sharding/basic_merge.js - jstests/sharding/merge_chunks_compound_shard_key.js diff --git a/buildscripts/resmokeconfig/suites/sharding_ese_misc.yml b/buildscripts/resmokeconfig/suites/sharding_ese_misc.yml index f8bcc258b22..1005ad7ff90 100644 --- a/buildscripts/resmokeconfig/suites/sharding_ese_misc.yml +++ b/buildscripts/resmokeconfig/suites/sharding_ese_misc.yml @@ -242,7 +242,7 @@ selector: - jstests/sharding/accurate_count_with_predicate.js - jstests/sharding/migration_sets_fromMigrate_flag.js - jstests/sharding/remove3.js - - jstests/sharding/change_stream_invalidation.js + - jstests/sharding/change_stream_metadata_notifications.js - jstests/sharding/autosplit.js - jstests/sharding/basic_merge.js - jstests/sharding/merge_chunks_compound_shard_key.js diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index bc1bf39b26a..467108eaace 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -60,8 +60,8 @@ selector: # New 4.0 feature - jstests/sharding/snapshot_aggregate_mongos.js - jstests/sharding/snapshot_find_mongos.js - - jstests/sharding/change_stream_invalidation.js - jstests/sharding/change_stream_lookup_single_shard_cluster.js + - jstests/sharding/change_stream_metadata_notifications.js - jstests/sharding/change_streams.js - jstests/sharding/change_streams_primary_shard_unaware.js - jstests/sharding/change_streams_unsharded_becomes_sharded.js diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_8.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_8.yml index 0471b1321b6..d9732db7a6b 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_8.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_8.yml @@ -64,4 +64,4 @@ executor: mongosBinVersion: 'last-stable' shardMixedBinVersions: true skipCheckingUUIDsConsistentAcrossCluster: true - nodb: ''
\ No newline at end of file + nodb: '' diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml index bccc925cf99..b17f585db15 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml @@ -68,8 +68,8 @@ selector: # New 4.0 feature - jstests/sharding/snapshot_aggregate_mongos.js - jstests/sharding/snapshot_find_mongos.js - - jstests/sharding/change_stream_invalidation.js - jstests/sharding/change_stream_lookup_single_shard_cluster.js + - jstests/sharding/change_stream_metadata_notifications.js - jstests/sharding/change_streams.js - jstests/sharding/change_streams_primary_shard_unaware.js - jstests/sharding/change_streams_unsharded_becomes_sharded.js @@ -235,7 +235,7 @@ selector: - jstests/sharding/addshard5.js - jstests/sharding/count2.js - jstests/sharding/features1.js - - jstests/sharding/change_stream_invalidation.js + - jstests/sharding/change_stream_metadata_notifications.js - jstests/sharding/regex_targeting.js - jstests/sharding/shard6.js - jstests/sharding/query_config.js diff --git a/buildscripts/resmokeconfig/suites/sharding_misc.yml b/buildscripts/resmokeconfig/suites/sharding_misc.yml index 55ee8cba885..3a4374bdbd0 100644 --- a/buildscripts/resmokeconfig/suites/sharding_misc.yml +++ b/buildscripts/resmokeconfig/suites/sharding_misc.yml @@ -216,7 +216,7 @@ selector: - jstests/sharding/cleanup_orphaned_cmd_during_movechunk_hashed.js - jstests/sharding/sharded_limit_batchsize.js - jstests/sharding/migration_sets_fromMigrate_flag.js - - jstests/sharding/change_stream_invalidation.js + - jstests/sharding/change_stream_metadata_notifications.js - jstests/sharding/bulk_insert.js - jstests/sharding/multi_mongos2a.js - jstests/sharding/movechunk_commit_changelog_stats.js diff --git a/jstests/change_streams/change_stream_apply_ops.js b/jstests/change_streams/apply_ops.js index 0f954578e52..ee116d3495d 100644 --- a/jstests/change_streams/change_stream_apply_ops.js +++ b/jstests/change_streams/apply_ops.js @@ -20,8 +20,12 @@ coll.insert({_id: kDeletedDocumentId, a: "I was here before the transaction"}); let cst = new ChangeStreamTest(db); - let changeStream = cst.startWatchingChanges( - {pipeline: [{$changeStream: {}}, {$project: {"lsid.uid": 0}}], collection: coll}); + let changeStream = cst.startWatchingChanges({ + pipeline: [{$changeStream: {}}, {$project: {"lsid.uid": 0}}], + collection: coll, + doNotModifyInPassthroughs: + true // A collection drop only invalidates single-collection change streams. + }); const sessionOptions = {causalConsistency: false}; const session = db.getMongo().startSession(sessionOptions); @@ -93,12 +97,21 @@ lsid: session.getSessionId(), txnNumber: NumberLong(session._txnNumber), }, - {operationType: "invalidate"}, + { + operationType: "drop", + ns: {db: db.getName(), coll: coll.getName()}, + }, ]; // Verify that the stream returns the expected sequence of changes. - const changes = cst.assertNextChangesEqual( - {cursor: changeStream, expectedChanges: expectedChanges, expectInvalidate: true}); + const changes = + cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges}); + // Single collection change stream should also be invalidated by the drop. + cst.assertNextChangesEqual({ + cursor: changeStream, + expectedChanges: [{operationType: "invalidate"}], + expectInvalidate: true + }); // Obtain the clusterTime from the first change. const startTime = changes[0].clusterTime; @@ -119,8 +132,7 @@ pipeline: [{$changeStream: {startAtOperationTime: startTime}}, {$project: {"lsid.uid": 0}}], collection: 1 }); - cst.assertNextChangesEqual( - {cursor: changeStream, expectedChanges: expectedChanges, expectInvalidate: true}); + cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges}); // Add an entry for the insert on otherDb.otherDbColl into expectedChanges. expectedChanges.splice(3, 0, { @@ -142,8 +154,7 @@ ], collection: 1 }); - cst.assertNextChangesEqual( - {cursor: changeStream, expectedChanges: expectedChanges, expectInvalidate: true}); + cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges}); cst.cleanUp(); }()); diff --git a/jstests/change_streams/change_stream_apply_ops_resumability.js b/jstests/change_streams/apply_ops_resumability.js index 8d7ee90b8e6..a8d25d775d4 100644 --- a/jstests/change_streams/change_stream_apply_ops_resumability.js +++ b/jstests/change_streams/apply_ops_resumability.js @@ -88,6 +88,10 @@ }, ]; + // + // Test behavior of single-collection change streams with apply ops. + // + // Verify that the stream returns the expected sequence of changes. const changes = cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges}); @@ -115,13 +119,28 @@ let otherCursor = cst.startWatchingChanges({ pipeline: [{$changeStream: {resumeAfter: secondTxnChange._id}}, {$project: {"lsid.uid": 0}}], - collection: coll + collection: coll, + doNotModifyInPassthroughs: true // A collection drop only invalidates single-collection + // change streams. }); cst.assertNextChangesEqual({cursor: otherCursor, expectedChanges: expectedChanges.slice(3)}); - // Drop the collection. This will trigger an "invalidate" event at the end of the stream. + // Drop the collection. This will trigger a "drop" followed by an "invalidate" for the single + // collection change stream. assert.commandWorked(db.runCommand({drop: coll.getName()})); - expectedChanges.push({operationType: "invalidate"}); + let change = cst.getOneChange(otherCursor); + assert.eq(change.operationType, "drop"); + assert.eq(change.ns, {db: db.getName(), coll: coll.getName()}); + change = cst.getOneChange(otherCursor, true); + assert.eq(change.operationType, "invalidate"); + + // + // Test behavior of whole-db change streams with apply ops. + // + + // For a whole-db or whole-cluster change stream, the collection drop should return a single + // "drop" entry and not invalidate the stream. + expectedChanges.push({operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}}); // Add an entry for the insert on db.otherColl into expectedChanges. expectedChanges.splice(3, 0, { @@ -139,10 +158,9 @@ changeStream = cst.startWatchingChanges({ pipeline: [{$changeStream: {resumeAfter: secondTxnChange._id}}, {$project: {"lsid.uid": 0}}], - collection: 1 + collection: 1, }); - cst.assertNextChangesEqual( - {cursor: changeStream, expectedChanges: expectedChanges.slice(3), expectInvalidate: true}); + cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges.slice(3)}); // Add an entry for the insert on otherDb.otherDbColl into expectedChanges. expectedChanges.splice(4, 0, { @@ -154,9 +172,9 @@ txnNumber: NumberLong(session._txnNumber), }); - // Verify that a whole-db stream can be resumed from the middle of the transaction, and that it - // will see all subsequent changes including the insert on the other collection and the changes - // on the other DB. + // Verify that a whole-cluster stream can be resumed from the middle of the transaction, and + // that it will see all subsequent changes including the insert on the other collection and the + // changes on the other DB. cst = new ChangeStreamTest(db.getSiblingDB("admin")); changeStream = cst.startWatchingChanges({ pipeline: [ @@ -165,8 +183,7 @@ ], collection: 1 }); - cst.assertNextChangesEqual( - {cursor: changeStream, expectedChanges: expectedChanges.slice(3), expectInvalidate: true}); + cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges.slice(3)}); cst.cleanUp(); }()); diff --git a/jstests/change_streams/change_stream_ban_from_lookup.js b/jstests/change_streams/ban_from_lookup.js index 08bc600ad55..08bc600ad55 100644 --- a/jstests/change_streams/change_stream_ban_from_lookup.js +++ b/jstests/change_streams/ban_from_lookup.js diff --git a/jstests/change_streams/change_stream_ban_from_views.js b/jstests/change_streams/ban_from_views.js index c06932e55b3..c06932e55b3 100644 --- a/jstests/change_streams/change_stream_ban_from_views.js +++ b/jstests/change_streams/ban_from_views.js diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js index 0d392b99bcc..7c5688b0704 100644 --- a/jstests/change_streams/change_stream.js +++ b/jstests/change_streams/change_stream.js @@ -160,17 +160,6 @@ assertDropCollection(db, db.dropping.getName()); // Should still see the previous change from t2, shouldn't see anything about 'dropping'. - // Test collection renaming. Sharded collections cannot be renamed. - if (!FixtureHelpers.isSharded(db.t2)) { - jsTestLog("Testing rename"); - assertDropCollection(db, "t3"); - t2cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2}); - assert.writeOK(db.t2.renameCollection("t3")); - expected = {operationType: "invalidate"}; - cst.assertNextChangesEqual( - {cursor: t2cursor, expectedChanges: [expected], expectInvalidate: true}); - } - jsTestLog("Testing insert that looks like rename"); assertDropCollection(db, "dne1"); assertDropCollection(db, "dne2"); diff --git a/jstests/change_streams/change_stream_invalidation.js b/jstests/change_streams/change_stream_invalidation.js deleted file mode 100644 index e955af3af87..00000000000 --- a/jstests/change_streams/change_stream_invalidation.js +++ /dev/null @@ -1,128 +0,0 @@ -// Tests of $changeStream invalidate entries. - -(function() { - "use strict"; - - load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'. - load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. - - db.getMongo().forceReadMode('commands'); - - // Write a document to the collection and test that the change stream returns it - // and getMore command closes the cursor afterwards. - const collGetMore = assertDropAndRecreateCollection(db, "change_stream_getmore_invalidations"); - // We awaited the replication of the first write, so the change stream shouldn't return it. - assert.writeOK(collGetMore.insert({_id: 0, a: 1})); - - let changeStream = collGetMore.watch(); - - // Drop the collection and test that we return "invalidate" entry and close the cursor. However, - // we return all oplog entries preceding the drop. - jsTestLog("Testing getMore command closes cursor for invalidate entries"); - // Create oplog entries of type insert, update, and delete. - assert.writeOK(collGetMore.insert({_id: 1})); - assert.writeOK(collGetMore.update({_id: 1}, {$set: {a: 1}})); - assert.writeOK(collGetMore.remove({_id: 1})); - // Drop the collection. - assert.commandWorked(db.runCommand({drop: collGetMore.getName()})); - - // We should get 4 oplog entries of type insert, update, delete, and invalidate. The cursor - // should be closed. - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "insert"); - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "update"); - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "delete"); - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "invalidate"); - assert(changeStream.isExhausted()); - - jsTestLog("Testing aggregate command closes cursor for invalidate entries"); - const collAgg = assertDropAndRecreateCollection(db, "change_stream_agg_invalidations"); - - // Get a valid resume token that the next aggregate command can use. - changeStream = collAgg.watch(); - - assert.writeOK(collAgg.insert({_id: 1})); - - assert.soon(() => changeStream.hasNext()); - let change = changeStream.next(); - assert.eq(change.operationType, "insert"); - assert.eq(change.documentKey, {_id: 1}); - const resumeToken = change._id; - - // Insert another document after storing the resume token. - assert.writeOK(collAgg.insert({_id: 2})); - - assert.soon(() => changeStream.hasNext()); - change = changeStream.next(); - assert.eq(change.operationType, "insert"); - assert.eq(change.documentKey, {_id: 2}); - - // Drop the collection and invalidate the change stream. - assertDropCollection(db, collAgg.getName()); - // Wait for two-phase drop to complete, so that the UUID no longer exists. - assert.soon(function() { - return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, collAgg.getName()); - }); - - // Resume the change stream after the collection drop, up to and including the invalidate. This - // is allowed if an explicit collation is provided. - changeStream = collAgg.watch([], {resumeAfter: resumeToken, collation: {locale: "simple"}}); - - assert.soon(() => changeStream.hasNext()); - change = changeStream.next(); - assert.eq(change.operationType, "insert"); - assert.eq(change.documentKey, {_id: 2}); - - assert.soon(() => changeStream.hasNext()); - change = changeStream.next(); - assert.eq(change.operationType, "invalidate"); - assert(changeStream.isExhausted()); - - // Test that it is possible to open a new change stream cursor on a collection that does not - // exist. - jsTestLog("Testing aggregate command on nonexistent collection"); - const collDoesNotExistName = "change_stream_agg_invalidations_does_not_exist"; - assertDropCollection(db, collDoesNotExistName); - - // Cursor creation succeeds, but there are no results. - const cursorObj = assert - .commandWorked(db.runCommand({ - aggregate: collDoesNotExistName, - pipeline: [{$changeStream: {}}], - cursor: {batchSize: 1}, - })) - .cursor; - - // We explicitly test getMore, to ensure that the getMore command for a non-existent collection - // does not return an error. - let getMoreResult = - assert - .commandWorked(db.runCommand( - {getMore: cursorObj.id, collection: collDoesNotExistName, batchSize: 1})) - .cursor; - assert.neq(getMoreResult.id, 0); - assert.eq(getMoreResult.nextBatch.length, 0, tojson(getMoreResult.nextBatch)); - - // After collection creation, we see oplog entries for the collection. - const collNowExists = assertCreateCollection(db, collDoesNotExistName); - assert.writeOK(collNowExists.insert({_id: 0})); - - assert.soon(function() { - getMoreResult = - assert - .commandWorked(db.runCommand( - {getMore: cursorObj.id, collection: collDoesNotExistName, batchSize: 1})) - .cursor; - assert.neq(getMoreResult.id, 0); - return getMoreResult.nextBatch.length > 0; - }, "Timed out waiting for another result from getMore on non-existent collection."); - assert.eq(getMoreResult.nextBatch.length, 1); - assert.eq(getMoreResult.nextBatch[0].operationType, "insert"); - assert.eq(getMoreResult.nextBatch[0].documentKey, {_id: 0}); - - assert.commandWorked( - db.runCommand({killCursors: collDoesNotExistName, cursors: [getMoreResult.id]})); -}()); diff --git a/jstests/change_streams/change_stream_rename_resumability.js b/jstests/change_streams/change_stream_rename_resumability.js deleted file mode 100644 index 46a3b3575b0..00000000000 --- a/jstests/change_streams/change_stream_rename_resumability.js +++ /dev/null @@ -1,51 +0,0 @@ -// Tests resuming on a change stream that was invalidated due to rename. - -(function() { - "use strict"; - - load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. - - let coll = assertDropAndRecreateCollection(db, "change_stream_invalidate_resumability"); - - // Drop the collection we'll rename to _before_ starting the changeStream, so that we don't - // get accidentally an invalidate when running on the whole DB or cluster. - assertDropCollection(db, coll.getName() + "_renamed"); - - const cursor = coll.watch(); - assert(!cursor.hasNext()); - - // Create an 'insert' oplog entry. - assert.writeOK(coll.insert({_id: 1})); - - assert.commandWorked(coll.renameCollection(coll.getName() + "_renamed")); - - // Update 'coll' to point to the renamed collection. - coll = db[coll.getName() + "_renamed"]; - - // Insert another document after the rename. - assert.writeOK(coll.insert({_id: 2})); - - // We should get 2 oplog entries of type insert and invalidate. - assert.soon(() => cursor.hasNext()); - let change = cursor.next(); - assert.eq(change.operationType, "insert", tojson(change)); - assert.docEq(change.fullDocument, {_id: 1}); - - assert.soon(() => cursor.hasNext()); - change = cursor.next(); - assert.eq(change.operationType, "invalidate", tojson(change)); - assert(cursor.isExhausted()); - - // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here to - // be sure that it doesn't crash the server, but the ability to resume a change stream after an - // invalidate is a bug, not a feature. - - // Try resuming from the invalidate. - assert.doesNotThrow(function() { - const resumeCursor = coll.watch([], {resumeAfter: change._id}); - assert.soon(() => resumeCursor.hasNext()); - // Not checking the contents of the document returned, because we do not technically - // support this behavior. - resumeCursor.next(); - }); -}()); diff --git a/jstests/change_streams/change_stream_rename_target.js b/jstests/change_streams/change_stream_rename_target.js deleted file mode 100644 index 92a1d0c7c64..00000000000 --- a/jstests/change_streams/change_stream_rename_target.js +++ /dev/null @@ -1,48 +0,0 @@ -// Tests that watching a collection which another collection is renamed _to_ causes an invalidate. -(function() { - "use strict"; - - load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. - - const testDB = db.getSiblingDB(jsTestName()); - - // Write a document to the collection and test that the change stream returns it - // and getMore command closes the cursor afterwards. - const collName1 = "change_stream_rename_1"; - const collName2 = "change_stream_rename_2"; - let coll = assertDropAndRecreateCollection(testDB, collName1); - assertDropCollection(testDB, collName2); - assertDropCollection(testDB, collName2); - - // Watch the collection which doesn't exist yet. - let aggCursor = testDB[collName2].watch(); - - // Insert something to the collection we're _not_ watching. - assert.writeOK(coll.insert({_id: 1})); - - assert.eq(aggCursor.hasNext(), false); - - // Now rename the collection TO the collection that's being watched. This should invalidate the - // change stream. - assert.commandWorked(coll.renameCollection(collName2)); - assert.soon(() => aggCursor.hasNext()); - let invalidate = aggCursor.next(); - assert.eq(invalidate.operationType, "invalidate"); - assert(aggCursor.isExhausted()); - - // Do another insert. - assert.writeOK(testDB[collName2].insert({_id: 2})); - - // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here to - // be sure that it doesn't crash the server, but the ability to resume a change stream after an - // invalidate is a bug, not a feature. - - // Try resuming from the invalidate. - assert.doesNotThrow(function() { - let cursor = testDB[collName2].watch([], {resumeAfter: invalidate._id}); - assert.soon(() => cursor.hasNext()); - // Not checking the contents of the document returned, because we do not technically - // support this behavior. - cursor.next(); - }); -}()); diff --git a/jstests/change_streams/change_stream_whole_cluster_invalidations.js b/jstests/change_streams/change_stream_whole_cluster_invalidations.js deleted file mode 100644 index d2c07cdbc81..00000000000 --- a/jstests/change_streams/change_stream_whole_cluster_invalidations.js +++ /dev/null @@ -1,134 +0,0 @@ -// Tests of invalidate entries for a $changeStream on a whole cluster. -(function() { - "use strict"; - - load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. - load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'. - load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. - load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. - - // Define two databases. We will conduct our tests by creating one collection in each. - const testDB1 = db, testDB2 = db.getSiblingDB(`${db.getName()}_other`); - const adminDB = db.getSiblingDB("admin"); - - // Create one collection on each database. - let [db1Coll, db2Coll] = - [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, jsTestName())); - - // Create a ChangeStreamTest on the 'admin' db. Cluster-wide change streams can only be opened - // on admin. - let cst = new ChangeStreamTest(adminDB); - let aggCursor = cst.startWatchingAllChangesForCluster(); - - // Generate oplog entries of type insert, update, and delete across both databases. - for (let coll of[db1Coll, db2Coll]) { - assert.writeOK(coll.insert({_id: 1})); - assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}})); - assert.writeOK(coll.remove({_id: 1})); - } - - // Drop the second database, which should invalidate the stream. - assert.commandWorked(testDB2.dropDatabase()); - - // We should get 7 oplog entries; three ops of type insert, update, delete from each database, - // and then an invalidate. The cursor should be closed. - for (let expectedDB of[testDB1, testDB2]) { - let change = cst.getOneChange(aggCursor); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(change.ns.db, expectedDB.getName(), tojson(change)); - change = cst.getOneChange(aggCursor); - assert.eq(change.operationType, "update", tojson(change)); - assert.eq(change.ns.db, expectedDB.getName(), tojson(change)); - change = cst.getOneChange(aggCursor); - assert.eq(change.operationType, "delete", tojson(change)); - assert.eq(change.ns.db, expectedDB.getName(), tojson(change)); - } - cst.assertNextChangesEqual({ - cursor: aggCursor, - expectedChanges: [{operationType: "invalidate"}], - expectInvalidate: true - }); - - // Test that a cluster-wide change stream can be resumed using a token from a collection which - // has been dropped. - db1Coll = assertDropAndRecreateCollection(testDB1, jsTestName()); - - // Get a valid resume token that the next change stream can use. - aggCursor = cst.startWatchingAllChangesForCluster(); - - assert.writeOK(db1Coll.insert({_id: 1}, {writeConcern: {w: "majority"}})); - - let change = cst.getOneChange(aggCursor, false); - const resumeToken = change._id; - - // For cluster-wide streams, it is possible to resume at a point before a collection is dropped, - // even if the invalidation has not been received on the original stream yet. - assertDropCollection(db1Coll, db1Coll.getName()); - // Wait for two-phase drop to complete, so that the UUID no longer exists. - assert.soon(function() { - return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(testDB1, - db1Coll.getName()); - }); - assert.commandWorked(adminDB.runCommand({ - aggregate: 1, - pipeline: [{$changeStream: {resumeAfter: resumeToken, allChangesForCluster: true}}], - cursor: {} - })); - - // Test that invalidation entries from any database invalidate the stream. - [db1Coll, db2Coll] = - [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, jsTestName())); - let _idForTest = 0; - for (let collToInvalidate of[db1Coll, db2Coll]) { - // Start watching all changes in the cluster. - aggCursor = cst.startWatchingAllChangesForCluster(); - - let testDB = collToInvalidate.getDB(); - - // Insert into the collections on both databases, and verify the change stream is able to - // pick them up. - for (let collToWrite of[db1Coll, db2Coll]) { - assert.writeOK(collToWrite.insert({_id: _idForTest})); - change = cst.getOneChange(aggCursor); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(change.documentKey._id, _idForTest); - assert.eq(change.ns.db, collToWrite.getDB().getName()); - _idForTest++; - } - - // Renaming the collection should invalidate the change stream. Skip this test when running - // on a sharded collection, since these cannot be renamed. - if (!FixtureHelpers.isSharded(collToInvalidate)) { - assert.writeOK(collToInvalidate.renameCollection("renamed_coll")); - cst.assertNextChangesEqual({ - cursor: aggCursor, - expectedChanges: [{operationType: "invalidate"}], - expectInvalidate: true - }); - collToInvalidate = testDB.getCollection("renamed_coll"); - } - - // Dropping a collection should invalidate the change stream. - aggCursor = cst.startWatchingAllChangesForCluster(); - assertDropCollection(testDB, collToInvalidate.getName()); - cst.assertNextChangesEqual({ - cursor: aggCursor, - expectedChanges: [{operationType: "invalidate"}], - expectInvalidate: true - }); - - // Dropping a 'system' collection should invalidate the change stream. - // Create a view to ensure that the 'system.views' collection exists. - assert.commandWorked( - testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []})); - aggCursor = cst.startWatchingAllChangesForCluster(); - assertDropCollection(testDB, "system.views"); - cst.assertNextChangesEqual({ - cursor: aggCursor, - expectedChanges: [{operationType: "invalidate"}], - expectInvalidate: true - }); - } - - cst.cleanUp(); -}()); diff --git a/jstests/change_streams/change_stream_whole_db_invalidations.js b/jstests/change_streams/change_stream_whole_db_invalidations.js deleted file mode 100644 index 463359ec839..00000000000 --- a/jstests/change_streams/change_stream_whole_db_invalidations.js +++ /dev/null @@ -1,125 +0,0 @@ -// Tests of invalidate entries for a $changeStream on a whole database. -(function() { - "use strict"; - - load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. - load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'. - load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. - load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. - - const testDB = db.getSiblingDB(jsTestName()); - let cst = new ChangeStreamTest(testDB); - - // Write a document to the collection and test that the change stream returns it - // and getMore command closes the cursor afterwards. - let coll = assertDropAndRecreateCollection(testDB, "change_stream_whole_db_invalidations"); - - let aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); - - // Create oplog entries of type insert, update, and delete. - assert.writeOK(coll.insert({_id: 1})); - assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}})); - assert.writeOK(coll.remove({_id: 1})); - // Drop the collection. - assert.commandWorked(testDB.runCommand({drop: coll.getName()})); - // We should get 4 oplog entries of type insert, update, delete, and invalidate. The cursor - // should be closed. - let change = cst.getOneChange(aggCursor); - assert.eq(change.operationType, "insert", tojson(change)); - change = cst.getOneChange(aggCursor); - assert.eq(change.operationType, "update", tojson(change)); - change = cst.getOneChange(aggCursor); - assert.eq(change.operationType, "delete", tojson(change)); - cst.assertNextChangesEqual({ - cursor: aggCursor, - expectedChanges: [{operationType: "invalidate"}], - expectInvalidate: true - }); - - const collAgg = - assertDropAndRecreateCollection(testDB, "change_stream_whole_db_agg_invalidations"); - - // Get a valid resume token that the next change stream can use. - aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); - - assert.writeOK(collAgg.insert({_id: 1}, {writeConcern: {w: "majority"}})); - - change = cst.getOneChange(aggCursor, false); - const resumeToken = change._id; - - // For whole-db streams, it is possible to resume at a point before a collection is dropped, - // even if the invalidation has not been received on the original stream yet. - assertDropCollection(testDB, collAgg.getName()); - // Wait for two-phase drop to complete, so that the UUID no longer exists. - assert.soon(function() { - return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(testDB, - collAgg.getName()); - }); - assert.commandWorked(testDB.runCommand( - {aggregate: 1, pipeline: [{$changeStream: {resumeAfter: resumeToken}}], cursor: {}})); - - // Test that invalidation entries for other databases are filtered out. - const otherDB = testDB.getSiblingDB("change_stream_whole_db_invalidations_other"); - const otherDBColl = otherDB["change_stream_whole_db_invalidations_other"]; - assert.writeOK(otherDBColl.insert({_id: 0})); - - // Create collection on the database being watched. - coll = assertDropAndRecreateCollection(testDB, "change_stream_whole_db_invalidations"); - - // Create the $changeStream. We set 'doNotModifyInPassthroughs' so that this test will not be - // upconverted to a cluster-wide stream, which *would* be invalidated by dropping the collection - // in the other database. - aggCursor = cst.startWatchingChanges( - {pipeline: [{$changeStream: {}}], collection: 1, doNotModifyInPassthroughs: true}); - - // Drop the collection on the other database, this should *not* invalidate the change stream. - assertDropCollection(otherDB, otherDBColl.getName()); - - // Insert into the collection in the watched database, and verify the change stream is able to - // pick it up. - assert.writeOK(coll.insert({_id: 1})); - change = cst.getOneChange(aggCursor); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(change.documentKey._id, 1); - - // Test that renaming a collection will invalidate the change stream. MongoDB does not allow - // renaming of sharded collections, so only perform this test if the collection is not sharded. - if (!FixtureHelpers.isSharded(coll)) { - assertDropCollection(testDB, coll.getName()); - - assertCreateCollection(testDB, coll.getName()); - assertDropCollection(testDB, "renamed_coll"); - aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); - assert.writeOK(coll.renameCollection("renamed_coll")); - cst.assertNextChangesEqual({ - cursor: aggCursor, - expectedChanges: [{operationType: "invalidate"}], - expectInvalidate: true - }); - } - - // Dropping a collection should invalidate the change stream. - assertDropCollection(testDB, coll.getName()); - aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); - assertCreateCollection(testDB, coll.getName()); - assertDropCollection(testDB, coll.getName()); - cst.assertNextChangesEqual({ - cursor: aggCursor, - expectedChanges: [{operationType: "invalidate"}], - expectInvalidate: true - }); - - // Dropping a 'system' collection should invalidate the change stream. - // Create a view to ensure that the 'system.views' collection exists. - assert.commandWorked( - testDB.runCommand({create: "view1", viewOn: coll.getName(), pipeline: []})); - aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); - assertDropCollection(testDB, "system.views"); - cst.assertNextChangesEqual({ - cursor: aggCursor, - expectedChanges: [{operationType: "invalidate"}], - expectInvalidate: true - }); - - cst.cleanUp(); -}()); diff --git a/jstests/change_streams/change_stream_collation.js b/jstests/change_streams/collation.js index 4375da16843..00432b91057 100644 --- a/jstests/change_streams/change_stream_collation.js +++ b/jstests/change_streams/collation.js @@ -312,7 +312,8 @@ // $changeStream. assert.soon(() => changeStream.hasNext()); assert.docEq(changeStream.next().fullDocument, {_id: "dropped_coll", text: "ABC"}); - assert(changeStream.isExhausted()); + // Only single-collection streams will be exhausted from the drop. + assert(changeStream.isExhausted() || isChangeStreamPassthrough()); // Test that a pipeline with an explicit collation is allowed to resume from before the // collection is dropped and recreated. @@ -322,7 +323,8 @@ assert.soon(() => changeStream.hasNext()); assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"}); - assert(changeStream.isExhausted()); + // Only single-collection streams will be exhausted from the drop. + assert(changeStream.isExhausted() || isChangeStreamPassthrough()); // Test that a pipeline without an explicit collation is not allowed to resume, // even though the collection has been recreated with the same default collation as it diff --git a/jstests/change_streams/change_stream_does_not_implicitly_create_database.js b/jstests/change_streams/does_not_implicitly_create_database.js index 052a53585bd..052a53585bd 100644 --- a/jstests/change_streams/change_stream_does_not_implicitly_create_database.js +++ b/jstests/change_streams/does_not_implicitly_create_database.js diff --git a/jstests/change_streams/include_cluster_time.js b/jstests/change_streams/include_cluster_time.js index 4be37f1f338..405bf2d5b16 100644 --- a/jstests/change_streams/include_cluster_time.js +++ b/jstests/change_streams/include_cluster_time.js @@ -2,6 +2,7 @@ (function() { "use strict"; + load("jstests/libs/change_stream_util.js"); // For assertInvalidateOp. load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. // Drop and recreate the collections to be used in this set of tests. @@ -47,8 +48,10 @@ assert.soon(() => changeStream.hasNext()); next = changeStream.next(); - assert.eq(next.operationType, "invalidate"); + assert.eq(next.operationType, "drop"); assert.lte(next.clusterTime, dropClusterTime); + assertInvalidateOp({cursor: changeStream, opType: "drop"}); + changeStream.close(); }()); diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js index ec7f47d57f6..7d1a0a1a457 100644 --- a/jstests/change_streams/lookup_post_image.js +++ b/jstests/change_streams/lookup_post_image.js @@ -236,7 +236,11 @@ }); latestChange = cst.getOneChange(cursor); assert.eq(latestChange.operationType, "insert"); - latestChange = cst.getOneChange(cursor, true); - assert.eq(latestChange.operationType, "invalidate"); - assert(!latestChange.hasOwnProperty("fullDocument")); + latestChange = cst.getOneChange(cursor); + assert.eq(latestChange.operationType, "drop"); + // Only single-collection change streams will be invalidated by the drop. + if (!isChangeStreamPassthrough()) { + latestChange = cst.getOneChange(cursor, true); + assert.eq(latestChange.operationType, "invalidate"); + } }()); diff --git a/jstests/change_streams/metadata_notifications.js b/jstests/change_streams/metadata_notifications.js new file mode 100644 index 00000000000..ea7559de77b --- /dev/null +++ b/jstests/change_streams/metadata_notifications.js @@ -0,0 +1,201 @@ +// Tests of $changeStream notifications for metadata operations. +(function() { + "use strict"; + + load("jstests/libs/change_stream_util.js"); + load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'. + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + + db = db.getSiblingDB(jsTestName()); + let cst = new ChangeStreamTest(db); + + db.getMongo().forceReadMode('commands'); + + // Test that it is possible to open a new change stream cursor on a collection that does not + // exist. + const collName = "test"; + assertDropCollection(db, collName); + + // Cursor creation succeeds, but there are no results. We do not expect to see a notification + // for collection creation. + let cursor = cst.startWatchingChanges( + {collection: collName, pipeline: [{$changeStream: {}}, {$project: {operationType: 1}}]}); + + // We explicitly test getMore, to ensure that the getMore command for a non-existent collection + // does not return an error. + let change = cst.getNextBatch(cursor); + assert.neq(change.id, 0); + assert.eq(change.nextBatch.length, 0, tojson(change.nextBatch)); + + // After collection creation, we expect to see oplog entries for each subsequent operation. + let coll = assertCreateCollection(db, collName); + assert.writeOK(coll.insert({_id: 0})); + + change = cst.getOneChange(cursor); + assert.eq(change.operationType, "insert", tojson(change)); + + // Create oplog entries of type insert, update, delete, and drop. + assert.writeOK(coll.insert({_id: 1})); + assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}})); + assert.writeOK(coll.remove({_id: 1})); + assertDropCollection(db, coll.getName()); + + // Get a valid resume token that the next aggregate command can use. + change = cst.getOneChange(cursor); + assert.eq(change.operationType, "insert"); + const resumeToken = change._id; + + // We should get 4 oplog entries of type update, delete, drop, and invalidate. The cursor should + // be closed. + let expectedChanges = [ + {operationType: "update"}, + {operationType: "delete"}, + {operationType: "drop"}, + {operationType: "invalidate"}, + ]; + const changes = cst.assertNextChangesEqual( + {cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: true}); + + // It should not be possible to resume a change stream after a collection drop without an + // explicit collation, even if the invalidate has not been received. + assert.commandFailedWithCode(db.runCommand({ + aggregate: coll.getName(), + pipeline: [{$changeStream: {resumeAfter: resumeToken}}], + cursor: {} + }), + ErrorCodes.InvalidResumeToken); + + // Recreate the collection. + coll = assertCreateCollection(db, collName); + assert.writeOK(coll.insert({_id: 0})); + + // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here to + // be sure that it doesn't crash the server, but the ability to resume a change stream after an + // invalidate is a bug, not a feature. + + // Test resuming the change stream from the collection drop. + assert.doesNotThrow(function() { + const resumeTokenDrop = changes[2]._id; + const resumeCursor = + coll.watch([], {resumeAfter: resumeTokenDrop, collation: {locale: "simple"}}); + assert.soon(() => resumeCursor.hasNext()); + // Not checking the contents of the document returned, because we do not technically + // support this behavior. + resumeCursor.next(); + }); + // Test resuming the change stream from the invalidate after the drop. + assert.doesNotThrow(function() { + const resumeTokenInvalidate = changes[3]._id; + const resumeCursor = + coll.watch([], {resumeAfter: resumeTokenInvalidate, collation: {locale: "simple"}}); + assert.soon(() => resumeCursor.hasNext()); + // Not checking the contents of the document returned, because we do not technically + // support this behavior. + resumeCursor.next(); + }); + + // Test that renaming a collection being watched generates a "rename" entry followed by an + // "invalidate". This is true if the change stream is on the source or target collection of the + // rename. Sharded collections cannot be renamed. + if (!FixtureHelpers.isSharded(coll)) { + cursor = cst.startWatchingChanges({collection: collName, pipeline: [{$changeStream: {}}]}); + assertDropCollection(db, "renamed_coll"); + assert.writeOK(coll.renameCollection("renamed_coll")); + let expected = [ + { + operationType: "rename", + ns: {db: db.getName(), coll: collName}, + to: {db: db.getName(), coll: "renamed_coll"}, + }, + {operationType: "invalidate"} + ]; + cst.assertNextChangesEqual( + {cursor: cursor, expectedChanges: expected, expectInvalidate: true}); + + coll = db["renamed_coll"]; + + // Repeat the test, this time with a change stream open on the target. + cursor = cst.startWatchingChanges({collection: collName, pipeline: [{$changeStream: {}}]}); + assert.writeOK(coll.renameCollection(collName)); + expected = [ + { + operationType: "rename", + ns: {db: db.getName(), coll: "renamed_coll"}, + to: {db: db.getName(), coll: collName}, + }, + {operationType: "invalidate"} + ]; + const changes = cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + + coll = db[collName]; + + // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here + // to be sure that it doesn't crash the server, but the ability to resume a change stream + // after an invalidate is a bug, not a feature. + + // Test resuming the change stream from the collection rename. + assert.doesNotThrow(function() { + const resumeTokenRename = changes[0]._id; + const resumeCursor = + coll.watch([], {resumeAfter: resumeTokenRename, collation: {locale: "simple"}}); + }); + // Test resuming the change stream from the invalidate after the drop. + assert.doesNotThrow(function() { + const resumeTokenInvalidate = changes[1]._id; + const resumeCursor = + coll.watch([], {resumeAfter: resumeTokenInvalidate, collation: {locale: "simple"}}); + }); + + assertDropAndRecreateCollection(db, "renamed_coll"); + assert.writeOK(db.renamed_coll.insert({_id: 0})); + + // Repeat the test again, this time using the 'dropTarget' option with an existing target + // collection. + cursor = + cst.startWatchingChanges({collection: "renamed_coll", pipeline: [{$changeStream: {}}]}); + assert.writeOK(coll.renameCollection("renamed_coll", true /* dropTarget */)); + expected = [ + { + operationType: "rename", + ns: {db: db.getName(), coll: collName}, + to: {db: db.getName(), coll: "renamed_coll"}, + }, + {operationType: "invalidate"} + ]; + cst.assertNextChangesEqual( + {cursor: cursor, expectedChanges: expected, expectInvalidate: true}); + + coll = db["renamed_coll"]; + + // Test the behavior of a change stream watching the target collection of a $out aggregation + // stage. + cursor = cst.startWatchingChanges({collection: collName, pipeline: [{$changeStream: {}}]}); + coll.aggregate([{$out: collName}]); + // Note that $out will first create a temp collection, and then rename the temp collection + // to the target. Do not explicitly check the 'ns' field. + const rename = cst.getOneChange(cursor); + assert.eq(rename.operationType, "rename", tojson(rename)); + assert.eq(rename.to, {db: db.getName(), coll: collName}, tojson(rename)); + assert.eq(cst.getOneChange(cursor, true).operationType, "invalidate"); + } + + // Test that dropping a database will first drop all of it's collections, invalidating any + // change streams on those collections. + cursor = cst.startWatchingChanges({ + collection: coll.getName(), + pipeline: [{$changeStream: {}}], + }); + assert.commandWorked(db.dropDatabase()); + + expectedChanges = [ + { + operationType: "drop", + ns: {db: db.getName(), coll: coll.getName()}, + }, + {operationType: "invalidate"} + ]; + cst.assertNextChangesEqual( + {cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: true}); + + cst.cleanUp(); +}()); diff --git a/jstests/change_streams/change_stream_shell_helper.js b/jstests/change_streams/shell_helper.js index 6ecb95221eb..e833c053013 100644 --- a/jstests/change_streams/change_stream_shell_helper.js +++ b/jstests/change_streams/shell_helper.js @@ -6,6 +6,7 @@ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. + load("jstests/libs/change_stream_util.js"); // For assertInvalidateOp. const coll = assertDropAndRecreateCollection(db, "change_stream_shell_helper"); @@ -162,14 +163,12 @@ assert(!changeStreamCursor.isClosed()); assert(!changeStreamCursor.isExhausted()); - // Dropping the collection should invalidate any open change streams. + // Dropping the collection should trigger a drop notification. assertDropCollection(db, coll.getName()); assert.soon(() => changeStreamCursor.hasNext()); - assert(changeStreamCursor.isClosed()); assert(!changeStreamCursor.isExhausted()); - expected = {operationType: "invalidate"}; + expected = {operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}}; checkNextChange(changeStreamCursor, expected); - assert(!changeStreamCursor.hasNext()); - assert(changeStreamCursor.isClosed()); - assert(changeStreamCursor.isExhausted()); + // For single collection change streams, the drop should invalidate the stream. + assertInvalidateOp({cursor: changeStreamCursor, opType: "drop"}); }()); diff --git a/jstests/change_streams/change_stream_start_at_cluster_time.js b/jstests/change_streams/start_at_cluster_time.js index 3fb6786437a..3fb6786437a 100644 --- a/jstests/change_streams/change_stream_start_at_cluster_time.js +++ b/jstests/change_streams/start_at_cluster_time.js diff --git a/jstests/change_streams/change_stream_whitelist.js b/jstests/change_streams/whitelist.js index 12848e1e9e7..12848e1e9e7 100644 --- a/jstests/change_streams/change_stream_whitelist.js +++ b/jstests/change_streams/whitelist.js diff --git a/jstests/change_streams/change_stream_whole_cluster.js b/jstests/change_streams/whole_cluster.js index 0f3c9d6ec92..485721dce44 100644 --- a/jstests/change_streams/change_stream_whole_cluster.js +++ b/jstests/change_streams/whole_cluster.js @@ -6,8 +6,9 @@ load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest and // assert[Valid|Invalid]ChangeStreamNss. + db = db.getSiblingDB(jsTestName()); const adminDB = db.getSiblingDB("admin"); - const otherDB = db.getSiblingDB(`${db.getName()}_other`); + const otherDB = db.getSiblingDB(jsTestName() + "_other"); // Drop and recreate the collections to be used in this set of tests. assertDropAndRecreateCollection(db, "t1"); @@ -49,10 +50,17 @@ }; cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); - // Dropping either database should invalidate the change stream. + // Dropping a database should generate drop entries for each collection followed by an + // invalidate. + // TODO SERVER-35029: This test should not invalidate the stream once there's support for + // returning a notification for the dropDatabase command. assert.commandWorked(otherDB.dropDatabase()); - expected = {operationType: "invalidate"}; - cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + expected = [ + {operationType: "drop", ns: {db: otherDB.getName(), coll: "t2"}}, + {operationType: "invalidate"} + ]; + + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); // Drop the remaining database and clean up the test. assert.commandWorked(db.dropDatabase()); diff --git a/jstests/change_streams/whole_cluster_metadata_notifications.js b/jstests/change_streams/whole_cluster_metadata_notifications.js new file mode 100644 index 00000000000..9004e36135b --- /dev/null +++ b/jstests/change_streams/whole_cluster_metadata_notifications.js @@ -0,0 +1,225 @@ +// Tests of metadata notifications for a $changeStream on a whole cluster. +(function() { + "use strict"; + + load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. + load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'. + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. + + // Define two databases. We will conduct our tests by creating one collection in each. + const testDB1 = db.getSiblingDB(jsTestName()), + testDB2 = db.getSiblingDB(jsTestName() + "_other"); + const adminDB = db.getSiblingDB("admin"); + + assert.commandWorked(testDB1.dropDatabase()); + assert.commandWorked(testDB2.dropDatabase()); + + // Create one collection on each database. + let [db1Coll, db2Coll] = + [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, "test")); + + // Create a ChangeStreamTest on the 'admin' db. Cluster-wide change streams can only be opened + // on admin. + let cst = new ChangeStreamTest(adminDB); + let aggCursor = cst.startWatchingAllChangesForCluster(); + + // Generate oplog entries of type insert, update, and delete across both databases. + for (let coll of[db1Coll, db2Coll]) { + assert.writeOK(coll.insert({_id: 1})); + assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}})); + assert.writeOK(coll.remove({_id: 1})); + } + + // Drop the second database, which should invalidate the stream. + assert.commandWorked(testDB2.dropDatabase()); + + // We should get 6 oplog entries; three ops of type insert, update, delete from each database. + for (let expectedDB of[testDB1, testDB2]) { + let change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.ns.db, expectedDB.getName(), tojson(change)); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "update", tojson(change)); + assert.eq(change.ns.db, expectedDB.getName(), tojson(change)); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "delete", tojson(change)); + assert.eq(change.ns.db, expectedDB.getName(), tojson(change)); + } + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [ + {operationType: "drop", ns: {db: testDB2.getName(), coll: db2Coll.getName()}}, + // TODO SERVER-35029: Return an entry for a database drop, instead of "invalidate". + {operationType: "invalidate"} + ], + expectInvalidate: true + }); + + // Test that a cluster-wide change stream can be resumed using a token from a collection which + // has been dropped. + db1Coll = assertDropAndRecreateCollection(testDB1, db1Coll.getName()); + + // Get a valid resume token that the next change stream can use. + aggCursor = cst.startWatchingAllChangesForCluster(); + + assert.writeOK(db1Coll.insert({_id: 1}, {writeConcern: {w: "majority"}})); + + let change = cst.getOneChange(aggCursor, false); + const resumeToken = change._id; + + // For cluster-wide streams, it is possible to resume at a point before a collection is dropped, + // even if the "drop" notification has not been received on the original stream yet. + assertDropCollection(db1Coll, db1Coll.getName()); + // Wait for two-phase drop to complete, so that the UUID no longer exists. + assert.soon(function() { + return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(testDB1, + db1Coll.getName()); + }); + assert.commandWorked(adminDB.runCommand({ + aggregate: 1, + pipeline: [{$changeStream: {resumeAfter: resumeToken, allChangesForCluster: true}}], + cursor: {} + })); + + // Test that collection drops from any database result in "drop" notifications for the stream. + [db1Coll, db2Coll] = + [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, "test")); + let _idForTest = 0; + for (let collToInvalidate of[db1Coll, db2Coll]) { + // Start watching all changes in the cluster. + aggCursor = cst.startWatchingAllChangesForCluster(); + + let testDB = collToInvalidate.getDB(); + + // Insert into the collections on both databases, and verify the change stream is able to + // pick them up. + for (let collToWrite of[db1Coll, db2Coll]) { + assert.writeOK(collToWrite.insert({_id: _idForTest})); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, _idForTest); + assert.eq(change.ns.db, collToWrite.getDB().getName()); + _idForTest++; + } + + // Renaming the collection should generate a 'rename' notification. Skip this test when + // running on a sharded collection, since these cannot be renamed. + if (!FixtureHelpers.isSharded(collToInvalidate)) { + assertDropAndRecreateCollection(testDB, collToInvalidate.getName()); + const collName = collToInvalidate.getName(); + + // Start watching all changes in the cluster. + aggCursor = cst.startWatchingAllChangesForCluster(); + assert.writeOK(collToInvalidate.renameCollection("renamed_coll")); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [ + { + operationType: "rename", + ns: {db: testDB.getName(), coll: collToInvalidate.getName()}, + to: {db: testDB.getName(), coll: "renamed_coll"} + }, + ] + }); + + // Repeat the test, this time using the 'dropTarget' option with an existing target + // collection. + collToInvalidate = testDB.getCollection("renamed_coll"); + assertDropAndRecreateCollection(testDB, collName); + assert.writeOK(testDB[collName].insert({_id: 0})); + assert.writeOK(collToInvalidate.renameCollection(collName, true /* dropTarget */)); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [ + { + operationType: "insert", + ns: {db: testDB.getName(), coll: collName}, + documentKey: {_id: 0}, + fullDocument: {_id: 0} + }, + { + operationType: "rename", + ns: {db: testDB.getName(), coll: "renamed_coll"}, + to: {db: testDB.getName(), coll: collName} + } + ] + }); + + collToInvalidate = testDB[collName]; + + // Test renaming a collection to a different database. Do not run this in the mongos + // passthrough suites since we cannot guarantee the primary shard of the target database + // and renameCollection requires the source and destination to be on the same shard. + if (!FixtureHelpers.isMongos(testDB)) { + const otherDB = testDB.getSiblingDB(testDB.getName() + "_rename_target"); + // Ensure the target database exists. + const collOtherDB = assertDropAndRecreateCollection(otherDB, "test"); + assertDropCollection(otherDB, collOtherDB.getName()); + aggCursor = cst.startWatchingAllChangesForCluster(); + assert.commandWorked(testDB.adminCommand({ + renameCollection: collToInvalidate.getFullName(), + to: collOtherDB.getFullName() + })); + // Do not check the 'ns' field since it will contain the namespace of the temp + // collection created when renaming a collection across databases. + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "rename", tojson(change)); + assert.eq(change.to, + {db: otherDB.getName(), coll: collOtherDB.getName()}, + tojson(change)); + // Rename across databases also drops the source collection after the collection is + // copied over. + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [{ + operationType: "drop", + ns: {db: testDB.getName(), coll: collToInvalidate.getName()} + }] + }); + } + + // Test the behavior of a change stream watching the target collection of a $out + // aggregation stage. + collToInvalidate.aggregate([{$out: "renamed_coll"}]); + // Do not check the 'ns' field since it will contain the namespace of the temp + // collection created by the $out stage, before renaming to 'renamed_coll'. + const rename = cst.getOneChange(aggCursor); + assert.eq(rename.operationType, "rename", tojson(rename)); + assert.eq(rename.to, {db: testDB.getName(), coll: "renamed_coll"}, tojson(rename)); + + // The change stream should not be invalidated by the rename(s). + assert.eq(0, cst.getNextBatch(aggCursor).nextBatch.length); + assert.writeOK(collToInvalidate.insert({_id: 2})); + assert.eq(cst.getOneChange(aggCursor).operationType, "insert"); + } + + // Dropping a collection should generate a 'drop' entry. + assertDropCollection(testDB, collToInvalidate.getName()); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [{ + operationType: "drop", + ns: {db: testDB.getName(), coll: collToInvalidate.getName()} + }] + }); + + // Dropping a 'system' collection should also generate a 'drop' notification. + // Create a view to ensure that the 'system.views' collection exists. + assert.commandWorked( + testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []})); + // TODO SERVER-35401: This insert is inconsistent with the behavior for whole-db change + // streams. + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.ns, {db: testDB.getName(), coll: "system.views"}); + assertDropCollection(testDB, "system.views"); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: + [{operationType: "drop", ns: {db: testDB.getName(), coll: "system.views"}}] + }); + } + + cst.cleanUp(); +}()); diff --git a/jstests/change_streams/change_stream_whole_cluster_resumability.js b/jstests/change_streams/whole_cluster_resumability.js index df1041d0628..9058c58010a 100644 --- a/jstests/change_streams/change_stream_whole_cluster_resumability.js +++ b/jstests/change_streams/whole_cluster_resumability.js @@ -6,9 +6,8 @@ load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. // Create two databases, with one collection in each. - const testDBs = [db, db.getSiblingDB(`${db.getName()}_other`)]; - const[db1Coll, db2Coll] = - testDBs.map((db) => assertDropAndRecreateCollection(db, jsTestName())); + const testDBs = [db, db.getSiblingDB(jsTestName() + "_other")]; + const[db1Coll, db2Coll] = testDBs.map((db) => assertDropAndRecreateCollection(db, "test")); const adminDB = db.getSiblingDB("admin"); let cst = new ChangeStreamTest(adminDB); diff --git a/jstests/change_streams/change_stream_whole_db.js b/jstests/change_streams/whole_db.js index c904c4a750e..1f57797b784 100644 --- a/jstests/change_streams/change_stream_whole_db.js +++ b/jstests/change_streams/whole_db.js @@ -7,9 +7,7 @@ // assert[Valid|Invalid]ChangeStreamNss. load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. - // Drop and recreate the collections to be used in this set of tests. - assertDropAndRecreateCollection(db, "t1"); - assertDropAndRecreateCollection(db, "t2"); + db = db.getSiblingDB(jsTestName()); // Test that a single-database change stream cannot be opened on "admin", "config", or "local". assertInvalidChangeStreamNss("admin", 1); @@ -29,7 +27,7 @@ let expected = { documentKey: {_id: 0}, fullDocument: {_id: 0, a: 1}, - ns: {db: "test", coll: "t1"}, + ns: {db: db.getName(), coll: "t1"}, operationType: "insert", }; cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); @@ -40,14 +38,24 @@ expected = { documentKey: {_id: 0}, fullDocument: {_id: 0, a: 2}, - ns: {db: "test", coll: "t2"}, + ns: {db: db.getName(), coll: "t2"}, operationType: "insert", }; cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); - // Dropping the database should invalidate the change stream. + // Dropping the database should generate collection drop entries followed by an invalidate. Note + // that the order of collection drops is not guaranteed so only check the database name. assert.commandWorked(db.dropDatabase()); - cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [{operationType: "invalidate"}]}); + let change = cst.getOneChange(cursor); + assert.eq(change.operationType, "drop", tojson(change)); + assert.eq(change.ns.db, db.getName(), tojson(change)); + change = cst.getOneChange(cursor); + assert.eq(change.operationType, "drop", tojson(change)); + assert.eq(change.ns.db, db.getName(), tojson(change)); + + // TODO SERVER-35029: Expect to see a 'dropDatabase' entry before the invalidate. + change = cst.getOneChange(cursor, true); + assert.eq(change.operationType, "invalidate", tojson(change)); cst.cleanUp(); }()); diff --git a/jstests/change_streams/whole_db_metadata_notifications.js b/jstests/change_streams/whole_db_metadata_notifications.js new file mode 100644 index 00000000000..ade3bbe6f34 --- /dev/null +++ b/jstests/change_streams/whole_db_metadata_notifications.js @@ -0,0 +1,183 @@ +// Tests of metadata notifications for a $changeStream on a whole database. +(function() { + "use strict"; + + load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. + load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'. + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. + + const testDB = db.getSiblingDB(jsTestName()); + let cst = new ChangeStreamTest(testDB); + + // Write a document to the collection and test that the change stream returns it + // and getMore command closes the cursor afterwards. + const collName = "test"; + let coll = assertDropAndRecreateCollection(testDB, collName); + + let aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); + + // Create oplog entries of type insert, update, and delete. + assert.writeOK(coll.insert({_id: 1})); + assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}})); + assert.writeOK(coll.remove({_id: 1})); + + // Drop and recreate the collection. + const collAgg = assertDropAndRecreateCollection(testDB, collName); + + // We should get 4 oplog entries of type insert, update, delete, and drop. + let change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "insert", tojson(change)); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "update", tojson(change)); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "delete", tojson(change)); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "drop", tojson(change)); + + // Get a valid resume token that the next change stream can use. + assert.writeOK(collAgg.insert({_id: 1})); + + change = cst.getOneChange(aggCursor, false); + const resumeToken = change._id; + + // For whole-db streams, it is possible to resume at a point before a collection is dropped. + assertDropCollection(testDB, collAgg.getName()); + // Wait for two-phase drop to complete, so that the UUID no longer exists. + assert.soon(function() { + return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(testDB, + collAgg.getName()); + }); + assert.commandWorked(testDB.runCommand( + {aggregate: 1, pipeline: [{$changeStream: {resumeAfter: resumeToken}}], cursor: {}})); + + // Test that invalidation entries for other databases are filtered out. + const otherDB = testDB.getSiblingDB(jsTestName() + "other"); + const otherDBColl = otherDB[collName + "_other"]; + assert.writeOK(otherDBColl.insert({_id: 0})); + + // Create collection on the database being watched. + coll = assertDropAndRecreateCollection(testDB, collName); + + // Create the $changeStream. We set 'doNotModifyInPassthroughs' so that this test will not be + // upconverted to a cluster-wide stream, which would return an entry for the dropped collection + // in the other database. + aggCursor = cst.startWatchingChanges( + {pipeline: [{$changeStream: {}}], collection: 1, doNotModifyInPassthroughs: true}); + + // Drop the collection on the other database, this should *not* invalidate the change stream. + assertDropCollection(otherDB, otherDBColl.getName()); + + // Insert into the collection in the watched database, and verify the change stream is able to + // pick it up. + assert.writeOK(coll.insert({_id: 1})); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, 1); + + // Test that renaming a collection generates a 'rename' entry for the 'from' collection. MongoDB + // does not allow renaming of sharded collections, so only perform this test if the collection + // is not sharded. + if (!FixtureHelpers.isSharded(coll)) { + assertDropAndRecreateCollection(testDB, coll.getName()); + assertDropCollection(testDB, "renamed_coll"); + aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); + assert.writeOK(coll.renameCollection("renamed_coll")); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [{ + operationType: "rename", + ns: {db: testDB.getName(), coll: coll.getName()}, + to: {db: testDB.getName(), coll: "renamed_coll"} + }] + }); + + // Repeat the test, this time using the 'dropTarget' option with an existing target + // collection. + coll = testDB["renamed_coll"]; + assertCreateCollection(testDB, collName); + assert.writeOK(testDB[collName].insert({_id: 0})); + assert.writeOK(coll.renameCollection(collName, true /* dropTarget */)); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [ + { + operationType: "insert", + ns: {db: testDB.getName(), coll: collName}, + documentKey: {_id: 0}, + fullDocument: {_id: 0} + }, + { + operationType: "rename", + ns: {db: testDB.getName(), coll: "renamed_coll"}, + to: {db: testDB.getName(), coll: collName} + } + ] + }); + + coll = testDB[collName]; + // Test renaming a collection from the database being watched to a different database. Do + // not run this in the mongos passthrough suites since we cannot guarantee the primary shard + // of the target database, and renameCollection requires the source and destination to be on + // the same shard. + if (!FixtureHelpers.isMongos(testDB)) { + const otherDB = testDB.getSiblingDB(testDB.getName() + "_rename_target"); + // Create target collection to ensure the database exists. + const collOtherDB = assertCreateCollection(otherDB, "test"); + assertDropCollection(otherDB, "test"); + assert.commandWorked(testDB.adminCommand( + {renameCollection: coll.getFullName(), to: collOtherDB.getFullName()})); + // Rename across databases drops the source collection after the collection is copied + // over. + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: + [{operationType: "drop", ns: {db: testDB.getName(), coll: coll.getName()}}] + }); + + // Test renaming a collection from a different database to the database being watched. + assert.commandWorked(testDB.adminCommand( + {renameCollection: collOtherDB.getFullName(), to: coll.getFullName()})); + // Do not check the 'ns' field since it will contain the namespace of the temp + // collection created when renaming a collection across databases. + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "rename"); + assert.eq(change.to, {db: testDB.getName(), coll: coll.getName()}); + } + + // Test the behavior of a change stream watching the target collection of a $out aggregation + // stage. + coll.aggregate([{$out: "renamed_coll"}]); + // Note that $out will first create a temp collection, and then rename the temp collection + // to the target. Do not explicitly check the 'ns' field. + const rename = cst.getOneChange(aggCursor); + assert.eq(rename.operationType, "rename", tojson(rename)); + assert.eq(rename.to, {db: testDB.getName(), coll: "renamed_coll"}, tojson(rename)); + + // The change stream should not be invalidated by the rename(s). + assert.eq(0, cst.getNextBatch(aggCursor).nextBatch.length); + assert.writeOK(coll.insert({_id: 2})); + assert.eq(cst.getOneChange(aggCursor).operationType, "insert"); + } + + // Dropping a collection should return a 'drop' entry. + assertDropCollection(testDB, coll.getName()); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: + [{operationType: "drop", ns: {db: testDB.getName(), coll: coll.getName()}}], + }); + + // Dropping a 'system' collection should also generate a 'drop' notification. + // Create a view to ensure that the 'system.views' collection exists. + assert.commandWorked( + testDB.runCommand({create: "view1", viewOn: coll.getName(), pipeline: []})); + assertDropCollection(testDB, "system.views"); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: + [{operationType: "drop", ns: {db: testDB.getName(), coll: "system.views"}}] + }); + + cst.cleanUp(); +}()); diff --git a/jstests/change_streams/change_stream_whole_db_resumability.js b/jstests/change_streams/whole_db_resumability.js index 682d0546cb7..682d0546cb7 100644 --- a/jstests/change_streams/change_stream_whole_db_resumability.js +++ b/jstests/change_streams/whole_db_resumability.js diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js index 4221adbaa30..48f1e133b53 100644 --- a/jstests/libs/change_stream_util.js +++ b/jstests/libs/change_stream_util.js @@ -7,10 +7,36 @@ // the same as calling db.runCommand. If a passthrough is active and has defined a function // 'changeStreamPassthroughAwareRunCommand', then this method will be overridden to allow individual // streams to explicitly exempt themselves from being modified by the passthrough. +function isChangeStreamPassthrough() { + return typeof changeStreamPassthroughAwareRunCommand != 'undefined'; +} const runCommandChangeStreamPassthroughAware = - (typeof changeStreamPassthroughAwareRunCommand === 'undefined' - ? ((db, cmdObj) => db.runCommand(cmdObj)) - : changeStreamPassthroughAwareRunCommand); + (!isChangeStreamPassthrough() ? ((db, cmdObj) => db.runCommand(cmdObj)) + : changeStreamPassthroughAwareRunCommand); + +/** + * Asserts that the given opType triggers an invalidate entry depending on the type of change + * stream. + * - single collection streams: drop, rename, and dropDatabase. + * - whole DB streams: dropDatabase. + * - whole cluster streams: none. + */ +function assertInvalidateOp({cursor, opType}) { + if (!isChangeStreamPassthrough()) { + // All metadata operations will invalidate a single-collection change stream. + assert.soon(() => cursor.hasNext()); + assert.eq(cursor.next().operationType, "invalidate"); + assert(cursor.isExhausted()); + assert(cursor.isClosed()); + } else { + // Collection drops do not validate whole-db/cluster change streams. + if (opType == "drop") { + return; + } + + // TODO SERVER-35029: Database drops should only invalidate whole-db change streams. + } +} function ChangeStreamTest(_db, name = "ChangeStreamTest") { load("jstests/libs/namespace_utils.js"); // For getCollectionNameFromFullNamespace. diff --git a/jstests/libs/override_methods/implicit_whole_cluster_changestreams.js b/jstests/libs/override_methods/implicit_whole_cluster_changestreams.js index 3ee93a6fb24..f6a1da78b4e 100644 --- a/jstests/libs/override_methods/implicit_whole_cluster_changestreams.js +++ b/jstests/libs/override_methods/implicit_whole_cluster_changestreams.js @@ -27,6 +27,12 @@ ChangeStreamPassthroughHelpers.nsMatchFilter = function(db, collName) { "ns.db": db.getName(), "ns.coll": (isSingleCollectionStream ? collName : {$exists: true}) }, + // Add a clause to detect if the collection being watched is the target of a + // renameCollection command, since that is expected to return a "rename" entry. + { + "to.db": db.getName(), + "to.coll": (isSingleCollectionStream ? collName : {$exists: true}) + }, {operationType: "invalidate"} ] } diff --git a/jstests/libs/override_methods/implicit_whole_db_changestreams.js b/jstests/libs/override_methods/implicit_whole_db_changestreams.js index f88af373263..056a2659a0c 100644 --- a/jstests/libs/override_methods/implicit_whole_db_changestreams.js +++ b/jstests/libs/override_methods/implicit_whole_db_changestreams.js @@ -65,8 +65,11 @@ const ChangeStreamPassthroughHelpers = { nsMatchFilter: function(db, collName) { return { $match: { - $or: - [{"ns.db": db.getName(), "ns.coll": collName}, {operationType: "invalidate"}] + $or: [ + {"ns.db": db.getName(), "ns.coll": collName}, + {"to.db": db.getName(), "to.coll": collName}, + {operationType: "invalidate"} + ] } }; }, diff --git a/jstests/sharding/change_stream_invalidation.js b/jstests/sharding/change_stream_metadata_notifications.js index a50d8eb88ea..ec8865698d5 100644 --- a/jstests/sharding/change_stream_invalidation.js +++ b/jstests/sharding/change_stream_metadata_notifications.js @@ -1,9 +1,11 @@ -// Tests invalidation of change streams on sharded collections. +// Tests metadata notifications of change streams on sharded collections. +// Legacy getMore fails after dropping the database that the original cursor is on. +// @tags: [requires_find_command] (function() { "use strict"; + load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest. - load('jstests/libs/write_concern_util.js'); // For stopReplicationOnSecondaries. // For supportsMajorityReadConcern. load('jstests/multiVersion/libs/causal_consistency_helpers.js'); @@ -53,10 +55,11 @@ assert.writeOK(mongosColl.update({shardKey: 1, _id: 1}, {$set: {updated: true}})); assert.writeOK(mongosColl.insert({shardKey: 2, _id: 2})); - // Drop the collection and test that we return "invalidate" entry and close the cursor. + // Drop the collection and test that we return a "drop" entry, followed by an "invalidate" + // entry. mongosColl.drop(); - // Test that we see the two writes that happened before the invalidation. + // Test that we see the two writes that happened before the collection drop. assert.soon(() => changeStream.hasNext()); let next = changeStream.next(); assert.eq(next.operationType, "update"); @@ -75,6 +78,11 @@ assert.soon(() => changeStream.hasNext()); next = changeStream.next(); + assert.eq(next.operationType, "drop"); + assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()}); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); assert.eq(next.operationType, "invalidate"); assert(changeStream.isExhausted()); @@ -95,6 +103,11 @@ assert.soon(() => changeStream.hasNext()); next = changeStream.next(); + assert.eq(next.operationType, "drop"); + assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()}); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); assert.eq(next.operationType, "invalidate"); assert(changeStream.isExhausted()); @@ -122,5 +135,24 @@ }), ErrorCodes.InvalidResumeToken); + // Recreate the collection as unsharded and open a change stream on it. + assertDropAndRecreateCollection(mongosDB, mongosColl.getName()); + + changeStream = mongosColl.watch(); + + // Drop the database and verify that the stream returns a collection drop followed by an + // invalidate. + assert.commandWorked(mongosDB.dropDatabase()); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "drop"); + assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()}); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "invalidate"); + assert(changeStream.isExhausted()); + st.stop(); })(); diff --git a/jstests/sharding/change_streams.js b/jstests/sharding/change_streams.js index cdec0971204..1000d047cd0 100644 --- a/jstests/sharding/change_streams.js +++ b/jstests/sharding/change_streams.js @@ -120,13 +120,16 @@ changeStream = mongosColl.aggregate([{$changeStream: {}}, {$project: {"_id.clusterTime": 0}}]); assert(!changeStream.hasNext()); - // Drop the collection and test that we return "invalidate" entry and close the cursor. + // Drop the collection and test that we return a "drop" followed by an "invalidate" entry and + // close the cursor. jsTestLog("Testing getMore command closes cursor for invalidate entries"); mongosColl.drop(); // Wait for the drop to actually happen. assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase( mongosColl.getDB(), mongosColl.getName())); assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "drop"); + assert.soon(() => changeStream.hasNext()); assert.eq(changeStream.next().operationType, "invalidate"); assert(changeStream.isExhausted()); @@ -164,8 +167,10 @@ assert.eq(next.documentKey._id, 2); assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "drop"); + + assert.soon(() => changeStream.hasNext()); assert.eq(changeStream.next().operationType, "invalidate"); - assert(changeStream.isExhausted()); // With an explicit collation, test that we can resume from before the collection drop. changeStream = mongosColl.watch([{$project: {_id: 0}}], @@ -177,9 +182,10 @@ assert.eq(next.documentKey, {_id: 2}); assert.soon(() => changeStream.hasNext()); - next = changeStream.next(); - assert.eq(next.operationType, "invalidate"); - assert(changeStream.isExhausted()); + assert.eq(changeStream.next().operationType, "drop"); + + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "invalidate"); // Without an explicit collation, test that we *cannot* resume from before the collection drop. assert.commandFailedWithCode(mongosDB.runCommand({ diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 4ad710b50c4..2e3a152f872 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -78,10 +78,14 @@ constexpr StringData DocumentSourceChangeStream::kStageName; constexpr StringData DocumentSourceChangeStream::kClusterTimeField; constexpr StringData DocumentSourceChangeStream::kTxnNumberField; constexpr StringData DocumentSourceChangeStream::kLsidField; +constexpr StringData DocumentSourceChangeStream::kRenameTargetNssField; constexpr StringData DocumentSourceChangeStream::kUpdateOpType; constexpr StringData DocumentSourceChangeStream::kDeleteOpType; constexpr StringData DocumentSourceChangeStream::kReplaceOpType; constexpr StringData DocumentSourceChangeStream::kInsertOpType; +constexpr StringData DocumentSourceChangeStream::kDropCollectionOpType; +constexpr StringData DocumentSourceChangeStream::kRenameCollectionOpType; +constexpr StringData DocumentSourceChangeStream::kDropDatabaseOpType; constexpr StringData DocumentSourceChangeStream::kInvalidateOpType; constexpr StringData DocumentSourceChangeStream::kNewShardDetectedOpType; @@ -202,6 +206,7 @@ private: : DocumentSource(expCtx) {} bool _shouldCloseCursor = false; + boost::optional<Document> _queuedInvalidate; }; DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() { @@ -212,6 +217,11 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() { uasserted(ErrorCodes::CloseChangeStream, "Change stream has been invalidated"); } + if (_queuedInvalidate) { + _shouldCloseCursor = true; + return DocumentSource::GetNextResult(std::move(_queuedInvalidate.get())); + } + auto nextInput = pSource->getNext(); if (!nextInput.isAdvanced()) return nextInput; @@ -228,6 +238,22 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() { _shouldCloseCursor = true; } + // Check if this is an invalidating command and the next entry should be an "invalidate". + // TODO SERVER-35029: For whole-db change streams, only a database drop will invalidate the + // stream. + const auto invalidatingCommand = pExpCtx->isSingleNamespaceAggregation() + ? (operationType == DocumentSourceChangeStream::kDropCollectionOpType || + operationType == DocumentSourceChangeStream::kRenameCollectionOpType) + : false; + + if (invalidatingCommand) { + _queuedInvalidate = Document{ + {DocumentSourceChangeStream::kIdField, doc[DocumentSourceChangeStream::kIdField]}, + {DocumentSourceChangeStream::kClusterTimeField, + doc[DocumentSourceChangeStream::kClusterTimeField]}, + {DocumentSourceChangeStream::kOperationTypeField, "invalidate"_sd}}; + } + return nextInput; } @@ -304,27 +330,28 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter( ChangeStreamType sourceType = getChangeStreamType(nss); // 1) Supported commands that have the target db namespace (e.g. test.$cmd) in "ns" field. - BSONArrayBuilder invalidatingCommands; - invalidatingCommands.append(BSON("o.dropDatabase" << 1)); + BSONArrayBuilder relevantCommands; if (sourceType == ChangeStreamType::kSingleCollection) { - invalidatingCommands.append(BSON("o.drop" << nss.coll())); - invalidatingCommands.append(BSON("o.renameCollection" << nss.ns())); + relevantCommands.append(BSON("o.drop" << nss.coll())); + // Generate 'rename' entries if the change stream is open on the source or target namespace. + relevantCommands.append(BSON("o.renameCollection" << nss.ns())); + relevantCommands.append(BSON("o.to" << nss.ns())); if (expCtx->collation.isEmpty()) { // If the user did not specify a collation, they should be using the collection's // default collation. So a "create" command which has any collation present would // invalidate the change stream, since that must mean the stream was created before the // collection existed and used the simple collation, which is no longer the default. - invalidatingCommands.append( + relevantCommands.append( BSON("o.create" << nss.coll() << "o.collation" << BSON("$exists" << true))); } } else { - // For change streams on an entire database, the stream is invalidated if any collections in - // that database are dropped or renamed. For cluster-wide streams, drops or renames of any - // collection in any database (aside from the internal databases admin, config and local) - // will invalidate the stream. - invalidatingCommands.append(BSON("o.drop" << BSON("$exists" << true))); - invalidatingCommands.append(BSON("o.renameCollection" << BSON("$exists" << true))); + // For change streams on an entire database, include notifications for individual collection + // drops and renames which will not invalidate the stream. Also include the 'dropDatabase' + // command which will invalidate the stream. + relevantCommands.append(BSON("o.drop" << BSON("$exists" << true))); + relevantCommands.append(BSON("o.dropDatabase" << BSON("$exists" << true))); + relevantCommands.append(BSON("o.renameCollection" << BSON("$exists" << true))); } // For cluster-wide $changeStream, match the command namespace of any database other than admin, @@ -333,9 +360,9 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter( ? BSON("ns" << BSONRegEx("^" + kRegexAllDBs + kRegexCmdColl)) : BSON("ns" << nss.getCommandNS().ns())); - // 1.1) Commands that are on target db(s) and one of the above invalidating commands. + // 1.1) Commands that are on target db(s) and one of the above supported commands. auto commandsOnTargetDb = - BSON("$and" << BSON_ARRAY(cmdNsFilter << BSON("$or" << invalidatingCommands.arr()))); + BSON("$and" << BSON_ARRAY(cmdNsFilter << BSON("$or" << relevantCommands.arr()))); // 1.2) Supported commands that have arbitrary db namespaces in "ns" field. auto renameDropTarget = (sourceType == ChangeStreamType::kAllChangesForCluster diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 0912e79ebd7..cf1720b7d36 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -135,11 +135,17 @@ public: static constexpr StringData kTxnNumberField = "txnNumber"_sd; static constexpr StringData kLsidField = "lsid"_sd; + // The target namespace of a rename operation. + static constexpr StringData kRenameTargetNssField = "to"_sd; + // The different types of operations we can use for the operation type. static constexpr StringData kUpdateOpType = "update"_sd; static constexpr StringData kDeleteOpType = "delete"_sd; static constexpr StringData kReplaceOpType = "replace"_sd; static constexpr StringData kInsertOpType = "insert"_sd; + static constexpr StringData kDropCollectionOpType = "drop"_sd; + static constexpr StringData kRenameCollectionOpType = "rename"_sd; + static constexpr StringData kDropDatabaseOpType = "dropDatabase"_sd; static constexpr StringData kInvalidateOpType = "invalidate"_sd; // Internal op type to signal mongos to open cursors on new shards. static constexpr StringData kNewShardDetectedOpType = "kNewShardDetected"_sd; diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index af12c25c7d8..f465a5994ef 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -111,25 +111,27 @@ public: void checkTransformation(const OplogEntry& entry, const boost::optional<Document> expectedDoc, - std::vector<FieldPath> docKeyFields, - const BSONObj& spec) { + std::vector<FieldPath> docKeyFields = {}, + const BSONObj& spec = kDefaultSpec, + const boost::optional<Document> expectedInvalidate = {}) { vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry.toBSON(), spec); - auto transform = stages[2].get(); + auto closeCursor = stages.back(); getExpCtx()->mongoProcessInterface = stdx::make_unique<MockMongoInterface>(docKeyFields); - auto next = transform->getNext(); + auto next = closeCursor->getNext(); // Match stage should pass the doc down if expectedDoc is given. ASSERT_EQ(next.isAdvanced(), static_cast<bool>(expectedDoc)); if (expectedDoc) { ASSERT_DOCUMENT_EQ(next.releaseDocument(), *expectedDoc); } - } - void checkTransformation(const OplogEntry& entry, - const boost::optional<Document> expectedDoc, - std::vector<FieldPath> docKeyFields = {}) { - return checkTransformation(entry, expectedDoc, docKeyFields, kDefaultSpec); + if (expectedInvalidate) { + next = closeCursor->getNext(); + ASSERT_DOCUMENT_EQ(next.releaseDocument(), *expectedInvalidate); + // Then throw an exception on the next call of getNext(). + ASSERT_THROWS(closeCursor->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>); + } } /** @@ -623,32 +625,44 @@ TEST_F(ChangeStreamStageTest, TransformDeleteFromMigrate) { checkTransformation(deleteEntry, boost::none); } -TEST_F(ChangeStreamStageTest, TransformInvalidate) { - NamespaceString otherColl("test.bar"); - +TEST_F(ChangeStreamStageTest, TransformDrop) { OplogEntry dropColl = createCommand(BSON("drop" << nss.coll()), testUuid()); - bool dropDBFromMigrate = false; // verify this doesn't get it filtered - OplogEntry dropDB = createCommand(BSON("dropDatabase" << 1), boost::none, dropDBFromMigrate); - OplogEntry rename = - createCommand(BSON("renameCollection" << nss.ns() << "to" << otherColl.ns()), testUuid()); - // Invalidate entry doesn't have a document id. + Document expectedDrop{ + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + }; Document expectedInvalidate{ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, - {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, }; - for (auto& entry : {dropColl, rename}) { - checkTransformation(entry, expectedInvalidate); - } - // Drop database invalidate entry doesn't have a UUID. - Document expectedInvalidateDropDatabase{ - {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)}, - {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + checkTransformation(dropColl, expectedDrop, {}, kDefaultSpec, expectedInvalidate); +} + +TEST_F(ChangeStreamStageTest, TransformRename) { + NamespaceString otherColl("test.bar"); + OplogEntry rename = + createCommand(BSON("renameCollection" << nss.ns() << "to" << otherColl.ns()), testUuid()); + + Document expectedRename{ + {DSChangeStream::kRenameTargetNssField, + D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, }; - checkTransformation(dropDB, expectedInvalidateDropDatabase); + Document expectedInvalidate{ + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + }; + + checkTransformation(rename, expectedRename, {}, kDefaultSpec, expectedInvalidate); } TEST_F(ChangeStreamStageTest, TransformInvalidateFromMigrate) { @@ -670,22 +684,25 @@ TEST_F(ChangeStreamStageTest, TransformInvalidateFromMigrate) { } } -TEST_F(ChangeStreamStageTest, TransformInvalidateRenameDropTarget) { +TEST_F(ChangeStreamStageTest, TransformRenameTarget) { NamespaceString otherColl("test.bar"); - auto rename = - makeOplogEntry(OpTypeEnum::kCommand, // op type - otherColl.getCommandNS(), // namespace - BSON("renameCollection" << otherColl.ns() << "to" << nss.ns()), // o - testUuid(), // uuid - boost::none, // fromMigrate - boost::none); // o2 + OplogEntry rename = + createCommand(BSON("renameCollection" << otherColl.ns() << "to" << nss.ns()), testUuid()); + Document expectedRename{ + {DSChangeStream::kRenameTargetNssField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kNamespaceField, D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}}, + }; Document expectedInvalidate{ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, - {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, }; - checkTransformation(rename, expectedInvalidate); + + checkTransformation(rename, expectedRename, {}, kDefaultSpec, expectedInvalidate); } TEST_F(ChangeStreamStageTest, TransformNewShardDetected) { @@ -882,17 +899,35 @@ TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) { }; checkTransformation(updateField, expectedUpdateField); - // Test the 'clusterTime' field is copied from the oplog entry for an invalidation. + // Test the 'clusterTime' field is copied from the oplog entry for a collection drop. OplogEntry dropColl = createCommand(BSON("drop" << nss.coll()), testUuid(), boost::none, opTime); - // Invalidate entry doesn't have a document id. - Document expectedInvalidate{ + Document expectedDrop{ {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())}, - {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType}, {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + }; + checkTransformation(dropColl, expectedDrop); + + // Test the 'clusterTime' field is copied from the oplog entry for a collection rename. + NamespaceString otherColl("test.bar"); + OplogEntry rename = + createCommand(BSON("renameCollection" << nss.ns() << "to" << otherColl.ns()), + testUuid(), + boost::none, + opTime); + + Document expectedRename{ + {DSChangeStream::kRenameTargetNssField, + D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}}, + {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, + {DSChangeStream::kClusterTimeField, ts}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, }; - checkTransformation(dropColl, expectedInvalidate); + checkTransformation(rename, expectedRename); } TEST_F(ChangeStreamStageTest, MatchFiltersCreateCollection) { @@ -979,13 +1014,22 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) { auto stages = makeStages(dropColl); auto closeCursor = stages.back(); + Document expectedDrop{ + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + }; Document expectedInvalidate{ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, - {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, }; auto next = closeCursor->getNext(); + // Transform into drop entry. + ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedDrop); + next = closeCursor->getNext(); // Transform into invalidate entry. ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedInvalidate); // Then throw an exception on the next call of getNext(). @@ -1292,25 +1336,38 @@ TEST_F(ChangeStreamStageDBTest, TransformDeleteFromMigrate) { checkTransformation(deleteEntry, boost::none); } -TEST_F(ChangeStreamStageDBTest, TransformInvalidate) { - NamespaceString otherColl("test.bar"); - +TEST_F(ChangeStreamStageDBTest, TransformDrop) { OplogEntry dropColl = createCommand(BSON("drop" << nss.coll()), testUuid()); - bool dropDBFromMigrate = false; // verify this doesn't get it filtered - OplogEntry dropDB = createCommand(BSON("dropDatabase" << 1), boost::none, dropDBFromMigrate); + Document expectedDrop{ + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + }; + checkTransformation(dropColl, expectedDrop); +} + +TEST_F(ChangeStreamStageDBTest, TransformRename) { + NamespaceString otherColl("test.bar"); OplogEntry rename = createCommand(BSON("renameCollection" << nss.ns() << "to" << otherColl.ns()), testUuid()); - // Invalidate entry doesn't have a document id. - Document expectedInvalidate{ + Document expectedRename{ + {DSChangeStream::kRenameTargetNssField, + D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}}, {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, - {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, }; - for (auto& entry : {dropColl, rename}) { - checkTransformation(entry, expectedInvalidate); - } + checkTransformation(rename, expectedRename); +} + +TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) { + bool dropDBFromMigrate = false; // verify this doesn't get it filtered + OplogEntry dropDB = createCommand(BSON("dropDatabase" << 1), boost::none, dropDBFromMigrate); + // TODO SERVER-35029: Add notification for DB drop followed by invalidate. // Drop database invalidate entry doesn't have a UUID. Document expectedInvalidateDropDatabase{ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)}, @@ -1320,21 +1377,35 @@ TEST_F(ChangeStreamStageDBTest, TransformInvalidate) { checkTransformation(dropDB, expectedInvalidateDropDatabase); } -TEST_F(ChangeStreamStageDBTest, SystemCollectionsDropOrRenameShouldInvalidate) { +TEST_F(ChangeStreamStageDBTest, SystemCollectionsDrop) { NamespaceString systemColl(nss.db() + ".system.users"); - NamespaceString renamedSystemColl(nss.db() + ".system.users_new"); - Document expectedInvalidate{ + OplogEntry dropColl = createCommand(BSON("drop" << systemColl.coll()), testUuid()); + // Note that the collection drop does *not* have the queued invalidated field. + Document expectedDrop{ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, - {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kNamespaceField, D{{"db", systemColl.db()}, {"coll", systemColl.coll()}}}, }; + checkTransformation(dropColl, expectedDrop); +} - OplogEntry dropColl = createCommand(BSON("drop" << systemColl.coll()), testUuid()); - checkTransformation(dropColl, expectedInvalidate); +TEST_F(ChangeStreamStageDBTest, SystemCollectionsRename) { + NamespaceString systemColl(nss.db() + ".system.users"); + NamespaceString renamedSystemColl(nss.db() + ".system.users_new"); OplogEntry rename = createCommand( BSON("renameCollection" << systemColl.ns() << "to" << renamedSystemColl.ns()), testUuid()); - checkTransformation(rename, expectedInvalidate); + // Note that the collection rename does *not* have the queued invalidated field. + Document expectedRename{ + {DSChangeStream::kRenameTargetNssField, + D{{"db", renamedSystemColl.db()}, {"coll", renamedSystemColl.coll()}}}, + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + {DSChangeStream::kNamespaceField, D{{"db", systemColl.db()}, {"coll", systemColl.coll()}}}, + }; + checkTransformation(rename, expectedRename); } TEST_F(ChangeStreamStageDBTest, MatchFiltersNoOp) { diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 9e7e623eeab..8f25e7e6151 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -292,10 +292,28 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document invariant(nextDoc); return applyTransformation(*nextDoc); + } else if (!input.getNestedField("o.drop").missing()) { + operationType = DocumentSourceChangeStream::kDropCollectionOpType; + + // The "o.drop" field will contain the actual collection name. + nss = NamespaceString(nss.db(), input.getNestedField("o.drop").getString()); + } else if (!input.getNestedField("o.renameCollection").missing()) { + operationType = DocumentSourceChangeStream::kRenameCollectionOpType; + + // The "o.renameCollection" field contains the namespace of the original collection. + nss = NamespaceString(input.getNestedField("o.renameCollection").getString()); + + // The "o.to" field contains the target namespace for the rename. + const auto renameTargetNss = + NamespaceString(input.getNestedField("o.to").getString()); + doc.addField(DocumentSourceChangeStream::kRenameTargetNssField, + Value(Document{{"db", renameTargetNss.db()}, + {"coll", renameTargetNss.coll()}})); + } else { + // All other commands will invalidate the stream. + operationType = DocumentSourceChangeStream::kInvalidateOpType; } - // Any command that makes it through our filter is an invalidating command such as a - // drop. - operationType = DocumentSourceChangeStream::kInvalidateOpType; + // Make sure the result doesn't have a document key. documentKey = Value(); break; |