summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml4
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml8
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml5
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_whole_cluster_passthrough.yml4
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml3
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml10
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml4
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_whole_db_passthrough.yml3
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml3
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml10
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_11.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_auth_15.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_auth_audit_14.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_auth_audit_misc.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_auth_misc.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_ese_16.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_ese_misc.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_8.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml4
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_misc.yml2
-rw-r--r--jstests/change_streams/apply_ops.js (renamed from jstests/change_streams/change_stream_apply_ops.js)29
-rw-r--r--jstests/change_streams/apply_ops_resumability.js (renamed from jstests/change_streams/change_stream_apply_ops_resumability.js)39
-rw-r--r--jstests/change_streams/ban_from_lookup.js (renamed from jstests/change_streams/change_stream_ban_from_lookup.js)0
-rw-r--r--jstests/change_streams/ban_from_views.js (renamed from jstests/change_streams/change_stream_ban_from_views.js)0
-rw-r--r--jstests/change_streams/change_stream.js11
-rw-r--r--jstests/change_streams/change_stream_invalidation.js128
-rw-r--r--jstests/change_streams/change_stream_rename_resumability.js51
-rw-r--r--jstests/change_streams/change_stream_rename_target.js48
-rw-r--r--jstests/change_streams/change_stream_whole_cluster_invalidations.js134
-rw-r--r--jstests/change_streams/change_stream_whole_db_invalidations.js125
-rw-r--r--jstests/change_streams/collation.js (renamed from jstests/change_streams/change_stream_collation.js)6
-rw-r--r--jstests/change_streams/does_not_implicitly_create_database.js (renamed from jstests/change_streams/change_stream_does_not_implicitly_create_database.js)0
-rw-r--r--jstests/change_streams/include_cluster_time.js5
-rw-r--r--jstests/change_streams/lookup_post_image.js10
-rw-r--r--jstests/change_streams/metadata_notifications.js201
-rw-r--r--jstests/change_streams/shell_helper.js (renamed from jstests/change_streams/change_stream_shell_helper.js)11
-rw-r--r--jstests/change_streams/start_at_cluster_time.js (renamed from jstests/change_streams/change_stream_start_at_cluster_time.js)0
-rw-r--r--jstests/change_streams/whitelist.js (renamed from jstests/change_streams/change_stream_whitelist.js)0
-rw-r--r--jstests/change_streams/whole_cluster.js (renamed from jstests/change_streams/change_stream_whole_cluster.js)16
-rw-r--r--jstests/change_streams/whole_cluster_metadata_notifications.js225
-rw-r--r--jstests/change_streams/whole_cluster_resumability.js (renamed from jstests/change_streams/change_stream_whole_cluster_resumability.js)5
-rw-r--r--jstests/change_streams/whole_db.js (renamed from jstests/change_streams/change_stream_whole_db.js)22
-rw-r--r--jstests/change_streams/whole_db_metadata_notifications.js183
-rw-r--r--jstests/change_streams/whole_db_resumability.js (renamed from jstests/change_streams/change_stream_whole_db_resumability.js)0
-rw-r--r--jstests/libs/change_stream_util.js32
-rw-r--r--jstests/libs/override_methods/implicit_whole_cluster_changestreams.js6
-rw-r--r--jstests/libs/override_methods/implicit_whole_db_changestreams.js7
-rw-r--r--jstests/sharding/change_stream_metadata_notifications.js (renamed from jstests/sharding/change_stream_invalidation.js)40
-rw-r--r--jstests/sharding/change_streams.js16
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp53
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h6
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp191
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp24
54 files changed, 1042 insertions, 660 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
index fd05367ac35..ec41092246c 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_sessions_passthrough.yml
@@ -7,10 +7,6 @@ selector:
# This test exercises an internal detail of mongos<->mongod communication and is not expected to
# work against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
- # TODO: SERVER-32088 should fix resuming a change stream when not all shards have data.
- - jstests/change_streams/change_stream_shell_helper.js
- - jstests/change_streams/include_cluster_time.js
- - jstests/change_streams/lookup_post_image.js
exclude_with_any_tags:
##
# The next three tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
index 17992685444..5664164c149 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
@@ -6,9 +6,11 @@ selector:
exclude_files:
# Exercises an internal detail of mongos<->mongod communication. Not expected to work on mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
- # Renames a collection, which is not allowed for sharded collections.
- - jstests/change_streams/change_stream_rename_target.js
- - jstests/change_streams/change_stream_rename_resumability.js
+ # TODO SERVER-35280: Update change stream tests to handle drops of sharded collections and databases.
+ - jstests/change_streams/whole_cluster.js
+ - jstests/change_streams/whole_cluster_metadata_notifications.js
+ - jstests/change_streams/whole_db.js
+ - jstests/change_streams/whole_db_metadata_notifications.js
exclude_with_any_tags:
##
# The next three tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
index 086f96021c3..740ad95685a 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_mongos_passthrough.yml
@@ -8,9 +8,12 @@ selector:
# to work against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
# TODO: SERVER-32088 should fix resuming a change stream when not all shards have data.
- - jstests/change_streams/change_stream_shell_helper.js
+ - jstests/change_streams/shell_helper.js
- jstests/change_streams/lookup_post_image.js
- jstests/change_streams/include_cluster_time.js
+ # Not relevant for whole-cluster change streams.
+ - jstests/change_streams/metadata_notifications.js
+ - jstests/change_streams/whole_db_metadata_notifications.js
exclude_with_any_tags:
##
# The next three tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_passthrough.yml
index 0ed49f26ae2..b5d3e8688af 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_passthrough.yml
@@ -3,6 +3,10 @@ test_kind: js_test
selector:
roots:
- jstests/change_streams/**/*.js
+ exclude_files:
+ # Not relevant for whole-cluster change streams.
+ - jstests/change_streams/metadata_notifications.js
+ - jstests/change_streams/whole_db_metadata_notifications.js
exclude_with_any_tags:
##
# The next three tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
index 651fad01bed..78fc224c071 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_secondary_reads_passthrough.yml
@@ -9,6 +9,9 @@ selector:
- jstests/change_streams/only_wake_getmore_for_relevant_changes.js
# This test is not expected to work when run against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ # Not relevant for whole-cluster change streams.
+ - jstests/change_streams/metadata_notifications.js
+ - jstests/change_streams/whole_db_metadata_notifications.js
exclude_with_any_tags:
##
# The next three tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml
index c6c10ecb937..a60da2f3d0e 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_cluster_sharded_collections_passthrough.yml
@@ -6,9 +6,13 @@ selector:
exclude_files:
# Exercises an internal detail of mongos<->mongod communication. Not expected to work on mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
- # These tests rename a collection, which is not allowed for sharded collections.
- - jstests/change_streams/change_stream_rename_target.js
- - jstests/change_streams/change_stream_rename_resumability.js
+ # Not relevant for whole-cluster change streams.
+ - jstests/change_streams/metadata_notifications.js
+ - jstests/change_streams/whole_db_metadata_notifications.js
+ # TODO SERVER-35280: Update change stream tests to handle drops of sharded collections and databases.
+ - jstests/change_streams/whole_cluster.js
+ - jstests/change_streams/whole_cluster_metadata_notifications.js
+ - jstests/change_streams/whole_db.js
exclude_with_any_tags:
##
# The next three tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
index d2452f5c739..0c08e630f4c 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_mongos_passthrough.yml
@@ -8,9 +8,11 @@ selector:
# to work against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
# TODO: SERVER-32088 should fix resuming a change stream when not all shards have data.
- - jstests/change_streams/change_stream_shell_helper.js
+ - jstests/change_streams/shell_helper.js
- jstests/change_streams/lookup_post_image.js
- jstests/change_streams/include_cluster_time.js
+ # Only relevant for single-collection change streams.
+ - jstests/change_streams/metadata_notifications.js
exclude_with_any_tags:
##
# The next three tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_passthrough.yml
index 42233a8d81b..639179c90b9 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_passthrough.yml
@@ -3,6 +3,9 @@ test_kind: js_test
selector:
roots:
- jstests/change_streams/**/*.js
+ exclude_files:
+ # Only relevant for single-collection change streams.
+ - jstests/change_streams/metadata_notifications.js
exclude_with_any_tags:
##
# The next three tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
index 96332cfe463..838dd2ffce7 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_secondary_reads_passthrough.yml
@@ -9,6 +9,9 @@ selector:
- jstests/change_streams/only_wake_getmore_for_relevant_changes.js
# This test is not expected to work when run against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ # Only relevant for single-collection change streams.
+ - jstests/change_streams/metadata_notifications.js
+
exclude_with_any_tags:
##
# The next three tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml
index d5b1d538c71..06a30be4ee6 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_whole_db_sharded_collections_passthrough.yml
@@ -6,9 +6,13 @@ selector:
exclude_files:
# Exercises an internal detail of mongos<->mongod communication. Not expected to work on mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
- # These tests rename a collection, which is not allowed for sharded collections.
- - jstests/change_streams/change_stream_rename_target.js
- - jstests/change_streams/change_stream_rename_resumability.js
+ # Only relevant for single-collection change streams.
+ - jstests/change_streams/metadata_notifications.js
+ # TODO SERVER-35280: Update change stream tests to handle drops of sharded collections and databases.
+ - jstests/change_streams/whole_cluster.js
+ - jstests/change_streams/whole_cluster_metadata_notifications.js
+ - jstests/change_streams/whole_db.js
+ - jstests/change_streams/whole_db_metadata_notifications.js
exclude_with_any_tags:
##
# The next three tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/sharding_11.yml b/buildscripts/resmokeconfig/suites/sharding_11.yml
index f13cca6d959..e49a191d72c 100644
--- a/buildscripts/resmokeconfig/suites/sharding_11.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_11.yml
@@ -36,7 +36,7 @@ selector:
- jstests/sharding/cleanup_orphaned_cmd_during_movechunk_hashed.js
- jstests/sharding/sharded_limit_batchsize.js
- jstests/sharding/migration_sets_fromMigrate_flag.js
- - jstests/sharding/change_stream_invalidation.js
+ - jstests/sharding/change_stream_metadata_notifications.js
- jstests/sharding/bulk_insert.js
- jstests/sharding/multi_mongos2a.js
- jstests/sharding/movechunk_commit_changelog_stats.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_auth_15.yml b/buildscripts/resmokeconfig/suites/sharding_auth_15.yml
index 257edd3ebdd..3053ef9e52a 100644
--- a/buildscripts/resmokeconfig/suites/sharding_auth_15.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_auth_15.yml
@@ -24,7 +24,7 @@ selector:
- jstests/sharding/change_streams_establishment_finds_new_shards.js
- jstests/sharding/inserts_consistent.js
- jstests/sharding/not_allowed_on_sharded_collection_cmd.js
- - jstests/sharding/change_stream_invalidation.js
+ - jstests/sharding/change_stream_metadata_notifications.js
- jstests/sharding/mapReduce_outSharded.js
- jstests/sharding/autosplit.js
- jstests/sharding/accurate_count_with_predicate.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_auth_audit_14.yml b/buildscripts/resmokeconfig/suites/sharding_auth_audit_14.yml
index 0bfc47532e5..fc092eb55f2 100644
--- a/buildscripts/resmokeconfig/suites/sharding_auth_audit_14.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_auth_audit_14.yml
@@ -35,7 +35,7 @@ selector:
- jstests/sharding/change_streams_establishment_finds_new_shards.js
- jstests/sharding/inserts_consistent.js
- jstests/sharding/not_allowed_on_sharded_collection_cmd.js
- - jstests/sharding/change_stream_invalidation.js
+ - jstests/sharding/change_stream_metadata_notifications.js
- jstests/sharding/autosplit.js
- jstests/sharding/tag_auto_split.js
- jstests/sharding/covered_shard_key_indexes.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_auth_audit_misc.yml b/buildscripts/resmokeconfig/suites/sharding_auth_audit_misc.yml
index afa10efa647..18a72f302d7 100644
--- a/buildscripts/resmokeconfig/suites/sharding_auth_audit_misc.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_auth_audit_misc.yml
@@ -247,7 +247,7 @@ selector:
- jstests/sharding/change_streams_establishment_finds_new_shards.js
- jstests/sharding/inserts_consistent.js
- jstests/sharding/not_allowed_on_sharded_collection_cmd.js
- - jstests/sharding/change_stream_invalidation.js
+ - jstests/sharding/change_stream_metadata_notifications.js
- jstests/sharding/autosplit.js
- jstests/sharding/tag_auto_split.js
- jstests/sharding/covered_shard_key_indexes.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_auth_misc.yml b/buildscripts/resmokeconfig/suites/sharding_auth_misc.yml
index c1e591d81d1..cec1ad5dfb9 100644
--- a/buildscripts/resmokeconfig/suites/sharding_auth_misc.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_auth_misc.yml
@@ -245,7 +245,7 @@ selector:
- jstests/sharding/change_streams_establishment_finds_new_shards.js
- jstests/sharding/inserts_consistent.js
- jstests/sharding/not_allowed_on_sharded_collection_cmd.js
- - jstests/sharding/change_stream_invalidation.js
+ - jstests/sharding/change_stream_metadata_notifications.js
- jstests/sharding/mapReduce_outSharded.js
- jstests/sharding/autosplit.js
- jstests/sharding/accurate_count_with_predicate.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_ese_16.yml b/buildscripts/resmokeconfig/suites/sharding_ese_16.yml
index f43990f5304..1d07c54874c 100644
--- a/buildscripts/resmokeconfig/suites/sharding_ese_16.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_ese_16.yml
@@ -27,7 +27,7 @@ selector:
- jstests/sharding/accurate_count_with_predicate.js
- jstests/sharding/migration_sets_fromMigrate_flag.js
- jstests/sharding/remove3.js
- - jstests/sharding/change_stream_invalidation.js
+ - jstests/sharding/change_stream_metadata_notifications.js
- jstests/sharding/autosplit.js
- jstests/sharding/basic_merge.js
- jstests/sharding/merge_chunks_compound_shard_key.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_ese_misc.yml b/buildscripts/resmokeconfig/suites/sharding_ese_misc.yml
index f8bcc258b22..1005ad7ff90 100644
--- a/buildscripts/resmokeconfig/suites/sharding_ese_misc.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_ese_misc.yml
@@ -242,7 +242,7 @@ selector:
- jstests/sharding/accurate_count_with_predicate.js
- jstests/sharding/migration_sets_fromMigrate_flag.js
- jstests/sharding/remove3.js
- - jstests/sharding/change_stream_invalidation.js
+ - jstests/sharding/change_stream_metadata_notifications.js
- jstests/sharding/autosplit.js
- jstests/sharding/basic_merge.js
- jstests/sharding/merge_chunks_compound_shard_key.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index bc1bf39b26a..467108eaace 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -60,8 +60,8 @@ selector:
# New 4.0 feature
- jstests/sharding/snapshot_aggregate_mongos.js
- jstests/sharding/snapshot_find_mongos.js
- - jstests/sharding/change_stream_invalidation.js
- jstests/sharding/change_stream_lookup_single_shard_cluster.js
+ - jstests/sharding/change_stream_metadata_notifications.js
- jstests/sharding/change_streams.js
- jstests/sharding/change_streams_primary_shard_unaware.js
- jstests/sharding/change_streams_unsharded_becomes_sharded.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_8.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_8.yml
index 0471b1321b6..d9732db7a6b 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_8.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_8.yml
@@ -64,4 +64,4 @@ executor:
mongosBinVersion: 'last-stable'
shardMixedBinVersions: true
skipCheckingUUIDsConsistentAcrossCluster: true
- nodb: '' \ No newline at end of file
+ nodb: ''
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
index bccc925cf99..b17f585db15 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
@@ -68,8 +68,8 @@ selector:
# New 4.0 feature
- jstests/sharding/snapshot_aggregate_mongos.js
- jstests/sharding/snapshot_find_mongos.js
- - jstests/sharding/change_stream_invalidation.js
- jstests/sharding/change_stream_lookup_single_shard_cluster.js
+ - jstests/sharding/change_stream_metadata_notifications.js
- jstests/sharding/change_streams.js
- jstests/sharding/change_streams_primary_shard_unaware.js
- jstests/sharding/change_streams_unsharded_becomes_sharded.js
@@ -235,7 +235,7 @@ selector:
- jstests/sharding/addshard5.js
- jstests/sharding/count2.js
- jstests/sharding/features1.js
- - jstests/sharding/change_stream_invalidation.js
+ - jstests/sharding/change_stream_metadata_notifications.js
- jstests/sharding/regex_targeting.js
- jstests/sharding/shard6.js
- jstests/sharding/query_config.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_misc.yml b/buildscripts/resmokeconfig/suites/sharding_misc.yml
index 55ee8cba885..3a4374bdbd0 100644
--- a/buildscripts/resmokeconfig/suites/sharding_misc.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_misc.yml
@@ -216,7 +216,7 @@ selector:
- jstests/sharding/cleanup_orphaned_cmd_during_movechunk_hashed.js
- jstests/sharding/sharded_limit_batchsize.js
- jstests/sharding/migration_sets_fromMigrate_flag.js
- - jstests/sharding/change_stream_invalidation.js
+ - jstests/sharding/change_stream_metadata_notifications.js
- jstests/sharding/bulk_insert.js
- jstests/sharding/multi_mongos2a.js
- jstests/sharding/movechunk_commit_changelog_stats.js
diff --git a/jstests/change_streams/change_stream_apply_ops.js b/jstests/change_streams/apply_ops.js
index 0f954578e52..ee116d3495d 100644
--- a/jstests/change_streams/change_stream_apply_ops.js
+++ b/jstests/change_streams/apply_ops.js
@@ -20,8 +20,12 @@
coll.insert({_id: kDeletedDocumentId, a: "I was here before the transaction"});
let cst = new ChangeStreamTest(db);
- let changeStream = cst.startWatchingChanges(
- {pipeline: [{$changeStream: {}}, {$project: {"lsid.uid": 0}}], collection: coll});
+ let changeStream = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {}}, {$project: {"lsid.uid": 0}}],
+ collection: coll,
+ doNotModifyInPassthroughs:
+ true // A collection drop only invalidates single-collection change streams.
+ });
const sessionOptions = {causalConsistency: false};
const session = db.getMongo().startSession(sessionOptions);
@@ -93,12 +97,21 @@
lsid: session.getSessionId(),
txnNumber: NumberLong(session._txnNumber),
},
- {operationType: "invalidate"},
+ {
+ operationType: "drop",
+ ns: {db: db.getName(), coll: coll.getName()},
+ },
];
// Verify that the stream returns the expected sequence of changes.
- const changes = cst.assertNextChangesEqual(
- {cursor: changeStream, expectedChanges: expectedChanges, expectInvalidate: true});
+ const changes =
+ cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
+ // Single collection change stream should also be invalidated by the drop.
+ cst.assertNextChangesEqual({
+ cursor: changeStream,
+ expectedChanges: [{operationType: "invalidate"}],
+ expectInvalidate: true
+ });
// Obtain the clusterTime from the first change.
const startTime = changes[0].clusterTime;
@@ -119,8 +132,7 @@
pipeline: [{$changeStream: {startAtOperationTime: startTime}}, {$project: {"lsid.uid": 0}}],
collection: 1
});
- cst.assertNextChangesEqual(
- {cursor: changeStream, expectedChanges: expectedChanges, expectInvalidate: true});
+ cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
// Add an entry for the insert on otherDb.otherDbColl into expectedChanges.
expectedChanges.splice(3, 0, {
@@ -142,8 +154,7 @@
],
collection: 1
});
- cst.assertNextChangesEqual(
- {cursor: changeStream, expectedChanges: expectedChanges, expectInvalidate: true});
+ cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
cst.cleanUp();
}());
diff --git a/jstests/change_streams/change_stream_apply_ops_resumability.js b/jstests/change_streams/apply_ops_resumability.js
index 8d7ee90b8e6..a8d25d775d4 100644
--- a/jstests/change_streams/change_stream_apply_ops_resumability.js
+++ b/jstests/change_streams/apply_ops_resumability.js
@@ -88,6 +88,10 @@
},
];
+ //
+ // Test behavior of single-collection change streams with apply ops.
+ //
+
// Verify that the stream returns the expected sequence of changes.
const changes =
cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges});
@@ -115,13 +119,28 @@
let otherCursor = cst.startWatchingChanges({
pipeline:
[{$changeStream: {resumeAfter: secondTxnChange._id}}, {$project: {"lsid.uid": 0}}],
- collection: coll
+ collection: coll,
+ doNotModifyInPassthroughs: true // A collection drop only invalidates single-collection
+ // change streams.
});
cst.assertNextChangesEqual({cursor: otherCursor, expectedChanges: expectedChanges.slice(3)});
- // Drop the collection. This will trigger an "invalidate" event at the end of the stream.
+ // Drop the collection. This will trigger a "drop" followed by an "invalidate" for the single
+ // collection change stream.
assert.commandWorked(db.runCommand({drop: coll.getName()}));
- expectedChanges.push({operationType: "invalidate"});
+ let change = cst.getOneChange(otherCursor);
+ assert.eq(change.operationType, "drop");
+ assert.eq(change.ns, {db: db.getName(), coll: coll.getName()});
+ change = cst.getOneChange(otherCursor, true);
+ assert.eq(change.operationType, "invalidate");
+
+ //
+ // Test behavior of whole-db change streams with apply ops.
+ //
+
+ // For a whole-db or whole-cluster change stream, the collection drop should return a single
+ // "drop" entry and not invalidate the stream.
+ expectedChanges.push({operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}});
// Add an entry for the insert on db.otherColl into expectedChanges.
expectedChanges.splice(3, 0, {
@@ -139,10 +158,9 @@
changeStream = cst.startWatchingChanges({
pipeline:
[{$changeStream: {resumeAfter: secondTxnChange._id}}, {$project: {"lsid.uid": 0}}],
- collection: 1
+ collection: 1,
});
- cst.assertNextChangesEqual(
- {cursor: changeStream, expectedChanges: expectedChanges.slice(3), expectInvalidate: true});
+ cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges.slice(3)});
// Add an entry for the insert on otherDb.otherDbColl into expectedChanges.
expectedChanges.splice(4, 0, {
@@ -154,9 +172,9 @@
txnNumber: NumberLong(session._txnNumber),
});
- // Verify that a whole-db stream can be resumed from the middle of the transaction, and that it
- // will see all subsequent changes including the insert on the other collection and the changes
- // on the other DB.
+ // Verify that a whole-cluster stream can be resumed from the middle of the transaction, and
+ // that it will see all subsequent changes including the insert on the other collection and the
+ // changes on the other DB.
cst = new ChangeStreamTest(db.getSiblingDB("admin"));
changeStream = cst.startWatchingChanges({
pipeline: [
@@ -165,8 +183,7 @@
],
collection: 1
});
- cst.assertNextChangesEqual(
- {cursor: changeStream, expectedChanges: expectedChanges.slice(3), expectInvalidate: true});
+ cst.assertNextChangesEqual({cursor: changeStream, expectedChanges: expectedChanges.slice(3)});
cst.cleanUp();
}());
diff --git a/jstests/change_streams/change_stream_ban_from_lookup.js b/jstests/change_streams/ban_from_lookup.js
index 08bc600ad55..08bc600ad55 100644
--- a/jstests/change_streams/change_stream_ban_from_lookup.js
+++ b/jstests/change_streams/ban_from_lookup.js
diff --git a/jstests/change_streams/change_stream_ban_from_views.js b/jstests/change_streams/ban_from_views.js
index c06932e55b3..c06932e55b3 100644
--- a/jstests/change_streams/change_stream_ban_from_views.js
+++ b/jstests/change_streams/ban_from_views.js
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js
index 0d392b99bcc..7c5688b0704 100644
--- a/jstests/change_streams/change_stream.js
+++ b/jstests/change_streams/change_stream.js
@@ -160,17 +160,6 @@
assertDropCollection(db, db.dropping.getName());
// Should still see the previous change from t2, shouldn't see anything about 'dropping'.
- // Test collection renaming. Sharded collections cannot be renamed.
- if (!FixtureHelpers.isSharded(db.t2)) {
- jsTestLog("Testing rename");
- assertDropCollection(db, "t3");
- t2cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2});
- assert.writeOK(db.t2.renameCollection("t3"));
- expected = {operationType: "invalidate"};
- cst.assertNextChangesEqual(
- {cursor: t2cursor, expectedChanges: [expected], expectInvalidate: true});
- }
-
jsTestLog("Testing insert that looks like rename");
assertDropCollection(db, "dne1");
assertDropCollection(db, "dne2");
diff --git a/jstests/change_streams/change_stream_invalidation.js b/jstests/change_streams/change_stream_invalidation.js
deleted file mode 100644
index e955af3af87..00000000000
--- a/jstests/change_streams/change_stream_invalidation.js
+++ /dev/null
@@ -1,128 +0,0 @@
-// Tests of $changeStream invalidate entries.
-
-(function() {
- "use strict";
-
- load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'.
- load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
-
- db.getMongo().forceReadMode('commands');
-
- // Write a document to the collection and test that the change stream returns it
- // and getMore command closes the cursor afterwards.
- const collGetMore = assertDropAndRecreateCollection(db, "change_stream_getmore_invalidations");
- // We awaited the replication of the first write, so the change stream shouldn't return it.
- assert.writeOK(collGetMore.insert({_id: 0, a: 1}));
-
- let changeStream = collGetMore.watch();
-
- // Drop the collection and test that we return "invalidate" entry and close the cursor. However,
- // we return all oplog entries preceding the drop.
- jsTestLog("Testing getMore command closes cursor for invalidate entries");
- // Create oplog entries of type insert, update, and delete.
- assert.writeOK(collGetMore.insert({_id: 1}));
- assert.writeOK(collGetMore.update({_id: 1}, {$set: {a: 1}}));
- assert.writeOK(collGetMore.remove({_id: 1}));
- // Drop the collection.
- assert.commandWorked(db.runCommand({drop: collGetMore.getName()}));
-
- // We should get 4 oplog entries of type insert, update, delete, and invalidate. The cursor
- // should be closed.
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "insert");
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "update");
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "delete");
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "invalidate");
- assert(changeStream.isExhausted());
-
- jsTestLog("Testing aggregate command closes cursor for invalidate entries");
- const collAgg = assertDropAndRecreateCollection(db, "change_stream_agg_invalidations");
-
- // Get a valid resume token that the next aggregate command can use.
- changeStream = collAgg.watch();
-
- assert.writeOK(collAgg.insert({_id: 1}));
-
- assert.soon(() => changeStream.hasNext());
- let change = changeStream.next();
- assert.eq(change.operationType, "insert");
- assert.eq(change.documentKey, {_id: 1});
- const resumeToken = change._id;
-
- // Insert another document after storing the resume token.
- assert.writeOK(collAgg.insert({_id: 2}));
-
- assert.soon(() => changeStream.hasNext());
- change = changeStream.next();
- assert.eq(change.operationType, "insert");
- assert.eq(change.documentKey, {_id: 2});
-
- // Drop the collection and invalidate the change stream.
- assertDropCollection(db, collAgg.getName());
- // Wait for two-phase drop to complete, so that the UUID no longer exists.
- assert.soon(function() {
- return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, collAgg.getName());
- });
-
- // Resume the change stream after the collection drop, up to and including the invalidate. This
- // is allowed if an explicit collation is provided.
- changeStream = collAgg.watch([], {resumeAfter: resumeToken, collation: {locale: "simple"}});
-
- assert.soon(() => changeStream.hasNext());
- change = changeStream.next();
- assert.eq(change.operationType, "insert");
- assert.eq(change.documentKey, {_id: 2});
-
- assert.soon(() => changeStream.hasNext());
- change = changeStream.next();
- assert.eq(change.operationType, "invalidate");
- assert(changeStream.isExhausted());
-
- // Test that it is possible to open a new change stream cursor on a collection that does not
- // exist.
- jsTestLog("Testing aggregate command on nonexistent collection");
- const collDoesNotExistName = "change_stream_agg_invalidations_does_not_exist";
- assertDropCollection(db, collDoesNotExistName);
-
- // Cursor creation succeeds, but there are no results.
- const cursorObj = assert
- .commandWorked(db.runCommand({
- aggregate: collDoesNotExistName,
- pipeline: [{$changeStream: {}}],
- cursor: {batchSize: 1},
- }))
- .cursor;
-
- // We explicitly test getMore, to ensure that the getMore command for a non-existent collection
- // does not return an error.
- let getMoreResult =
- assert
- .commandWorked(db.runCommand(
- {getMore: cursorObj.id, collection: collDoesNotExistName, batchSize: 1}))
- .cursor;
- assert.neq(getMoreResult.id, 0);
- assert.eq(getMoreResult.nextBatch.length, 0, tojson(getMoreResult.nextBatch));
-
- // After collection creation, we see oplog entries for the collection.
- const collNowExists = assertCreateCollection(db, collDoesNotExistName);
- assert.writeOK(collNowExists.insert({_id: 0}));
-
- assert.soon(function() {
- getMoreResult =
- assert
- .commandWorked(db.runCommand(
- {getMore: cursorObj.id, collection: collDoesNotExistName, batchSize: 1}))
- .cursor;
- assert.neq(getMoreResult.id, 0);
- return getMoreResult.nextBatch.length > 0;
- }, "Timed out waiting for another result from getMore on non-existent collection.");
- assert.eq(getMoreResult.nextBatch.length, 1);
- assert.eq(getMoreResult.nextBatch[0].operationType, "insert");
- assert.eq(getMoreResult.nextBatch[0].documentKey, {_id: 0});
-
- assert.commandWorked(
- db.runCommand({killCursors: collDoesNotExistName, cursors: [getMoreResult.id]}));
-}());
diff --git a/jstests/change_streams/change_stream_rename_resumability.js b/jstests/change_streams/change_stream_rename_resumability.js
deleted file mode 100644
index 46a3b3575b0..00000000000
--- a/jstests/change_streams/change_stream_rename_resumability.js
+++ /dev/null
@@ -1,51 +0,0 @@
-// Tests resuming on a change stream that was invalidated due to rename.
-
-(function() {
- "use strict";
-
- load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
-
- let coll = assertDropAndRecreateCollection(db, "change_stream_invalidate_resumability");
-
- // Drop the collection we'll rename to _before_ starting the changeStream, so that we don't
- // get accidentally an invalidate when running on the whole DB or cluster.
- assertDropCollection(db, coll.getName() + "_renamed");
-
- const cursor = coll.watch();
- assert(!cursor.hasNext());
-
- // Create an 'insert' oplog entry.
- assert.writeOK(coll.insert({_id: 1}));
-
- assert.commandWorked(coll.renameCollection(coll.getName() + "_renamed"));
-
- // Update 'coll' to point to the renamed collection.
- coll = db[coll.getName() + "_renamed"];
-
- // Insert another document after the rename.
- assert.writeOK(coll.insert({_id: 2}));
-
- // We should get 2 oplog entries of type insert and invalidate.
- assert.soon(() => cursor.hasNext());
- let change = cursor.next();
- assert.eq(change.operationType, "insert", tojson(change));
- assert.docEq(change.fullDocument, {_id: 1});
-
- assert.soon(() => cursor.hasNext());
- change = cursor.next();
- assert.eq(change.operationType, "invalidate", tojson(change));
- assert(cursor.isExhausted());
-
- // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here to
- // be sure that it doesn't crash the server, but the ability to resume a change stream after an
- // invalidate is a bug, not a feature.
-
- // Try resuming from the invalidate.
- assert.doesNotThrow(function() {
- const resumeCursor = coll.watch([], {resumeAfter: change._id});
- assert.soon(() => resumeCursor.hasNext());
- // Not checking the contents of the document returned, because we do not technically
- // support this behavior.
- resumeCursor.next();
- });
-}());
diff --git a/jstests/change_streams/change_stream_rename_target.js b/jstests/change_streams/change_stream_rename_target.js
deleted file mode 100644
index 92a1d0c7c64..00000000000
--- a/jstests/change_streams/change_stream_rename_target.js
+++ /dev/null
@@ -1,48 +0,0 @@
-// Tests that watching a collection which another collection is renamed _to_ causes an invalidate.
-(function() {
- "use strict";
-
- load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
-
- const testDB = db.getSiblingDB(jsTestName());
-
- // Write a document to the collection and test that the change stream returns it
- // and getMore command closes the cursor afterwards.
- const collName1 = "change_stream_rename_1";
- const collName2 = "change_stream_rename_2";
- let coll = assertDropAndRecreateCollection(testDB, collName1);
- assertDropCollection(testDB, collName2);
- assertDropCollection(testDB, collName2);
-
- // Watch the collection which doesn't exist yet.
- let aggCursor = testDB[collName2].watch();
-
- // Insert something to the collection we're _not_ watching.
- assert.writeOK(coll.insert({_id: 1}));
-
- assert.eq(aggCursor.hasNext(), false);
-
- // Now rename the collection TO the collection that's being watched. This should invalidate the
- // change stream.
- assert.commandWorked(coll.renameCollection(collName2));
- assert.soon(() => aggCursor.hasNext());
- let invalidate = aggCursor.next();
- assert.eq(invalidate.operationType, "invalidate");
- assert(aggCursor.isExhausted());
-
- // Do another insert.
- assert.writeOK(testDB[collName2].insert({_id: 2}));
-
- // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here to
- // be sure that it doesn't crash the server, but the ability to resume a change stream after an
- // invalidate is a bug, not a feature.
-
- // Try resuming from the invalidate.
- assert.doesNotThrow(function() {
- let cursor = testDB[collName2].watch([], {resumeAfter: invalidate._id});
- assert.soon(() => cursor.hasNext());
- // Not checking the contents of the document returned, because we do not technically
- // support this behavior.
- cursor.next();
- });
-}());
diff --git a/jstests/change_streams/change_stream_whole_cluster_invalidations.js b/jstests/change_streams/change_stream_whole_cluster_invalidations.js
deleted file mode 100644
index d2c07cdbc81..00000000000
--- a/jstests/change_streams/change_stream_whole_cluster_invalidations.js
+++ /dev/null
@@ -1,134 +0,0 @@
-// Tests of invalidate entries for a $changeStream on a whole cluster.
-(function() {
- "use strict";
-
- load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
- load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'.
- load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
- load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
-
- // Define two databases. We will conduct our tests by creating one collection in each.
- const testDB1 = db, testDB2 = db.getSiblingDB(`${db.getName()}_other`);
- const adminDB = db.getSiblingDB("admin");
-
- // Create one collection on each database.
- let [db1Coll, db2Coll] =
- [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, jsTestName()));
-
- // Create a ChangeStreamTest on the 'admin' db. Cluster-wide change streams can only be opened
- // on admin.
- let cst = new ChangeStreamTest(adminDB);
- let aggCursor = cst.startWatchingAllChangesForCluster();
-
- // Generate oplog entries of type insert, update, and delete across both databases.
- for (let coll of[db1Coll, db2Coll]) {
- assert.writeOK(coll.insert({_id: 1}));
- assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}}));
- assert.writeOK(coll.remove({_id: 1}));
- }
-
- // Drop the second database, which should invalidate the stream.
- assert.commandWorked(testDB2.dropDatabase());
-
- // We should get 7 oplog entries; three ops of type insert, update, delete from each database,
- // and then an invalidate. The cursor should be closed.
- for (let expectedDB of[testDB1, testDB2]) {
- let change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.ns.db, expectedDB.getName(), tojson(change));
- change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "update", tojson(change));
- assert.eq(change.ns.db, expectedDB.getName(), tojson(change));
- change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "delete", tojson(change));
- assert.eq(change.ns.db, expectedDB.getName(), tojson(change));
- }
- cst.assertNextChangesEqual({
- cursor: aggCursor,
- expectedChanges: [{operationType: "invalidate"}],
- expectInvalidate: true
- });
-
- // Test that a cluster-wide change stream can be resumed using a token from a collection which
- // has been dropped.
- db1Coll = assertDropAndRecreateCollection(testDB1, jsTestName());
-
- // Get a valid resume token that the next change stream can use.
- aggCursor = cst.startWatchingAllChangesForCluster();
-
- assert.writeOK(db1Coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));
-
- let change = cst.getOneChange(aggCursor, false);
- const resumeToken = change._id;
-
- // For cluster-wide streams, it is possible to resume at a point before a collection is dropped,
- // even if the invalidation has not been received on the original stream yet.
- assertDropCollection(db1Coll, db1Coll.getName());
- // Wait for two-phase drop to complete, so that the UUID no longer exists.
- assert.soon(function() {
- return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(testDB1,
- db1Coll.getName());
- });
- assert.commandWorked(adminDB.runCommand({
- aggregate: 1,
- pipeline: [{$changeStream: {resumeAfter: resumeToken, allChangesForCluster: true}}],
- cursor: {}
- }));
-
- // Test that invalidation entries from any database invalidate the stream.
- [db1Coll, db2Coll] =
- [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, jsTestName()));
- let _idForTest = 0;
- for (let collToInvalidate of[db1Coll, db2Coll]) {
- // Start watching all changes in the cluster.
- aggCursor = cst.startWatchingAllChangesForCluster();
-
- let testDB = collToInvalidate.getDB();
-
- // Insert into the collections on both databases, and verify the change stream is able to
- // pick them up.
- for (let collToWrite of[db1Coll, db2Coll]) {
- assert.writeOK(collToWrite.insert({_id: _idForTest}));
- change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.documentKey._id, _idForTest);
- assert.eq(change.ns.db, collToWrite.getDB().getName());
- _idForTest++;
- }
-
- // Renaming the collection should invalidate the change stream. Skip this test when running
- // on a sharded collection, since these cannot be renamed.
- if (!FixtureHelpers.isSharded(collToInvalidate)) {
- assert.writeOK(collToInvalidate.renameCollection("renamed_coll"));
- cst.assertNextChangesEqual({
- cursor: aggCursor,
- expectedChanges: [{operationType: "invalidate"}],
- expectInvalidate: true
- });
- collToInvalidate = testDB.getCollection("renamed_coll");
- }
-
- // Dropping a collection should invalidate the change stream.
- aggCursor = cst.startWatchingAllChangesForCluster();
- assertDropCollection(testDB, collToInvalidate.getName());
- cst.assertNextChangesEqual({
- cursor: aggCursor,
- expectedChanges: [{operationType: "invalidate"}],
- expectInvalidate: true
- });
-
- // Dropping a 'system' collection should invalidate the change stream.
- // Create a view to ensure that the 'system.views' collection exists.
- assert.commandWorked(
- testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []}));
- aggCursor = cst.startWatchingAllChangesForCluster();
- assertDropCollection(testDB, "system.views");
- cst.assertNextChangesEqual({
- cursor: aggCursor,
- expectedChanges: [{operationType: "invalidate"}],
- expectInvalidate: true
- });
- }
-
- cst.cleanUp();
-}());
diff --git a/jstests/change_streams/change_stream_whole_db_invalidations.js b/jstests/change_streams/change_stream_whole_db_invalidations.js
deleted file mode 100644
index 463359ec839..00000000000
--- a/jstests/change_streams/change_stream_whole_db_invalidations.js
+++ /dev/null
@@ -1,125 +0,0 @@
-// Tests of invalidate entries for a $changeStream on a whole database.
-(function() {
- "use strict";
-
- load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
- load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'.
- load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
- load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
-
- const testDB = db.getSiblingDB(jsTestName());
- let cst = new ChangeStreamTest(testDB);
-
- // Write a document to the collection and test that the change stream returns it
- // and getMore command closes the cursor afterwards.
- let coll = assertDropAndRecreateCollection(testDB, "change_stream_whole_db_invalidations");
-
- let aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
-
- // Create oplog entries of type insert, update, and delete.
- assert.writeOK(coll.insert({_id: 1}));
- assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}}));
- assert.writeOK(coll.remove({_id: 1}));
- // Drop the collection.
- assert.commandWorked(testDB.runCommand({drop: coll.getName()}));
- // We should get 4 oplog entries of type insert, update, delete, and invalidate. The cursor
- // should be closed.
- let change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "update", tojson(change));
- change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "delete", tojson(change));
- cst.assertNextChangesEqual({
- cursor: aggCursor,
- expectedChanges: [{operationType: "invalidate"}],
- expectInvalidate: true
- });
-
- const collAgg =
- assertDropAndRecreateCollection(testDB, "change_stream_whole_db_agg_invalidations");
-
- // Get a valid resume token that the next change stream can use.
- aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
-
- assert.writeOK(collAgg.insert({_id: 1}, {writeConcern: {w: "majority"}}));
-
- change = cst.getOneChange(aggCursor, false);
- const resumeToken = change._id;
-
- // For whole-db streams, it is possible to resume at a point before a collection is dropped,
- // even if the invalidation has not been received on the original stream yet.
- assertDropCollection(testDB, collAgg.getName());
- // Wait for two-phase drop to complete, so that the UUID no longer exists.
- assert.soon(function() {
- return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(testDB,
- collAgg.getName());
- });
- assert.commandWorked(testDB.runCommand(
- {aggregate: 1, pipeline: [{$changeStream: {resumeAfter: resumeToken}}], cursor: {}}));
-
- // Test that invalidation entries for other databases are filtered out.
- const otherDB = testDB.getSiblingDB("change_stream_whole_db_invalidations_other");
- const otherDBColl = otherDB["change_stream_whole_db_invalidations_other"];
- assert.writeOK(otherDBColl.insert({_id: 0}));
-
- // Create collection on the database being watched.
- coll = assertDropAndRecreateCollection(testDB, "change_stream_whole_db_invalidations");
-
- // Create the $changeStream. We set 'doNotModifyInPassthroughs' so that this test will not be
- // upconverted to a cluster-wide stream, which *would* be invalidated by dropping the collection
- // in the other database.
- aggCursor = cst.startWatchingChanges(
- {pipeline: [{$changeStream: {}}], collection: 1, doNotModifyInPassthroughs: true});
-
- // Drop the collection on the other database, this should *not* invalidate the change stream.
- assertDropCollection(otherDB, otherDBColl.getName());
-
- // Insert into the collection in the watched database, and verify the change stream is able to
- // pick it up.
- assert.writeOK(coll.insert({_id: 1}));
- change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.documentKey._id, 1);
-
- // Test that renaming a collection will invalidate the change stream. MongoDB does not allow
- // renaming of sharded collections, so only perform this test if the collection is not sharded.
- if (!FixtureHelpers.isSharded(coll)) {
- assertDropCollection(testDB, coll.getName());
-
- assertCreateCollection(testDB, coll.getName());
- assertDropCollection(testDB, "renamed_coll");
- aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
- assert.writeOK(coll.renameCollection("renamed_coll"));
- cst.assertNextChangesEqual({
- cursor: aggCursor,
- expectedChanges: [{operationType: "invalidate"}],
- expectInvalidate: true
- });
- }
-
- // Dropping a collection should invalidate the change stream.
- assertDropCollection(testDB, coll.getName());
- aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
- assertCreateCollection(testDB, coll.getName());
- assertDropCollection(testDB, coll.getName());
- cst.assertNextChangesEqual({
- cursor: aggCursor,
- expectedChanges: [{operationType: "invalidate"}],
- expectInvalidate: true
- });
-
- // Dropping a 'system' collection should invalidate the change stream.
- // Create a view to ensure that the 'system.views' collection exists.
- assert.commandWorked(
- testDB.runCommand({create: "view1", viewOn: coll.getName(), pipeline: []}));
- aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
- assertDropCollection(testDB, "system.views");
- cst.assertNextChangesEqual({
- cursor: aggCursor,
- expectedChanges: [{operationType: "invalidate"}],
- expectInvalidate: true
- });
-
- cst.cleanUp();
-}());
diff --git a/jstests/change_streams/change_stream_collation.js b/jstests/change_streams/collation.js
index 4375da16843..00432b91057 100644
--- a/jstests/change_streams/change_stream_collation.js
+++ b/jstests/change_streams/collation.js
@@ -312,7 +312,8 @@
// $changeStream.
assert.soon(() => changeStream.hasNext());
assert.docEq(changeStream.next().fullDocument, {_id: "dropped_coll", text: "ABC"});
- assert(changeStream.isExhausted());
+ // Only single-collection streams will be exhausted from the drop.
+ assert(changeStream.isExhausted() || isChangeStreamPassthrough());
// Test that a pipeline with an explicit collation is allowed to resume from before the
// collection is dropped and recreated.
@@ -322,7 +323,8 @@
assert.soon(() => changeStream.hasNext());
assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"});
- assert(changeStream.isExhausted());
+ // Only single-collection streams will be exhausted from the drop.
+ assert(changeStream.isExhausted() || isChangeStreamPassthrough());
// Test that a pipeline without an explicit collation is not allowed to resume,
// even though the collection has been recreated with the same default collation as it
diff --git a/jstests/change_streams/change_stream_does_not_implicitly_create_database.js b/jstests/change_streams/does_not_implicitly_create_database.js
index 052a53585bd..052a53585bd 100644
--- a/jstests/change_streams/change_stream_does_not_implicitly_create_database.js
+++ b/jstests/change_streams/does_not_implicitly_create_database.js
diff --git a/jstests/change_streams/include_cluster_time.js b/jstests/change_streams/include_cluster_time.js
index 4be37f1f338..405bf2d5b16 100644
--- a/jstests/change_streams/include_cluster_time.js
+++ b/jstests/change_streams/include_cluster_time.js
@@ -2,6 +2,7 @@
(function() {
"use strict";
+ load("jstests/libs/change_stream_util.js"); // For assertInvalidateOp.
load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
// Drop and recreate the collections to be used in this set of tests.
@@ -47,8 +48,10 @@
assert.soon(() => changeStream.hasNext());
next = changeStream.next();
- assert.eq(next.operationType, "invalidate");
+ assert.eq(next.operationType, "drop");
assert.lte(next.clusterTime, dropClusterTime);
+ assertInvalidateOp({cursor: changeStream, opType: "drop"});
+
changeStream.close();
}());
diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js
index ec7f47d57f6..7d1a0a1a457 100644
--- a/jstests/change_streams/lookup_post_image.js
+++ b/jstests/change_streams/lookup_post_image.js
@@ -236,7 +236,11 @@
});
latestChange = cst.getOneChange(cursor);
assert.eq(latestChange.operationType, "insert");
- latestChange = cst.getOneChange(cursor, true);
- assert.eq(latestChange.operationType, "invalidate");
- assert(!latestChange.hasOwnProperty("fullDocument"));
+ latestChange = cst.getOneChange(cursor);
+ assert.eq(latestChange.operationType, "drop");
+ // Only single-collection change streams will be invalidated by the drop.
+ if (!isChangeStreamPassthrough()) {
+ latestChange = cst.getOneChange(cursor, true);
+ assert.eq(latestChange.operationType, "invalidate");
+ }
}());
diff --git a/jstests/change_streams/metadata_notifications.js b/jstests/change_streams/metadata_notifications.js
new file mode 100644
index 00000000000..ea7559de77b
--- /dev/null
+++ b/jstests/change_streams/metadata_notifications.js
@@ -0,0 +1,201 @@
+// Tests of $changeStream notifications for metadata operations.
+(function() {
+ "use strict";
+
+ load("jstests/libs/change_stream_util.js");
+ load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'.
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+
+ db = db.getSiblingDB(jsTestName());
+ let cst = new ChangeStreamTest(db);
+
+ db.getMongo().forceReadMode('commands');
+
+ // Test that it is possible to open a new change stream cursor on a collection that does not
+ // exist.
+ const collName = "test";
+ assertDropCollection(db, collName);
+
+ // Cursor creation succeeds, but there are no results. We do not expect to see a notification
+ // for collection creation.
+ let cursor = cst.startWatchingChanges(
+ {collection: collName, pipeline: [{$changeStream: {}}, {$project: {operationType: 1}}]});
+
+ // We explicitly test getMore, to ensure that the getMore command for a non-existent collection
+ // does not return an error.
+ let change = cst.getNextBatch(cursor);
+ assert.neq(change.id, 0);
+ assert.eq(change.nextBatch.length, 0, tojson(change.nextBatch));
+
+ // After collection creation, we expect to see oplog entries for each subsequent operation.
+ let coll = assertCreateCollection(db, collName);
+ assert.writeOK(coll.insert({_id: 0}));
+
+ change = cst.getOneChange(cursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+
+ // Create oplog entries of type insert, update, delete, and drop.
+ assert.writeOK(coll.insert({_id: 1}));
+ assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}}));
+ assert.writeOK(coll.remove({_id: 1}));
+ assertDropCollection(db, coll.getName());
+
+ // Get a valid resume token that the next aggregate command can use.
+ change = cst.getOneChange(cursor);
+ assert.eq(change.operationType, "insert");
+ const resumeToken = change._id;
+
+ // We should get 4 oplog entries of type update, delete, drop, and invalidate. The cursor should
+ // be closed.
+ let expectedChanges = [
+ {operationType: "update"},
+ {operationType: "delete"},
+ {operationType: "drop"},
+ {operationType: "invalidate"},
+ ];
+ const changes = cst.assertNextChangesEqual(
+ {cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: true});
+
+ // It should not be possible to resume a change stream after a collection drop without an
+ // explicit collation, even if the invalidate has not been received.
+ assert.commandFailedWithCode(db.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
+ cursor: {}
+ }),
+ ErrorCodes.InvalidResumeToken);
+
+ // Recreate the collection.
+ coll = assertCreateCollection(db, collName);
+ assert.writeOK(coll.insert({_id: 0}));
+
+ // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here to
+ // be sure that it doesn't crash the server, but the ability to resume a change stream after an
+ // invalidate is a bug, not a feature.
+
+ // Test resuming the change stream from the collection drop.
+ assert.doesNotThrow(function() {
+ const resumeTokenDrop = changes[2]._id;
+ const resumeCursor =
+ coll.watch([], {resumeAfter: resumeTokenDrop, collation: {locale: "simple"}});
+ assert.soon(() => resumeCursor.hasNext());
+ // Not checking the contents of the document returned, because we do not technically
+ // support this behavior.
+ resumeCursor.next();
+ });
+ // Test resuming the change stream from the invalidate after the drop.
+ assert.doesNotThrow(function() {
+ const resumeTokenInvalidate = changes[3]._id;
+ const resumeCursor =
+ coll.watch([], {resumeAfter: resumeTokenInvalidate, collation: {locale: "simple"}});
+ assert.soon(() => resumeCursor.hasNext());
+ // Not checking the contents of the document returned, because we do not technically
+ // support this behavior.
+ resumeCursor.next();
+ });
+
+ // Test that renaming a collection being watched generates a "rename" entry followed by an
+ // "invalidate". This is true if the change stream is on the source or target collection of the
+ // rename. Sharded collections cannot be renamed.
+ if (!FixtureHelpers.isSharded(coll)) {
+ cursor = cst.startWatchingChanges({collection: collName, pipeline: [{$changeStream: {}}]});
+ assertDropCollection(db, "renamed_coll");
+ assert.writeOK(coll.renameCollection("renamed_coll"));
+ let expected = [
+ {
+ operationType: "rename",
+ ns: {db: db.getName(), coll: collName},
+ to: {db: db.getName(), coll: "renamed_coll"},
+ },
+ {operationType: "invalidate"}
+ ];
+ cst.assertNextChangesEqual(
+ {cursor: cursor, expectedChanges: expected, expectInvalidate: true});
+
+ coll = db["renamed_coll"];
+
+ // Repeat the test, this time with a change stream open on the target.
+ cursor = cst.startWatchingChanges({collection: collName, pipeline: [{$changeStream: {}}]});
+ assert.writeOK(coll.renameCollection(collName));
+ expected = [
+ {
+ operationType: "rename",
+ ns: {db: db.getName(), coll: "renamed_coll"},
+ to: {db: db.getName(), coll: collName},
+ },
+ {operationType: "invalidate"}
+ ];
+ const changes = cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+
+ coll = db[collName];
+
+ // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here
+ // to be sure that it doesn't crash the server, but the ability to resume a change stream
+ // after an invalidate is a bug, not a feature.
+
+ // Test resuming the change stream from the collection rename.
+ assert.doesNotThrow(function() {
+ const resumeTokenRename = changes[0]._id;
+ const resumeCursor =
+ coll.watch([], {resumeAfter: resumeTokenRename, collation: {locale: "simple"}});
+ });
+        // Test resuming the change stream from the invalidate after the rename.
+ assert.doesNotThrow(function() {
+ const resumeTokenInvalidate = changes[1]._id;
+ const resumeCursor =
+ coll.watch([], {resumeAfter: resumeTokenInvalidate, collation: {locale: "simple"}});
+ });
+
+ assertDropAndRecreateCollection(db, "renamed_coll");
+ assert.writeOK(db.renamed_coll.insert({_id: 0}));
+
+ // Repeat the test again, this time using the 'dropTarget' option with an existing target
+ // collection.
+ cursor =
+ cst.startWatchingChanges({collection: "renamed_coll", pipeline: [{$changeStream: {}}]});
+ assert.writeOK(coll.renameCollection("renamed_coll", true /* dropTarget */));
+ expected = [
+ {
+ operationType: "rename",
+ ns: {db: db.getName(), coll: collName},
+ to: {db: db.getName(), coll: "renamed_coll"},
+ },
+ {operationType: "invalidate"}
+ ];
+ cst.assertNextChangesEqual(
+ {cursor: cursor, expectedChanges: expected, expectInvalidate: true});
+
+ coll = db["renamed_coll"];
+
+ // Test the behavior of a change stream watching the target collection of a $out aggregation
+ // stage.
+ cursor = cst.startWatchingChanges({collection: collName, pipeline: [{$changeStream: {}}]});
+ coll.aggregate([{$out: collName}]);
+ // Note that $out will first create a temp collection, and then rename the temp collection
+ // to the target. Do not explicitly check the 'ns' field.
+ const rename = cst.getOneChange(cursor);
+ assert.eq(rename.operationType, "rename", tojson(rename));
+ assert.eq(rename.to, {db: db.getName(), coll: collName}, tojson(rename));
+ assert.eq(cst.getOneChange(cursor, true).operationType, "invalidate");
+ }
+
+    // Test that dropping a database will first drop all of its collections, invalidating any
+ // change streams on those collections.
+ cursor = cst.startWatchingChanges({
+ collection: coll.getName(),
+ pipeline: [{$changeStream: {}}],
+ });
+ assert.commandWorked(db.dropDatabase());
+
+ expectedChanges = [
+ {
+ operationType: "drop",
+ ns: {db: db.getName(), coll: coll.getName()},
+ },
+ {operationType: "invalidate"}
+ ];
+ cst.assertNextChangesEqual(
+ {cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: true});
+
+ cst.cleanUp();
+}());
diff --git a/jstests/change_streams/change_stream_shell_helper.js b/jstests/change_streams/shell_helper.js
index 6ecb95221eb..e833c053013 100644
--- a/jstests/change_streams/change_stream_shell_helper.js
+++ b/jstests/change_streams/shell_helper.js
@@ -6,6 +6,7 @@
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
+ load("jstests/libs/change_stream_util.js"); // For assertInvalidateOp.
const coll = assertDropAndRecreateCollection(db, "change_stream_shell_helper");
@@ -162,14 +163,12 @@
assert(!changeStreamCursor.isClosed());
assert(!changeStreamCursor.isExhausted());
- // Dropping the collection should invalidate any open change streams.
+ // Dropping the collection should trigger a drop notification.
assertDropCollection(db, coll.getName());
assert.soon(() => changeStreamCursor.hasNext());
- assert(changeStreamCursor.isClosed());
assert(!changeStreamCursor.isExhausted());
- expected = {operationType: "invalidate"};
+ expected = {operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}};
checkNextChange(changeStreamCursor, expected);
- assert(!changeStreamCursor.hasNext());
- assert(changeStreamCursor.isClosed());
- assert(changeStreamCursor.isExhausted());
+ // For single collection change streams, the drop should invalidate the stream.
+ assertInvalidateOp({cursor: changeStreamCursor, opType: "drop"});
}());
diff --git a/jstests/change_streams/change_stream_start_at_cluster_time.js b/jstests/change_streams/start_at_cluster_time.js
index 3fb6786437a..3fb6786437a 100644
--- a/jstests/change_streams/change_stream_start_at_cluster_time.js
+++ b/jstests/change_streams/start_at_cluster_time.js
diff --git a/jstests/change_streams/change_stream_whitelist.js b/jstests/change_streams/whitelist.js
index 12848e1e9e7..12848e1e9e7 100644
--- a/jstests/change_streams/change_stream_whitelist.js
+++ b/jstests/change_streams/whitelist.js
diff --git a/jstests/change_streams/change_stream_whole_cluster.js b/jstests/change_streams/whole_cluster.js
index 0f3c9d6ec92..485721dce44 100644
--- a/jstests/change_streams/change_stream_whole_cluster.js
+++ b/jstests/change_streams/whole_cluster.js
@@ -6,8 +6,9 @@
load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest and
// assert[Valid|Invalid]ChangeStreamNss.
+ db = db.getSiblingDB(jsTestName());
const adminDB = db.getSiblingDB("admin");
- const otherDB = db.getSiblingDB(`${db.getName()}_other`);
+ const otherDB = db.getSiblingDB(jsTestName() + "_other");
// Drop and recreate the collections to be used in this set of tests.
assertDropAndRecreateCollection(db, "t1");
@@ -49,10 +50,17 @@
};
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
- // Dropping either database should invalidate the change stream.
+ // Dropping a database should generate drop entries for each collection followed by an
+ // invalidate.
+ // TODO SERVER-35029: This test should not invalidate the stream once there's support for
+ // returning a notification for the dropDatabase command.
assert.commandWorked(otherDB.dropDatabase());
- expected = {operationType: "invalidate"};
- cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+ expected = [
+ {operationType: "drop", ns: {db: otherDB.getName(), coll: "t2"}},
+ {operationType: "invalidate"}
+ ];
+
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
// Drop the remaining database and clean up the test.
assert.commandWorked(db.dropDatabase());
diff --git a/jstests/change_streams/whole_cluster_metadata_notifications.js b/jstests/change_streams/whole_cluster_metadata_notifications.js
new file mode 100644
index 00000000000..9004e36135b
--- /dev/null
+++ b/jstests/change_streams/whole_cluster_metadata_notifications.js
@@ -0,0 +1,225 @@
+// Tests of metadata notifications for a $changeStream on a whole cluster.
+(function() {
+ "use strict";
+
+ load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+ load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'.
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+ load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
+
+ // Define two databases. We will conduct our tests by creating one collection in each.
+ const testDB1 = db.getSiblingDB(jsTestName()),
+ testDB2 = db.getSiblingDB(jsTestName() + "_other");
+ const adminDB = db.getSiblingDB("admin");
+
+ assert.commandWorked(testDB1.dropDatabase());
+ assert.commandWorked(testDB2.dropDatabase());
+
+ // Create one collection on each database.
+ let [db1Coll, db2Coll] =
+ [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, "test"));
+
+ // Create a ChangeStreamTest on the 'admin' db. Cluster-wide change streams can only be opened
+ // on admin.
+ let cst = new ChangeStreamTest(adminDB);
+ let aggCursor = cst.startWatchingAllChangesForCluster();
+
+ // Generate oplog entries of type insert, update, and delete across both databases.
+ for (let coll of[db1Coll, db2Coll]) {
+ assert.writeOK(coll.insert({_id: 1}));
+ assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}}));
+ assert.writeOK(coll.remove({_id: 1}));
+ }
+
+ // Drop the second database, which should invalidate the stream.
+ assert.commandWorked(testDB2.dropDatabase());
+
+ // We should get 6 oplog entries; three ops of type insert, update, delete from each database.
+ for (let expectedDB of[testDB1, testDB2]) {
+ let change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.ns.db, expectedDB.getName(), tojson(change));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "update", tojson(change));
+ assert.eq(change.ns.db, expectedDB.getName(), tojson(change));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "delete", tojson(change));
+ assert.eq(change.ns.db, expectedDB.getName(), tojson(change));
+ }
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [
+ {operationType: "drop", ns: {db: testDB2.getName(), coll: db2Coll.getName()}},
+ // TODO SERVER-35029: Return an entry for a database drop, instead of "invalidate".
+ {operationType: "invalidate"}
+ ],
+ expectInvalidate: true
+ });
+
+ // Test that a cluster-wide change stream can be resumed using a token from a collection which
+ // has been dropped.
+ db1Coll = assertDropAndRecreateCollection(testDB1, db1Coll.getName());
+
+ // Get a valid resume token that the next change stream can use.
+ aggCursor = cst.startWatchingAllChangesForCluster();
+
+ assert.writeOK(db1Coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+
+ let change = cst.getOneChange(aggCursor, false);
+ const resumeToken = change._id;
+
+ // For cluster-wide streams, it is possible to resume at a point before a collection is dropped,
+ // even if the "drop" notification has not been received on the original stream yet.
+ assertDropCollection(db1Coll, db1Coll.getName());
+ // Wait for two-phase drop to complete, so that the UUID no longer exists.
+ assert.soon(function() {
+ return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(testDB1,
+ db1Coll.getName());
+ });
+ assert.commandWorked(adminDB.runCommand({
+ aggregate: 1,
+ pipeline: [{$changeStream: {resumeAfter: resumeToken, allChangesForCluster: true}}],
+ cursor: {}
+ }));
+
+ // Test that collection drops from any database result in "drop" notifications for the stream.
+ [db1Coll, db2Coll] =
+ [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, "test"));
+ let _idForTest = 0;
+ for (let collToInvalidate of[db1Coll, db2Coll]) {
+ // Start watching all changes in the cluster.
+ aggCursor = cst.startWatchingAllChangesForCluster();
+
+ let testDB = collToInvalidate.getDB();
+
+ // Insert into the collections on both databases, and verify the change stream is able to
+ // pick them up.
+ for (let collToWrite of[db1Coll, db2Coll]) {
+ assert.writeOK(collToWrite.insert({_id: _idForTest}));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, _idForTest);
+ assert.eq(change.ns.db, collToWrite.getDB().getName());
+ _idForTest++;
+ }
+
+ // Renaming the collection should generate a 'rename' notification. Skip this test when
+ // running on a sharded collection, since these cannot be renamed.
+ if (!FixtureHelpers.isSharded(collToInvalidate)) {
+ assertDropAndRecreateCollection(testDB, collToInvalidate.getName());
+ const collName = collToInvalidate.getName();
+
+ // Start watching all changes in the cluster.
+ aggCursor = cst.startWatchingAllChangesForCluster();
+ assert.writeOK(collToInvalidate.renameCollection("renamed_coll"));
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [
+ {
+ operationType: "rename",
+ ns: {db: testDB.getName(), coll: collToInvalidate.getName()},
+ to: {db: testDB.getName(), coll: "renamed_coll"}
+ },
+ ]
+ });
+
+ // Repeat the test, this time using the 'dropTarget' option with an existing target
+ // collection.
+ collToInvalidate = testDB.getCollection("renamed_coll");
+ assertDropAndRecreateCollection(testDB, collName);
+ assert.writeOK(testDB[collName].insert({_id: 0}));
+ assert.writeOK(collToInvalidate.renameCollection(collName, true /* dropTarget */));
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [
+ {
+ operationType: "insert",
+ ns: {db: testDB.getName(), coll: collName},
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0}
+ },
+ {
+ operationType: "rename",
+ ns: {db: testDB.getName(), coll: "renamed_coll"},
+ to: {db: testDB.getName(), coll: collName}
+ }
+ ]
+ });
+
+ collToInvalidate = testDB[collName];
+
+ // Test renaming a collection to a different database. Do not run this in the mongos
+ // passthrough suites since we cannot guarantee the primary shard of the target database
+ // and renameCollection requires the source and destination to be on the same shard.
+ if (!FixtureHelpers.isMongos(testDB)) {
+ const otherDB = testDB.getSiblingDB(testDB.getName() + "_rename_target");
+ // Ensure the target database exists.
+ const collOtherDB = assertDropAndRecreateCollection(otherDB, "test");
+ assertDropCollection(otherDB, collOtherDB.getName());
+ aggCursor = cst.startWatchingAllChangesForCluster();
+ assert.commandWorked(testDB.adminCommand({
+ renameCollection: collToInvalidate.getFullName(),
+ to: collOtherDB.getFullName()
+ }));
+ // Do not check the 'ns' field since it will contain the namespace of the temp
+ // collection created when renaming a collection across databases.
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "rename", tojson(change));
+ assert.eq(change.to,
+ {db: otherDB.getName(), coll: collOtherDB.getName()},
+ tojson(change));
+ // Rename across databases also drops the source collection after the collection is
+ // copied over.
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [{
+ operationType: "drop",
+ ns: {db: testDB.getName(), coll: collToInvalidate.getName()}
+ }]
+ });
+ }
+
+ // Test the behavior of a change stream watching the target collection of a $out
+ // aggregation stage.
+ collToInvalidate.aggregate([{$out: "renamed_coll"}]);
+ // Do not check the 'ns' field since it will contain the namespace of the temp
+ // collection created by the $out stage, before renaming to 'renamed_coll'.
+ const rename = cst.getOneChange(aggCursor);
+ assert.eq(rename.operationType, "rename", tojson(rename));
+ assert.eq(rename.to, {db: testDB.getName(), coll: "renamed_coll"}, tojson(rename));
+
+ // The change stream should not be invalidated by the rename(s).
+ assert.eq(0, cst.getNextBatch(aggCursor).nextBatch.length);
+ assert.writeOK(collToInvalidate.insert({_id: 2}));
+ assert.eq(cst.getOneChange(aggCursor).operationType, "insert");
+ }
+
+ // Dropping a collection should generate a 'drop' entry.
+ assertDropCollection(testDB, collToInvalidate.getName());
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [{
+ operationType: "drop",
+ ns: {db: testDB.getName(), coll: collToInvalidate.getName()}
+ }]
+ });
+
+ // Dropping a 'system' collection should also generate a 'drop' notification.
+ // Create a view to ensure that the 'system.views' collection exists.
+ assert.commandWorked(
+ testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []}));
+ // TODO SERVER-35401: This insert is inconsistent with the behavior for whole-db change
+ // streams.
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.ns, {db: testDB.getName(), coll: "system.views"});
+ assertDropCollection(testDB, "system.views");
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges:
+ [{operationType: "drop", ns: {db: testDB.getName(), coll: "system.views"}}]
+ });
+ }
+
+ cst.cleanUp();
+}());
diff --git a/jstests/change_streams/change_stream_whole_cluster_resumability.js b/jstests/change_streams/whole_cluster_resumability.js
index df1041d0628..9058c58010a 100644
--- a/jstests/change_streams/change_stream_whole_cluster_resumability.js
+++ b/jstests/change_streams/whole_cluster_resumability.js
@@ -6,9 +6,8 @@
load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
// Create two databases, with one collection in each.
- const testDBs = [db, db.getSiblingDB(`${db.getName()}_other`)];
- const[db1Coll, db2Coll] =
- testDBs.map((db) => assertDropAndRecreateCollection(db, jsTestName()));
+ const testDBs = [db, db.getSiblingDB(jsTestName() + "_other")];
+ const[db1Coll, db2Coll] = testDBs.map((db) => assertDropAndRecreateCollection(db, "test"));
const adminDB = db.getSiblingDB("admin");
let cst = new ChangeStreamTest(adminDB);
diff --git a/jstests/change_streams/change_stream_whole_db.js b/jstests/change_streams/whole_db.js
index c904c4a750e..1f57797b784 100644
--- a/jstests/change_streams/change_stream_whole_db.js
+++ b/jstests/change_streams/whole_db.js
@@ -7,9 +7,7 @@
// assert[Valid|Invalid]ChangeStreamNss.
load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
- // Drop and recreate the collections to be used in this set of tests.
- assertDropAndRecreateCollection(db, "t1");
- assertDropAndRecreateCollection(db, "t2");
+ db = db.getSiblingDB(jsTestName());
// Test that a single-database change stream cannot be opened on "admin", "config", or "local".
assertInvalidChangeStreamNss("admin", 1);
@@ -29,7 +27,7 @@
let expected = {
documentKey: {_id: 0},
fullDocument: {_id: 0, a: 1},
- ns: {db: "test", coll: "t1"},
+ ns: {db: db.getName(), coll: "t1"},
operationType: "insert",
};
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
@@ -40,14 +38,24 @@
expected = {
documentKey: {_id: 0},
fullDocument: {_id: 0, a: 2},
- ns: {db: "test", coll: "t2"},
+ ns: {db: db.getName(), coll: "t2"},
operationType: "insert",
};
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
- // Dropping the database should invalidate the change stream.
+ // Dropping the database should generate collection drop entries followed by an invalidate. Note
+ // that the order of collection drops is not guaranteed so only check the database name.
assert.commandWorked(db.dropDatabase());
- cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [{operationType: "invalidate"}]});
+ let change = cst.getOneChange(cursor);
+ assert.eq(change.operationType, "drop", tojson(change));
+ assert.eq(change.ns.db, db.getName(), tojson(change));
+ change = cst.getOneChange(cursor);
+ assert.eq(change.operationType, "drop", tojson(change));
+ assert.eq(change.ns.db, db.getName(), tojson(change));
+
+ // TODO SERVER-35029: Expect to see a 'dropDatabase' entry before the invalidate.
+ change = cst.getOneChange(cursor, true);
+ assert.eq(change.operationType, "invalidate", tojson(change));
cst.cleanUp();
}());
diff --git a/jstests/change_streams/whole_db_metadata_notifications.js b/jstests/change_streams/whole_db_metadata_notifications.js
new file mode 100644
index 00000000000..ade3bbe6f34
--- /dev/null
+++ b/jstests/change_streams/whole_db_metadata_notifications.js
@@ -0,0 +1,183 @@
+// Tests of metadata notifications for a $changeStream on a whole database.
+(function() {
+ "use strict";
+
+ load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+ load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'.
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+ load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
+
+ const testDB = db.getSiblingDB(jsTestName());
+ let cst = new ChangeStreamTest(testDB);
+
+ // Write a document to the collection and test that the change stream returns it
+ // and getMore command closes the cursor afterwards.
+ const collName = "test";
+ let coll = assertDropAndRecreateCollection(testDB, collName);
+
+ let aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
+
+ // Create oplog entries of type insert, update, and delete.
+ assert.writeOK(coll.insert({_id: 1}));
+ assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}}));
+ assert.writeOK(coll.remove({_id: 1}));
+
+ // Drop and recreate the collection.
+ const collAgg = assertDropAndRecreateCollection(testDB, collName);
+
+ // We should get 4 oplog entries of type insert, update, delete, and drop.
+ let change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "update", tojson(change));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "delete", tojson(change));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "drop", tojson(change));
+
+ // Get a valid resume token that the next change stream can use.
+ assert.writeOK(collAgg.insert({_id: 1}));
+
+ change = cst.getOneChange(aggCursor, false);
+ const resumeToken = change._id;
+
+ // For whole-db streams, it is possible to resume at a point before a collection is dropped.
+ assertDropCollection(testDB, collAgg.getName());
+ // Wait for two-phase drop to complete, so that the UUID no longer exists.
+ assert.soon(function() {
+ return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(testDB,
+ collAgg.getName());
+ });
+ assert.commandWorked(testDB.runCommand(
+ {aggregate: 1, pipeline: [{$changeStream: {resumeAfter: resumeToken}}], cursor: {}}));
+
+ // Test that invalidation entries for other databases are filtered out.
+ const otherDB = testDB.getSiblingDB(jsTestName() + "other");
+ const otherDBColl = otherDB[collName + "_other"];
+ assert.writeOK(otherDBColl.insert({_id: 0}));
+
+ // Create collection on the database being watched.
+ coll = assertDropAndRecreateCollection(testDB, collName);
+
+ // Create the $changeStream. We set 'doNotModifyInPassthroughs' so that this test will not be
+ // upconverted to a cluster-wide stream, which would return an entry for the dropped collection
+ // in the other database.
+ aggCursor = cst.startWatchingChanges(
+ {pipeline: [{$changeStream: {}}], collection: 1, doNotModifyInPassthroughs: true});
+
+ // Drop the collection on the other database, this should *not* invalidate the change stream.
+ assertDropCollection(otherDB, otherDBColl.getName());
+
+ // Insert into the collection in the watched database, and verify the change stream is able to
+ // pick it up.
+ assert.writeOK(coll.insert({_id: 1}));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, 1);
+
+ // Test that renaming a collection generates a 'rename' entry for the 'from' collection. MongoDB
+ // does not allow renaming of sharded collections, so only perform this test if the collection
+ // is not sharded.
+ if (!FixtureHelpers.isSharded(coll)) {
+ assertDropAndRecreateCollection(testDB, coll.getName());
+ assertDropCollection(testDB, "renamed_coll");
+ aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
+ assert.writeOK(coll.renameCollection("renamed_coll"));
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [{
+ operationType: "rename",
+ ns: {db: testDB.getName(), coll: coll.getName()},
+ to: {db: testDB.getName(), coll: "renamed_coll"}
+ }]
+ });
+
+ // Repeat the test, this time using the 'dropTarget' option with an existing target
+ // collection.
+ coll = testDB["renamed_coll"];
+ assertCreateCollection(testDB, collName);
+ assert.writeOK(testDB[collName].insert({_id: 0}));
+ assert.writeOK(coll.renameCollection(collName, true /* dropTarget */));
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [
+ {
+ operationType: "insert",
+ ns: {db: testDB.getName(), coll: collName},
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0}
+ },
+ {
+ operationType: "rename",
+ ns: {db: testDB.getName(), coll: "renamed_coll"},
+ to: {db: testDB.getName(), coll: collName}
+ }
+ ]
+ });
+
+ coll = testDB[collName];
+ // Test renaming a collection from the database being watched to a different database. Do
+ // not run this in the mongos passthrough suites since we cannot guarantee the primary shard
+ // of the target database, and renameCollection requires the source and destination to be on
+ // the same shard.
+ if (!FixtureHelpers.isMongos(testDB)) {
+ const otherDB = testDB.getSiblingDB(testDB.getName() + "_rename_target");
+ // Create target collection to ensure the database exists.
+ const collOtherDB = assertCreateCollection(otherDB, "test");
+ assertDropCollection(otherDB, "test");
+ assert.commandWorked(testDB.adminCommand(
+ {renameCollection: coll.getFullName(), to: collOtherDB.getFullName()}));
+ // Rename across databases drops the source collection after the collection is copied
+ // over.
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges:
+ [{operationType: "drop", ns: {db: testDB.getName(), coll: coll.getName()}}]
+ });
+
+ // Test renaming a collection from a different database to the database being watched.
+ assert.commandWorked(testDB.adminCommand(
+ {renameCollection: collOtherDB.getFullName(), to: coll.getFullName()}));
+ // Do not check the 'ns' field since it will contain the namespace of the temp
+ // collection created when renaming a collection across databases.
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "rename");
+ assert.eq(change.to, {db: testDB.getName(), coll: coll.getName()});
+ }
+
+ // Test the behavior of a change stream watching the target collection of a $out aggregation
+ // stage.
+ coll.aggregate([{$out: "renamed_coll"}]);
+ // Note that $out will first create a temp collection, and then rename the temp collection
+ // to the target. Do not explicitly check the 'ns' field.
+ const rename = cst.getOneChange(aggCursor);
+ assert.eq(rename.operationType, "rename", tojson(rename));
+ assert.eq(rename.to, {db: testDB.getName(), coll: "renamed_coll"}, tojson(rename));
+
+ // The change stream should not be invalidated by the rename(s).
+ assert.eq(0, cst.getNextBatch(aggCursor).nextBatch.length);
+ assert.writeOK(coll.insert({_id: 2}));
+ assert.eq(cst.getOneChange(aggCursor).operationType, "insert");
+ }
+
+ // Dropping a collection should return a 'drop' entry.
+ assertDropCollection(testDB, coll.getName());
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges:
+ [{operationType: "drop", ns: {db: testDB.getName(), coll: coll.getName()}}],
+ });
+
+ // Dropping a 'system' collection should also generate a 'drop' notification.
+ // Create a view to ensure that the 'system.views' collection exists.
+ assert.commandWorked(
+ testDB.runCommand({create: "view1", viewOn: coll.getName(), pipeline: []}));
+ assertDropCollection(testDB, "system.views");
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges:
+ [{operationType: "drop", ns: {db: testDB.getName(), coll: "system.views"}}]
+ });
+
+ cst.cleanUp();
+}());
diff --git a/jstests/change_streams/change_stream_whole_db_resumability.js b/jstests/change_streams/whole_db_resumability.js
index 682d0546cb7..682d0546cb7 100644
--- a/jstests/change_streams/change_stream_whole_db_resumability.js
+++ b/jstests/change_streams/whole_db_resumability.js
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js
index 4221adbaa30..48f1e133b53 100644
--- a/jstests/libs/change_stream_util.js
+++ b/jstests/libs/change_stream_util.js
@@ -7,10 +7,36 @@
// the same as calling db.runCommand. If a passthrough is active and has defined a function
// 'changeStreamPassthroughAwareRunCommand', then this method will be overridden to allow individual
// streams to explicitly exempt themselves from being modified by the passthrough.
+function isChangeStreamPassthrough() {
+ return typeof changeStreamPassthroughAwareRunCommand != 'undefined';
+}
const runCommandChangeStreamPassthroughAware =
- (typeof changeStreamPassthroughAwareRunCommand === 'undefined'
- ? ((db, cmdObj) => db.runCommand(cmdObj))
- : changeStreamPassthroughAwareRunCommand);
+ (!isChangeStreamPassthrough() ? ((db, cmdObj) => db.runCommand(cmdObj))
+ : changeStreamPassthroughAwareRunCommand);
+
+/**
+ * Asserts that the given opType triggers an invalidate entry depending on the type of change
+ * stream.
+ * - single collection streams: drop, rename, and dropDatabase.
+ * - whole DB streams: dropDatabase.
+ * - whole cluster streams: none.
+ */
+function assertInvalidateOp({cursor, opType}) {
+ if (!isChangeStreamPassthrough()) {
+ // All metadata operations will invalidate a single-collection change stream.
+ assert.soon(() => cursor.hasNext());
+ assert.eq(cursor.next().operationType, "invalidate");
+ assert(cursor.isExhausted());
+ assert(cursor.isClosed());
+ } else {
+ // Collection drops do not invalidate whole-db/cluster change streams.
+ if (opType == "drop") {
+ return;
+ }
+
+ // TODO SERVER-35029: Database drops should only invalidate whole-db change streams.
+ }
+}
function ChangeStreamTest(_db, name = "ChangeStreamTest") {
load("jstests/libs/namespace_utils.js"); // For getCollectionNameFromFullNamespace.
diff --git a/jstests/libs/override_methods/implicit_whole_cluster_changestreams.js b/jstests/libs/override_methods/implicit_whole_cluster_changestreams.js
index 3ee93a6fb24..f6a1da78b4e 100644
--- a/jstests/libs/override_methods/implicit_whole_cluster_changestreams.js
+++ b/jstests/libs/override_methods/implicit_whole_cluster_changestreams.js
@@ -27,6 +27,12 @@ ChangeStreamPassthroughHelpers.nsMatchFilter = function(db, collName) {
"ns.db": db.getName(),
"ns.coll": (isSingleCollectionStream ? collName : {$exists: true})
},
+ // Add a clause to detect if the collection being watched is the target of a
+ // renameCollection command, since that is expected to return a "rename" entry.
+ {
+ "to.db": db.getName(),
+ "to.coll": (isSingleCollectionStream ? collName : {$exists: true})
+ },
{operationType: "invalidate"}
]
}
diff --git a/jstests/libs/override_methods/implicit_whole_db_changestreams.js b/jstests/libs/override_methods/implicit_whole_db_changestreams.js
index f88af373263..056a2659a0c 100644
--- a/jstests/libs/override_methods/implicit_whole_db_changestreams.js
+++ b/jstests/libs/override_methods/implicit_whole_db_changestreams.js
@@ -65,8 +65,11 @@ const ChangeStreamPassthroughHelpers = {
nsMatchFilter: function(db, collName) {
return {
$match: {
- $or:
- [{"ns.db": db.getName(), "ns.coll": collName}, {operationType: "invalidate"}]
+ $or: [
+ {"ns.db": db.getName(), "ns.coll": collName},
+ {"to.db": db.getName(), "to.coll": collName},
+ {operationType: "invalidate"}
+ ]
}
};
},
diff --git a/jstests/sharding/change_stream_invalidation.js b/jstests/sharding/change_stream_metadata_notifications.js
index a50d8eb88ea..ec8865698d5 100644
--- a/jstests/sharding/change_stream_invalidation.js
+++ b/jstests/sharding/change_stream_metadata_notifications.js
@@ -1,9 +1,11 @@
-// Tests invalidation of change streams on sharded collections.
+// Tests metadata notifications of change streams on sharded collections.
+// Legacy getMore fails after dropping the database that the original cursor is on.
+// @tags: [requires_find_command]
(function() {
"use strict";
+ load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest.
- load('jstests/libs/write_concern_util.js'); // For stopReplicationOnSecondaries.
// For supportsMajorityReadConcern.
load('jstests/multiVersion/libs/causal_consistency_helpers.js');
@@ -53,10 +55,11 @@
assert.writeOK(mongosColl.update({shardKey: 1, _id: 1}, {$set: {updated: true}}));
assert.writeOK(mongosColl.insert({shardKey: 2, _id: 2}));
- // Drop the collection and test that we return "invalidate" entry and close the cursor.
+ // Drop the collection and test that we return a "drop" entry, followed by an "invalidate"
+ // entry.
mongosColl.drop();
- // Test that we see the two writes that happened before the invalidation.
+ // Test that we see the two writes that happened before the collection drop.
assert.soon(() => changeStream.hasNext());
let next = changeStream.next();
assert.eq(next.operationType, "update");
@@ -75,6 +78,11 @@
assert.soon(() => changeStream.hasNext());
next = changeStream.next();
+ assert.eq(next.operationType, "drop");
+ assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()});
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
assert.eq(next.operationType, "invalidate");
assert(changeStream.isExhausted());
@@ -95,6 +103,11 @@
assert.soon(() => changeStream.hasNext());
next = changeStream.next();
+ assert.eq(next.operationType, "drop");
+ assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()});
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
assert.eq(next.operationType, "invalidate");
assert(changeStream.isExhausted());
@@ -122,5 +135,24 @@
}),
ErrorCodes.InvalidResumeToken);
+ // Recreate the collection as unsharded and open a change stream on it.
+ assertDropAndRecreateCollection(mongosDB, mongosColl.getName());
+
+ changeStream = mongosColl.watch();
+
+ // Drop the database and verify that the stream returns a collection drop followed by an
+ // invalidate.
+ assert.commandWorked(mongosDB.dropDatabase());
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "drop");
+ assert.eq(next.ns, {db: mongosDB.getName(), coll: mongosColl.getName()});
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "invalidate");
+ assert(changeStream.isExhausted());
+
st.stop();
})();
diff --git a/jstests/sharding/change_streams.js b/jstests/sharding/change_streams.js
index cdec0971204..1000d047cd0 100644
--- a/jstests/sharding/change_streams.js
+++ b/jstests/sharding/change_streams.js
@@ -120,13 +120,16 @@
changeStream = mongosColl.aggregate([{$changeStream: {}}, {$project: {"_id.clusterTime": 0}}]);
assert(!changeStream.hasNext());
- // Drop the collection and test that we return "invalidate" entry and close the cursor.
+ // Drop the collection and test that we return a "drop" followed by an "invalidate" entry and
+ // close the cursor.
jsTestLog("Testing getMore command closes cursor for invalidate entries");
mongosColl.drop();
// Wait for the drop to actually happen.
assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(
mongosColl.getDB(), mongosColl.getName()));
assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "drop");
+ assert.soon(() => changeStream.hasNext());
assert.eq(changeStream.next().operationType, "invalidate");
assert(changeStream.isExhausted());
@@ -164,8 +167,10 @@
assert.eq(next.documentKey._id, 2);
assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "drop");
+
+ assert.soon(() => changeStream.hasNext());
assert.eq(changeStream.next().operationType, "invalidate");
- assert(changeStream.isExhausted());
// With an explicit collation, test that we can resume from before the collection drop.
changeStream = mongosColl.watch([{$project: {_id: 0}}],
@@ -177,9 +182,10 @@
assert.eq(next.documentKey, {_id: 2});
assert.soon(() => changeStream.hasNext());
- next = changeStream.next();
- assert.eq(next.operationType, "invalidate");
- assert(changeStream.isExhausted());
+ assert.eq(changeStream.next().operationType, "drop");
+
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "invalidate");
// Without an explicit collation, test that we *cannot* resume from before the collection drop.
assert.commandFailedWithCode(mongosDB.runCommand({
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 4ad710b50c4..2e3a152f872 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -78,10 +78,14 @@ constexpr StringData DocumentSourceChangeStream::kStageName;
constexpr StringData DocumentSourceChangeStream::kClusterTimeField;
constexpr StringData DocumentSourceChangeStream::kTxnNumberField;
constexpr StringData DocumentSourceChangeStream::kLsidField;
+constexpr StringData DocumentSourceChangeStream::kRenameTargetNssField;
constexpr StringData DocumentSourceChangeStream::kUpdateOpType;
constexpr StringData DocumentSourceChangeStream::kDeleteOpType;
constexpr StringData DocumentSourceChangeStream::kReplaceOpType;
constexpr StringData DocumentSourceChangeStream::kInsertOpType;
+constexpr StringData DocumentSourceChangeStream::kDropCollectionOpType;
+constexpr StringData DocumentSourceChangeStream::kRenameCollectionOpType;
+constexpr StringData DocumentSourceChangeStream::kDropDatabaseOpType;
constexpr StringData DocumentSourceChangeStream::kInvalidateOpType;
constexpr StringData DocumentSourceChangeStream::kNewShardDetectedOpType;
@@ -202,6 +206,7 @@ private:
: DocumentSource(expCtx) {}
bool _shouldCloseCursor = false;
+ boost::optional<Document> _queuedInvalidate;
};
DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() {
@@ -212,6 +217,11 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() {
uasserted(ErrorCodes::CloseChangeStream, "Change stream has been invalidated");
}
+ if (_queuedInvalidate) {
+ _shouldCloseCursor = true;
+ return DocumentSource::GetNextResult(std::move(_queuedInvalidate.get()));
+ }
+
auto nextInput = pSource->getNext();
if (!nextInput.isAdvanced())
return nextInput;
@@ -228,6 +238,22 @@ DocumentSource::GetNextResult DocumentSourceCloseCursor::getNext() {
_shouldCloseCursor = true;
}
+ // Check if this is an invalidating command and the next entry should be an "invalidate".
+ // TODO SERVER-35029: For whole-db change streams, only a database drop will invalidate the
+ // stream.
+ const auto invalidatingCommand = pExpCtx->isSingleNamespaceAggregation()
+ ? (operationType == DocumentSourceChangeStream::kDropCollectionOpType ||
+ operationType == DocumentSourceChangeStream::kRenameCollectionOpType)
+ : false;
+
+ if (invalidatingCommand) {
+ _queuedInvalidate = Document{
+ {DocumentSourceChangeStream::kIdField, doc[DocumentSourceChangeStream::kIdField]},
+ {DocumentSourceChangeStream::kClusterTimeField,
+ doc[DocumentSourceChangeStream::kClusterTimeField]},
+ {DocumentSourceChangeStream::kOperationTypeField, "invalidate"_sd}};
+ }
+
return nextInput;
}
@@ -304,27 +330,28 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
ChangeStreamType sourceType = getChangeStreamType(nss);
// 1) Supported commands that have the target db namespace (e.g. test.$cmd) in "ns" field.
- BSONArrayBuilder invalidatingCommands;
- invalidatingCommands.append(BSON("o.dropDatabase" << 1));
+ BSONArrayBuilder relevantCommands;
if (sourceType == ChangeStreamType::kSingleCollection) {
- invalidatingCommands.append(BSON("o.drop" << nss.coll()));
- invalidatingCommands.append(BSON("o.renameCollection" << nss.ns()));
+ relevantCommands.append(BSON("o.drop" << nss.coll()));
+ // Generate 'rename' entries if the change stream is open on the source or target namespace.
+ relevantCommands.append(BSON("o.renameCollection" << nss.ns()));
+ relevantCommands.append(BSON("o.to" << nss.ns()));
if (expCtx->collation.isEmpty()) {
// If the user did not specify a collation, they should be using the collection's
// default collation. So a "create" command which has any collation present would
// invalidate the change stream, since that must mean the stream was created before the
// collection existed and used the simple collation, which is no longer the default.
- invalidatingCommands.append(
+ relevantCommands.append(
BSON("o.create" << nss.coll() << "o.collation" << BSON("$exists" << true)));
}
} else {
- // For change streams on an entire database, the stream is invalidated if any collections in
- // that database are dropped or renamed. For cluster-wide streams, drops or renames of any
- // collection in any database (aside from the internal databases admin, config and local)
- // will invalidate the stream.
- invalidatingCommands.append(BSON("o.drop" << BSON("$exists" << true)));
- invalidatingCommands.append(BSON("o.renameCollection" << BSON("$exists" << true)));
+ // For change streams on an entire database, include notifications for individual collection
+ // drops and renames which will not invalidate the stream. Also include the 'dropDatabase'
+ // command which will invalidate the stream.
+ relevantCommands.append(BSON("o.drop" << BSON("$exists" << true)));
+ relevantCommands.append(BSON("o.dropDatabase" << BSON("$exists" << true)));
+ relevantCommands.append(BSON("o.renameCollection" << BSON("$exists" << true)));
}
// For cluster-wide $changeStream, match the command namespace of any database other than admin,
@@ -333,9 +360,9 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
? BSON("ns" << BSONRegEx("^" + kRegexAllDBs + kRegexCmdColl))
: BSON("ns" << nss.getCommandNS().ns()));
- // 1.1) Commands that are on target db(s) and one of the above invalidating commands.
+ // 1.1) Commands that are on target db(s) and one of the above supported commands.
auto commandsOnTargetDb =
- BSON("$and" << BSON_ARRAY(cmdNsFilter << BSON("$or" << invalidatingCommands.arr())));
+ BSON("$and" << BSON_ARRAY(cmdNsFilter << BSON("$or" << relevantCommands.arr())));
// 1.2) Supported commands that have arbitrary db namespaces in "ns" field.
auto renameDropTarget = (sourceType == ChangeStreamType::kAllChangesForCluster
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 0912e79ebd7..cf1720b7d36 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -135,11 +135,17 @@ public:
static constexpr StringData kTxnNumberField = "txnNumber"_sd;
static constexpr StringData kLsidField = "lsid"_sd;
+ // The target namespace of a rename operation.
+ static constexpr StringData kRenameTargetNssField = "to"_sd;
+
// The different types of operations we can use for the operation type.
static constexpr StringData kUpdateOpType = "update"_sd;
static constexpr StringData kDeleteOpType = "delete"_sd;
static constexpr StringData kReplaceOpType = "replace"_sd;
static constexpr StringData kInsertOpType = "insert"_sd;
+ static constexpr StringData kDropCollectionOpType = "drop"_sd;
+ static constexpr StringData kRenameCollectionOpType = "rename"_sd;
+ static constexpr StringData kDropDatabaseOpType = "dropDatabase"_sd;
static constexpr StringData kInvalidateOpType = "invalidate"_sd;
// Internal op type to signal mongos to open cursors on new shards.
static constexpr StringData kNewShardDetectedOpType = "kNewShardDetected"_sd;
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index af12c25c7d8..f465a5994ef 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -111,25 +111,27 @@ public:
void checkTransformation(const OplogEntry& entry,
const boost::optional<Document> expectedDoc,
- std::vector<FieldPath> docKeyFields,
- const BSONObj& spec) {
+ std::vector<FieldPath> docKeyFields = {},
+ const BSONObj& spec = kDefaultSpec,
+ const boost::optional<Document> expectedInvalidate = {}) {
vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry.toBSON(), spec);
- auto transform = stages[2].get();
+ auto closeCursor = stages.back();
getExpCtx()->mongoProcessInterface = stdx::make_unique<MockMongoInterface>(docKeyFields);
- auto next = transform->getNext();
+ auto next = closeCursor->getNext();
// Match stage should pass the doc down if expectedDoc is given.
ASSERT_EQ(next.isAdvanced(), static_cast<bool>(expectedDoc));
if (expectedDoc) {
ASSERT_DOCUMENT_EQ(next.releaseDocument(), *expectedDoc);
}
- }
- void checkTransformation(const OplogEntry& entry,
- const boost::optional<Document> expectedDoc,
- std::vector<FieldPath> docKeyFields = {}) {
- return checkTransformation(entry, expectedDoc, docKeyFields, kDefaultSpec);
+ if (expectedInvalidate) {
+ next = closeCursor->getNext();
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(), *expectedInvalidate);
+ // Then throw an exception on the next call of getNext().
+ ASSERT_THROWS(closeCursor->getNext(), ExceptionFor<ErrorCodes::CloseChangeStream>);
+ }
}
/**
@@ -623,32 +625,44 @@ TEST_F(ChangeStreamStageTest, TransformDeleteFromMigrate) {
checkTransformation(deleteEntry, boost::none);
}
-TEST_F(ChangeStreamStageTest, TransformInvalidate) {
- NamespaceString otherColl("test.bar");
-
+TEST_F(ChangeStreamStageTest, TransformDrop) {
OplogEntry dropColl = createCommand(BSON("drop" << nss.coll()), testUuid());
- bool dropDBFromMigrate = false; // verify this doesn't get it filtered
- OplogEntry dropDB = createCommand(BSON("dropDatabase" << 1), boost::none, dropDBFromMigrate);
- OplogEntry rename =
- createCommand(BSON("renameCollection" << nss.ns() << "to" << otherColl.ns()), testUuid());
- // Invalidate entry doesn't have a document id.
+ Document expectedDrop{
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ };
Document expectedInvalidate{
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
- {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
};
- for (auto& entry : {dropColl, rename}) {
- checkTransformation(entry, expectedInvalidate);
- }
- // Drop database invalidate entry doesn't have a UUID.
- Document expectedInvalidateDropDatabase{
- {DSChangeStream::kIdField, makeResumeToken(kDefaultTs)},
- {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ checkTransformation(dropColl, expectedDrop, {}, kDefaultSpec, expectedInvalidate);
+}
+
+TEST_F(ChangeStreamStageTest, TransformRename) {
+ NamespaceString otherColl("test.bar");
+ OplogEntry rename =
+ createCommand(BSON("renameCollection" << nss.ns() << "to" << otherColl.ns()), testUuid());
+
+ Document expectedRename{
+ {DSChangeStream::kRenameTargetNssField,
+ D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
- checkTransformation(dropDB, expectedInvalidateDropDatabase);
+ Document expectedInvalidate{
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ };
+
+ checkTransformation(rename, expectedRename, {}, kDefaultSpec, expectedInvalidate);
}
TEST_F(ChangeStreamStageTest, TransformInvalidateFromMigrate) {
@@ -670,22 +684,25 @@ TEST_F(ChangeStreamStageTest, TransformInvalidateFromMigrate) {
}
}
-TEST_F(ChangeStreamStageTest, TransformInvalidateRenameDropTarget) {
+TEST_F(ChangeStreamStageTest, TransformRenameTarget) {
NamespaceString otherColl("test.bar");
- auto rename =
- makeOplogEntry(OpTypeEnum::kCommand, // op type
- otherColl.getCommandNS(), // namespace
- BSON("renameCollection" << otherColl.ns() << "to" << nss.ns()), // o
- testUuid(), // uuid
- boost::none, // fromMigrate
- boost::none); // o2
+ OplogEntry rename =
+ createCommand(BSON("renameCollection" << otherColl.ns() << "to" << nss.ns()), testUuid());
+ Document expectedRename{
+ {DSChangeStream::kRenameTargetNssField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kNamespaceField, D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}},
+ };
Document expectedInvalidate{
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
- {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
};
- checkTransformation(rename, expectedInvalidate);
+
+ checkTransformation(rename, expectedRename, {}, kDefaultSpec, expectedInvalidate);
}
TEST_F(ChangeStreamStageTest, TransformNewShardDetected) {
@@ -882,17 +899,35 @@ TEST_F(ChangeStreamStageTest, ClusterTimeMatchesOplogEntry) {
};
checkTransformation(updateField, expectedUpdateField);
- // Test the 'clusterTime' field is copied from the oplog entry for an invalidation.
+ // Test the 'clusterTime' field is copied from the oplog entry for a collection drop.
OplogEntry dropColl =
createCommand(BSON("drop" << nss.coll()), testUuid(), boost::none, opTime);
- // Invalidate entry doesn't have a document id.
- Document expectedInvalidate{
+ Document expectedDrop{
{DSChangeStream::kIdField, makeResumeToken(ts, testUuid())},
- {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType},
{DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ };
+ checkTransformation(dropColl, expectedDrop);
+
+ // Test the 'clusterTime' field is copied from the oplog entry for a collection rename.
+ NamespaceString otherColl("test.bar");
+ OplogEntry rename =
+ createCommand(BSON("renameCollection" << nss.ns() << "to" << otherColl.ns()),
+ testUuid(),
+ boost::none,
+ opTime);
+
+ Document expectedRename{
+ {DSChangeStream::kRenameTargetNssField,
+ D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}},
+ {DSChangeStream::kIdField, makeResumeToken(ts, testUuid())},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
+ {DSChangeStream::kClusterTimeField, ts},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
- checkTransformation(dropColl, expectedInvalidate);
+ checkTransformation(rename, expectedRename);
}
TEST_F(ChangeStreamStageTest, MatchFiltersCreateCollection) {
@@ -979,13 +1014,22 @@ TEST_F(ChangeStreamStageTest, CloseCursorOnInvalidateEntries) {
auto stages = makeStages(dropColl);
auto closeCursor = stages.back();
+ Document expectedDrop{
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ };
Document expectedInvalidate{
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
- {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
};
auto next = closeCursor->getNext();
+ // Transform into drop entry.
+ ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedDrop);
+ next = closeCursor->getNext();
// Transform into invalidate entry.
ASSERT_DOCUMENT_EQ(next.releaseDocument(), expectedInvalidate);
// Then throw an exception on the next call of getNext().
@@ -1292,25 +1336,38 @@ TEST_F(ChangeStreamStageDBTest, TransformDeleteFromMigrate) {
checkTransformation(deleteEntry, boost::none);
}
-TEST_F(ChangeStreamStageDBTest, TransformInvalidate) {
- NamespaceString otherColl("test.bar");
-
+TEST_F(ChangeStreamStageDBTest, TransformDrop) {
OplogEntry dropColl = createCommand(BSON("drop" << nss.coll()), testUuid());
- bool dropDBFromMigrate = false; // verify this doesn't get it filtered
- OplogEntry dropDB = createCommand(BSON("dropDatabase" << 1), boost::none, dropDBFromMigrate);
+ Document expectedDrop{
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ };
+ checkTransformation(dropColl, expectedDrop);
+}
+
+TEST_F(ChangeStreamStageDBTest, TransformRename) {
+ NamespaceString otherColl("test.bar");
OplogEntry rename =
createCommand(BSON("renameCollection" << nss.ns() << "to" << otherColl.ns()), testUuid());
- // Invalidate entry doesn't have a document id.
- Document expectedInvalidate{
+ Document expectedRename{
+ {DSChangeStream::kRenameTargetNssField,
+ D{{"db", otherColl.db()}, {"coll", otherColl.coll()}}},
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
- {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
};
- for (auto& entry : {dropColl, rename}) {
- checkTransformation(entry, expectedInvalidate);
- }
+ checkTransformation(rename, expectedRename);
+}
+
+TEST_F(ChangeStreamStageDBTest, TransformDropDatabase) {
+ bool dropDBFromMigrate = false; // verify this doesn't get filtered
+ OplogEntry dropDB = createCommand(BSON("dropDatabase" << 1), boost::none, dropDBFromMigrate);
+ // TODO SERVER-35029: Add notification for DB drop followed by invalidate.
// Drop database invalidate entry doesn't have a UUID.
Document expectedInvalidateDropDatabase{
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs)},
@@ -1320,21 +1377,35 @@ TEST_F(ChangeStreamStageDBTest, TransformInvalidate) {
checkTransformation(dropDB, expectedInvalidateDropDatabase);
}
-TEST_F(ChangeStreamStageDBTest, SystemCollectionsDropOrRenameShouldInvalidate) {
+TEST_F(ChangeStreamStageDBTest, SystemCollectionsDrop) {
NamespaceString systemColl(nss.db() + ".system.users");
- NamespaceString renamedSystemColl(nss.db() + ".system.users_new");
- Document expectedInvalidate{
+ OplogEntry dropColl = createCommand(BSON("drop" << systemColl.coll()), testUuid());
+ // Note that the collection drop does *not* produce a queued invalidate.
+ Document expectedDrop{
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
- {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kDropCollectionOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kNamespaceField, D{{"db", systemColl.db()}, {"coll", systemColl.coll()}}},
};
+ checkTransformation(dropColl, expectedDrop);
+}
- OplogEntry dropColl = createCommand(BSON("drop" << systemColl.coll()), testUuid());
- checkTransformation(dropColl, expectedInvalidate);
+TEST_F(ChangeStreamStageDBTest, SystemCollectionsRename) {
+ NamespaceString systemColl(nss.db() + ".system.users");
+ NamespaceString renamedSystemColl(nss.db() + ".system.users_new");
OplogEntry rename = createCommand(
BSON("renameCollection" << systemColl.ns() << "to" << renamedSystemColl.ns()), testUuid());
- checkTransformation(rename, expectedInvalidate);
+ // Note that the collection rename does *not* produce a queued invalidate.
+ Document expectedRename{
+ {DSChangeStream::kRenameTargetNssField,
+ D{{"db", renamedSystemColl.db()}, {"coll", renamedSystemColl.coll()}}},
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kRenameCollectionOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ {DSChangeStream::kNamespaceField, D{{"db", systemColl.db()}, {"coll", systemColl.coll()}}},
+ };
+ checkTransformation(rename, expectedRename);
}
TEST_F(ChangeStreamStageDBTest, MatchFiltersNoOp) {
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 9e7e623eeab..8f25e7e6151 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -292,10 +292,28 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
invariant(nextDoc);
return applyTransformation(*nextDoc);
+ } else if (!input.getNestedField("o.drop").missing()) {
+ operationType = DocumentSourceChangeStream::kDropCollectionOpType;
+
+ // The "o.drop" field will contain the actual collection name.
+ nss = NamespaceString(nss.db(), input.getNestedField("o.drop").getString());
+ } else if (!input.getNestedField("o.renameCollection").missing()) {
+ operationType = DocumentSourceChangeStream::kRenameCollectionOpType;
+
+ // The "o.renameCollection" field contains the namespace of the original collection.
+ nss = NamespaceString(input.getNestedField("o.renameCollection").getString());
+
+ // The "o.to" field contains the target namespace for the rename.
+ const auto renameTargetNss =
+ NamespaceString(input.getNestedField("o.to").getString());
+ doc.addField(DocumentSourceChangeStream::kRenameTargetNssField,
+ Value(Document{{"db", renameTargetNss.db()},
+ {"coll", renameTargetNss.coll()}}));
+ } else {
+ // All other commands will invalidate the stream.
+ operationType = DocumentSourceChangeStream::kInvalidateOpType;
}
- // Any command that makes it through our filter is an invalidating command such as a
- // drop.
- operationType = DocumentSourceChangeStream::kInvalidateOpType;
+
// Make sure the result doesn't have a document key.
documentKey = Value();
break;