diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2017-11-12 19:13:34 +0000 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2017-11-20 12:37:38 +0000 |
commit | a5e3007d307867d874a9377bcd172b88df8163f7 (patch) | |
tree | 7f93282313ae4081e2854d0c2c79a050a15fdfbb | |
parent | b73d85120498f49e44d2ad3329d33b4de595715c (diff) | |
download | mongo-a5e3007d307867d874a9377bcd172b88df8163f7.tar.gz |
SERVER-31394 Create passthrough of existing $changeStream tests to run against sharded collections
21 files changed, 253 insertions, 148 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams.yml b/buildscripts/resmokeconfig/suites/change_streams.yml index 9d616e0d4b8..89d5a1c231e 100644 --- a/buildscripts/resmokeconfig/suites/change_streams.yml +++ b/buildscripts/resmokeconfig/suites/change_streams.yml @@ -26,7 +26,6 @@ executor: class: ReplicaSetFixture mongod_options: bind_ip_all: '' - enableMajorityReadConcern: '' oplogSize: 511 set_parameters: enableTestCommands: 1 diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml index 16070851739..ca94f28c281 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml @@ -29,7 +29,6 @@ executor: mongod_options: bind_ip_all: '' nopreallocj: '' - enableMajorityReadConcern: '' set_parameters: enableTestCommands: 1 numInitialSyncAttempts: 1 diff --git a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml new file mode 100644 index 00000000000..9d0b301e992 --- /dev/null +++ b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml @@ -0,0 +1,42 @@ +test_kind: js_test + +selector: + roots: + - jstests/change_streams/**/*.js + exclude_files: + # Exercises an internal detail of mongos<->mongod communication. Not expected to work on mongos. + - jstests/change_streams/report_latest_observed_oplog_timestamp.js +executor: + config: + shell_options: + global_vars: + TestData: + defaultReadConcernLevel: majority + enableMajorityReadConcern: '' + eval: >- + var testingReplication = true; + load('jstests/libs/override_methods/set_read_and_write_concerns.js'); + load('jstests/libs/override_methods/implicitly_shard_accessed_collections.js'); + readMode: commands + hooks: + - class: ValidateCollections + - class: CleanEveryN + n: 20 + fixture: + class: ShardedClusterFixture + mongos_options: + bind_ip_all: '' + set_parameters: + enableTestCommands: 1 + mongod_options: + bind_ip_all: '' + nopreallocj: '' + set_parameters: + enableTestCommands: 1 + writePeriodicNoops: 1 + periodicNoopIntervalSecs: 1 + numInitialSyncAttempts: 1 + num_rs_nodes_per_shard: 1 + num_shards: 2 + enable_sharding: + - test diff --git a/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml index 24fc0eb3176..fbd53ee4dc2 100644 --- a/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/sharded_causally_consistent_jscore_passthrough.yml @@ -17,6 +17,7 @@ selector: - jstests/core/compact_keeps_indexes.js # compact. - jstests/core/currentop.js # uses fsync. - jstests/core/auth_copydb.js # copyDatabase. + - jstests/core/copydatabase_no_id_index.js # copyDatabase. - jstests/core/copydb.js # copyDatabase. - jstests/core/dbadmin.js # "local" database. - jstests/core/dbhash.js # dbhash. diff --git a/buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml index 03b04f5b050..c9b23f10d29 100644 --- a/buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/sharded_collections_jscore_passthrough.yml @@ -15,6 +15,7 @@ selector: - jstests/core/compact_keeps_indexes.js # compact. - jstests/core/currentop.js # uses fsync. - jstests/core/auth_copydb.js # copyDatabase. + - jstests/core/copydatabase_no_id_index.js # copyDatabase. - jstests/core/copydb.js # copyDatabase. - jstests/core/dbadmin.js # "local" database. - jstests/core/dbhash.js # dbhash. diff --git a/etc/evergreen.yml b/etc/evergreen.yml index 2370d6a155d..234d656e541 100644 --- a/etc/evergreen.yml +++ b/etc/evergreen.yml @@ -2892,6 +2892,17 @@ tasks: run_multiple_jobs: true - <<: *task_template + name: change_streams_sharded_collections_passthrough_WT + depends_on: + - name: change_streams_WT + commands: + - func: "do setup" + - func: "run tests" + vars: + resmoke_args: --suites=change_streams_sharded_collections_passthrough --storageEngine=wiredTiger + run_multiple_jobs: true + +- <<: *task_template name: dbtest commands: - func: "do setup" @@ -4918,6 +4929,7 @@ buildvariants: - name: change_streams_WT - name: change_streams_mongos_passthrough_WT - name: change_streams_secondary_reads_WT + - name: change_streams_sharded_collections_passthrough_WT - name: dbtest_WT - name: disk_WT - name: failpoints @@ -5037,6 +5049,7 @@ buildvariants: - name: causally_consistent_jscore_passthrough_auth_WT - name: change_streams_WT - name: change_streams_mongos_passthrough_WT + - name: change_streams_sharded_collections_passthrough_WT - name: concurrency_WT - name: concurrency_replication_WT - name: concurrency_sharded_WT @@ -6004,6 +6017,7 @@ buildvariants: - name: sharded_causally_consistent_jscore_passthrough_WT - name: change_streams_WT - name: change_streams_mongos_passthrough_WT + - name: change_streams_sharded_collections_passthrough_WT - name: dbtest_WT - name: disk_WT - name: failpoints @@ -6513,6 +6527,7 @@ buildvariants: - name: sharded_causally_consistent_jscore_passthrough_WT - name: change_streams_WT - name: change_streams_mongos_passthrough_WT + - name: change_streams_sharded_collections_passthrough_WT - name: concurrency_WT - name: concurrency_replication_WT - name: concurrency_sharded_WT @@ -6755,6 +6770,7 @@ buildvariants: - name: sharded_causally_consistent_jscore_passthrough_WT - name: change_streams_WT - name: change_streams_mongos_passthrough_WT + - name: change_streams_sharded_collections_passthrough_WT - name: concurrency_WT - name: concurrency_replication_WT - name: concurrency_sharded_WT @@ -6932,6 +6948,7 @@ buildvariants: - name: change_streams_WT - name: change_streams_mongos_passthrough_WT - name: change_streams_secondary_reads_WT + - name: change_streams_sharded_collections_passthrough_WT - name: concurrency_WT - name: concurrency_replication_WT - name: concurrency_sharded_WT @@ -8650,6 +8667,7 @@ buildvariants: - name: sharded_causally_consistent_jscore_passthrough_WT - name: change_streams_WT - name: change_streams_mongos_passthrough_WT + - name: change_streams_sharded_collections_passthrough_WT - name: concurrency distros: - rhel62-large # Some workloads require a lot of memory, use a bigger machine for this suite. @@ -9042,6 +9060,7 @@ buildvariants: - name: change_streams_WT - name: change_streams_mongos_passthrough_WT - name: change_streams_secondary_reads_WT + - name: change_streams_sharded_collections_passthrough_WT - name: concurrency_WT - name: concurrency_replication_WT - name: concurrency_sharded_WT @@ -9190,6 +9209,7 @@ buildvariants: - name: change_streams_WT - name: change_streams_mongos_passthrough_WT - name: change_streams_secondary_reads_WT + - name: change_streams_sharded_collections_passthrough_WT - name: concurrency_WT - name: concurrency_replication_WT - name: concurrency_sharded_WT diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js index c1415a2a1f5..80356aa3b48 100644 --- a/jstests/change_streams/change_stream.js +++ b/jstests/change_streams/change_stream.js @@ -2,13 +2,14 @@ (function() { "use strict"; + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. load("jstests/libs/change_stream_util.js"); load('jstests/libs/uuid_util.js'); let cst = new ChangeStreamTest(db); jsTestLog("Testing single insert"); - db.t1.drop(); + assertDropCollection(db, "t1"); let cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); // Test that if there are no changes, we return an empty batch. assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor)); @@ -40,7 +41,7 @@ jsTestLog("Testing update"); cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); - assert.writeOK(db.t1.update({_id: 0}, {a: 3})); + assert.writeOK(db.t1.update({_id: 0}, {_id: 0, a: 3})); expected = { documentKey: {_id: 0}, fullDocument: {_id: 0, a: 3}, @@ -51,7 +52,7 @@ jsTestLog("Testing update of another field"); cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); - assert.writeOK(db.t1.update({_id: 0}, {b: 3})); + assert.writeOK(db.t1.update({_id: 0}, {_id: 0, b: 3})); expected = { documentKey: {_id: 0}, fullDocument: {_id: 0, b: 3}, @@ -62,7 +63,7 @@ jsTestLog("Testing upsert"); cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); - assert.writeOK(db.t1.update({_id: 2}, {a: 4}, {upsert: true})); + assert.writeOK(db.t1.update({_id: 2}, {_id: 2, a: 4}, {upsert: true})); expected = { documentKey: {_id: 2}, fullDocument: {_id: 2, a: 4}, @@ -94,6 +95,7 @@ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); jsTestLog("Testing intervening write on another collection"); + assertDropCollection(db, "t2"); cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); let t2cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2}); assert.writeOK(db.t2.insert({_id: 100, c: 1})); @@ -109,25 +111,28 @@ jsTestLog("Testing drop of unrelated collection"); assert.writeOK(db.dropping.insert({})); - db.dropping.drop(); + assertDropCollection(db, db.dropping.getName()); // Should still see the previous change from t2, shouldn't see anything about 'dropping'. - jsTestLog("Testing rename"); - db.t3.drop(); - t2cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2}); - assert.writeOK(db.t2.renameCollection("t3")); - expected = {operationType: "invalidate"}; - cst.assertNextChangesEqual( - {cursor: t2cursor, expectedChanges: [expected], expectInvalidate: true}); + // Test collection renaming. Sharded collections cannot be renamed. + if (!db.t2.stats().sharded) { + jsTestLog("Testing rename"); + assertDropCollection(db, "t3"); + t2cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2}); + assert.writeOK(db.t2.renameCollection("t3")); + expected = {operationType: "invalidate"}; + cst.assertNextChangesEqual( + {cursor: t2cursor, expectedChanges: [expected], expectInvalidate: true}); + } jsTestLog("Testing insert that looks like rename"); - db.dne1.drop(); - db.dne2.drop(); + assertDropCollection(db, "dne1"); + assertDropCollection(db, "dne2"); const dne1cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.dne1}); const dne2cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.dne2}); - assert.writeOK(db.t3.insert({_id: 101, renameCollection: "test.dne1", to: "test.dne2"})); + assert.writeOK(db.t2.insert({_id: 101, renameCollection: "test.dne1", to: "test.dne2"})); cst.assertNextChangesEqual({cursor: dne1cursor, expectedChanges: []}); cst.assertNextChangesEqual({cursor: dne2cursor, expectedChanges: []}); @@ -144,8 +149,7 @@ } jsTestLog("Testing resumability"); - db.resume1.drop(); - assert.commandWorked(db.createCollection("resume1")); + assertDropAndRecreateCollection(db, "resume1"); // Note we do not project away 'id.ts' as it is part of the resume token. let resumeCursor = cst.startWatchingChanges( diff --git a/jstests/change_streams/change_stream_ban_from_lookup.js b/jstests/change_streams/change_stream_ban_from_lookup.js index f17be0ecef1..08bc600ad55 100644 --- a/jstests/change_streams/change_stream_ban_from_lookup.js +++ b/jstests/change_streams/change_stream_ban_from_lookup.js @@ -4,16 +4,17 @@ (function() { "use strict"; - load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. + load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. - const coll = db.change_stream_ban_from_lookup; - coll.drop(); + const coll = assertDropAndRecreateCollection(db, "change_stream_ban_from_lookup"); + const foreignColl = "unsharded"; assert.writeOK(coll.insert({_id: 1})); // Verify that we cannot create a $lookup using a pipeline which begins with $changeStream. assertErrorCode(coll, - [{$lookup: {from: coll.getName(), as: 'as', pipeline: [{$changeStream: {}}]}}], + [{$lookup: {from: foreignColl, as: 'as', pipeline: [{$changeStream: {}}]}}], ErrorCodes.IllegalOperation); // Verify that we cannot create a $lookup if its pipeline contains a sub-$lookup whose pipeline @@ -22,14 +23,11 @@ coll, [{ $lookup: { - from: coll.getName(), + from: foreignColl, as: 'as', pipeline: [ {$match: {_id: 1}}, - { - $lookup: - {from: coll.getName(), as: 'subas', pipeline: [{$changeStream: {}}]} - } + {$lookup: {from: foreignColl, as: 'subas', pipeline: [{$changeStream: {}}]}} ] } }], diff --git a/jstests/change_streams/change_stream_ban_from_views.js b/jstests/change_streams/change_stream_ban_from_views.js index b2789e72155..643df54bd74 100644 --- a/jstests/change_streams/change_stream_ban_from_views.js +++ b/jstests/change_streams/change_stream_ban_from_views.js @@ -4,29 +4,30 @@ (function() { "use strict"; - const collName = "change_stream_ban_from_views"; + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + + const coll = assertDropAndRecreateCollection(db, "change_stream_ban_from_views"); + assert.writeOK(coll.insert({_id: 1})); + const normalViewName = "nonChangeStreamView"; const csViewName = "changeStreamView"; - db[normalViewName].drop(); - db[csViewName].drop(); - db[collName].drop(); - - assert.writeOK(db[collName].insert({_id: 1})); + assertDropCollection(db, normalViewName); + assertDropCollection(db, csViewName); const csPipe = [{$changeStream: {}}]; // Create one valid view for testing purposes. - assert.commandWorked( - db.runCommand({create: normalViewName, viewOn: collName, pipeline: [{$match: {_id: 1}}]})); + assert.commandWorked(db.runCommand( + {create: normalViewName, viewOn: coll.getName(), pipeline: [{$match: {_id: 1}}]})); // Verify that we cannot create a view using a pipeline which begins with $changeStream. assert.commandFailedWithCode( - db.runCommand({create: csViewName, viewOn: collName, pipeline: csPipe}), + db.runCommand({create: csViewName, viewOn: coll.getName(), pipeline: csPipe}), ErrorCodes.OptionNotSupportedOnView); // We also cannot update an existing view to use a $changeStream pipeline. assert.commandFailedWithCode( - db.runCommand({collMod: normalViewName, viewOn: collName, pipeline: csPipe}), + db.runCommand({collMod: normalViewName, viewOn: coll.getName(), pipeline: csPipe}), ErrorCodes.OptionNotSupportedOnView); })(); diff --git a/jstests/change_streams/change_stream_collation.js b/jstests/change_streams/change_stream_collation.js index 5093bbe4ed0..314e7b08954 100644 --- a/jstests/change_streams/change_stream_collation.js +++ b/jstests/change_streams/change_stream_collation.js @@ -1,20 +1,18 @@ /** * Tests that a change stream can use a user-specified, or collection-default collation. - * - * This test assumes that it will be able to drop and then re-create a collection with non-default - * options. - * @tags: [assumes_no_implicit_collection_creation_after_drop] */ (function() { "use strict"; - load("jstests/libs/change_stream_util.js"); // For 'ChangeStreamTest'. + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + load("jstests/libs/change_stream_util.js"); // For 'ChangeStreamTest'. let cst = new ChangeStreamTest(db); const caseInsensitive = {locale: "en_US", strength: 2}; - const caseInsensitiveCollection = db.change_stream_case_insensitive; - caseInsensitiveCollection.drop(); + + let caseInsensitiveCollection = "change_stream_case_insensitive"; + assertDropCollection(db, caseInsensitiveCollection); // Test that you can open a change stream before the collection exists, and it will use the // simple collation. @@ -23,8 +21,8 @@ // Create the collection with a non-default collation - this should invalidate the stream we // opened before it existed. - assert.commandWorked( - db.runCommand({create: caseInsensitiveCollection.getName(), collation: caseInsensitive})); + caseInsensitiveCollection = + assertCreateCollection(db, caseInsensitiveCollection, {collation: caseInsensitive}); cst.assertNextChangesEqual({ cursor: simpleCollationStream, expectedChanges: [{operationType: "invalidate"}], @@ -60,10 +58,8 @@ {cursor: explicitCaseInsensitiveStream, expectedChanges: [{docId: 0}, {docId: 1}]}); // Test that the collation does not apply to the scan over the oplog. - const similarNameCollection = db.cHaNgE_sTrEaM_cAsE_iNsEnSiTiVe; - similarNameCollection.drop(); - assert.commandWorked( - db.runCommand({create: similarNameCollection.getName(), collation: {locale: "en_US"}})); + const similarNameCollection = assertDropAndRecreateCollection( + db, "cHaNgE_sTrEaM_cAsE_iNsEnSiTiVe", {collation: {locale: "en_US"}}); assert.writeOK(similarNameCollection.insert({_id: 0, text: "aBc"})); @@ -79,15 +75,15 @@ // Test that creating a collection without a collation does not invalidate any change streams // that were opened before the collection existed. (function() { - const noCollationCollection = db.change_stream_no_collation; - noCollationCollection.drop(); + let noCollationCollection = "change_stream_no_collation"; + assertDropCollection(db, noCollationCollection); const streamCreatedBeforeNoCollationCollection = cst.startWatchingChanges({ pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}], collection: noCollationCollection }); - assert.commandWorked(db.runCommand({create: noCollationCollection.getName()})); + noCollationCollection = assertCreateCollection(db, noCollationCollection); assert.writeOK(noCollationCollection.insert({_id: 0})); cst.assertNextChangesEqual( @@ -97,16 +93,16 @@ // Test that creating a collection and explicitly specifying the simple collation does not // invalidate any change streams that were opened before the collection existed. (function() { - const simpleCollationCollection = db.change_stream_simple_collation; - simpleCollationCollection.drop(); + let simpleCollationCollection = "change_stream_simple_collation"; + assertDropCollection(db, simpleCollationCollection); const streamCreatedBeforeSimpleCollationCollection = cst.startWatchingChanges({ pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}], collection: simpleCollationCollection }); - assert.commandWorked(db.runCommand( - {create: simpleCollationCollection.getName(), collation: {locale: "simple"}})); + simpleCollationCollection = + assertCreateCollection(db, simpleCollationCollection, {collation: {locale: "simple"}}); assert.writeOK(simpleCollationCollection.insert({_id: 0})); cst.assertNextChangesEqual( @@ -116,8 +112,8 @@ // Test that creating a change stream with a non-default collation, then creating a collection // with the same collation will not invalidate the change stream. (function() { - const frenchCollection = db.change_stream_french_collation; - frenchCollection.drop(); + let frenchCollection = "change_stream_french_collation"; + assertDropCollection(db, frenchCollection); const frenchChangeStream = cst.startWatchingChanges({ pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}], @@ -125,8 +121,8 @@ collection: frenchCollection }); - assert.commandWorked( - db.runCommand({create: frenchCollection.getName(), collation: {locale: "fr"}})); + frenchCollection = + assertCreateCollection(db, frenchCollection, {collation: {locale: "fr"}}); assert.writeOK(frenchCollection.insert({_id: 0})); cst.assertNextChangesEqual({cursor: frenchChangeStream, expectedChanges: [{docId: 0}]}); @@ -135,8 +131,8 @@ // Test that creating a change stream with a non-default collation, then creating a collection // with *a different* collation will not invalidate the change stream. (function() { - const germanCollection = db.change_stream_german_collation; - germanCollection.drop(); + let germanCollection = "change_stream_german_collation"; + assertDropCollection(db, germanCollection); const englishCaseInsensitiveStream = cst.startWatchingChanges({ pipeline: [ @@ -148,8 +144,8 @@ collection: germanCollection }); - assert.commandWorked( - db.runCommand({create: germanCollection.getName(), collation: {locale: "de"}})); + germanCollection = + assertCreateCollection(db, germanCollection, {collation: {locale: "de"}}); assert.writeOK(germanCollection.insert({_id: 0, text: "aBc"})); cst.assertNextChangesEqual( @@ -159,10 +155,8 @@ // Test that creating a change stream with a non-default collation against a collection that has // a non-simple default collation will use the collation specified on the operation. (function() { - const caseInsensitiveCollection = db.change_stream_case_insensitive; - caseInsensitiveCollection.drop(); - assert.commandWorked(db.runCommand( - {create: caseInsensitiveCollection.getName(), collation: caseInsensitive})); + const caseInsensitiveCollection = assertDropAndRecreateCollection( + db, "change_stream_case_insensitive", {collation: caseInsensitive}); const englishCaseSensitiveStream = cst.startWatchingChanges({ pipeline: [ @@ -185,9 +179,8 @@ // Test that creating a change stream with a non-default collation against a collection that has // a simple default collation will use the collation specified on the operation. (function() { - const noCollationCollection = db.change_stream_no_collation; - noCollationCollection.drop(); - assert.commandWorked(db.runCommand({create: noCollationCollection.getName()})); + const noCollationCollection = + assertDropAndRecreateCollection(db, "change_stream_no_collation"); const cursor = noCollationCollection.watch( [ @@ -198,7 +191,7 @@ assert(!cursor.hasNext()); assert.writeOK(noCollationCollection.insert({_id: 0, text: "aBc"})); assert.writeOK(noCollationCollection.insert({_id: 1, text: "abc"})); - assert(cursor.hasNext()); + assert.soon(() => cursor.hasNext()); assert.docEq(cursor.next(), {docId: 0}); assert(cursor.hasNext()); assert.docEq(cursor.next(), {docId: 1}); diff --git a/jstests/change_streams/change_stream_invalidation.js b/jstests/change_streams/change_stream_invalidation.js index c442b1dcecf..7f3a1c0529b 100644 --- a/jstests/change_streams/change_stream_invalidation.js +++ b/jstests/change_streams/change_stream_invalidation.js @@ -6,6 +6,7 @@ load("jstests/libs/change_stream_util.js"); load('jstests/libs/uuid_util.js'); load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'. + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. let cst = new ChangeStreamTest(db); @@ -13,7 +14,7 @@ // Write a document to the collection and test that the change stream returns it // and getMore command closes the cursor afterwards. - const collGetMore = db.change_stream_getmore_invalidations; + const collGetMore = assertDropAndRecreateCollection(db, "change_stream_getmore_invalidations"); // We awaited the replication of the first write, so the change stream shouldn't return it. // Use { w: "majority" } to deal with journaling correctly, even though we only have one node. assert.writeOK(collGetMore.insert({_id: 0, a: 1}, {writeConcern: {w: "majority"}})); @@ -47,8 +48,7 @@ }); jsTestLog("Testing aggregate command closes cursor for invalidate entries"); - const collAgg = db.change_stream_agg_invalidations; - db.createCollection(collAgg.getName()); + const collAgg = assertDropAndRecreateCollection(db, "change_stream_agg_invalidations"); const collAggUuid = getUUIDFromListCollections(db, collAgg.getName()); // Get a valid resume token that the next aggregate command can use. aggcursor = cst.startWatchingChanges( @@ -61,7 +61,7 @@ // It should not possible to resume a change stream after a collection drop, even if the // invalidate has not been received. - assert(collAgg.drop()); + assertDropCollection(db, collAgg.getName()); // Wait for two-phase drop to complete, so that the UUID no longer exists. assert.soon(function() { return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, collAgg.getName()); @@ -75,13 +75,14 @@ // Test that it is possible to open a new change stream cursor on a collection that does not // exist. jsTestLog("Testing aggregate command on nonexistent collection"); - const collDoesNotExist = db.change_stream_agg_invalidations_does_not_exist; - db.runCommand({drop: collDoesNotExist.getName(), writeConcern: {j: true}}); + const collDoesNotExistName = "change_stream_agg_invalidations_does_not_exist"; + assertDropCollection(db, collDoesNotExistName); // Cursor creation succeeds, but there are no results. aggcursor = cst.startWatchingChanges({ - collection: collDoesNotExist, + collection: collDoesNotExistName, pipeline: [{$changeStream: {}}], + includeToken: true, }); // We explicitly test getMore, to ensure that the getMore command for a non-existent collection @@ -91,7 +92,8 @@ assert.eq(aggcursor.nextBatch.length, 0, tojson(aggcursor.nextBatch)); // After collection creation, we see oplog entries for the collection. - assert.writeOK(collDoesNotExist.insert({_id: 0}, {writeConcern: {j: true}})); + const collNowExists = assertCreateCollection(db, collDoesNotExistName); + assert.writeOK(collNowExists.insert({_id: 0}, {writeConcern: {j: true}})); change = cst.getOneChange(aggcursor); assert.eq(change.operationType, "insert", tojson(change)); diff --git a/jstests/change_streams/change_stream_shell_helper.js b/jstests/change_streams/change_stream_shell_helper.js index 4e972e0fd6f..58d20645ae1 100644 --- a/jstests/change_streams/change_stream_shell_helper.js +++ b/jstests/change_streams/change_stream_shell_helper.js @@ -3,12 +3,12 @@ (function() { "use strict"; - const collName = "change_stream_shell_helper"; - const coll = db[collName]; - coll.drop(); + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + + const coll = assertDropAndRecreateCollection(db, "change_stream_shell_helper"); function checkNextChange(cursor, expected) { - assert(cursor.hasNext()); + assert.soon(() => cursor.hasNext()); assert.docEq(cursor.next(), expected); } @@ -41,13 +41,13 @@ let cursor = coll.watch(); assert(!cursor.hasNext()); assert.writeOK(coll.insert({_id: 0, x: 1})); - assert(cursor.hasNext()); + assert.soon(() => cursor.hasNext()); let change = cursor.next(); assert(!cursor.hasNext()); let expected = { documentKey: {_id: 0}, fullDocument: {_id: 0, x: 1}, - ns: {db: "test", coll: collName}, + ns: {db: "test", coll: coll.getName()}, operationType: "insert", }; assert("_id" in change, "Got unexpected change: " + tojson(change)); @@ -72,7 +72,7 @@ expected = { documentKey: {_id: 0}, fullDocument: {_id: 0, x: 10}, - ns: {db: "test", coll: collName}, + ns: {db: "test", coll: coll.getName()}, operationType: "update", updateDescription: {removedFields: [], updatedFields: {x: 10}}, }; @@ -135,15 +135,15 @@ expected = { documentKey: {_id: 2}, fullDocument: {_id: 2, x: 1}, - ns: {db: "test", coll: collName}, + ns: {db: "test", coll: coll.getName()}, operationType: "insert", }; checkNextChange(cursor, expected); assert(!cursor.hasNext()); assert(!cursor.isClosed()); assert(!cursor.isExhausted()); - coll.drop(); - assert(cursor.hasNext()); + assertDropCollection(db, coll.getName()); + assert.soon(() => cursor.hasNext()); assert(cursor.isClosed()); assert(!cursor.isExhausted()); expected = {operationType: "invalidate"}; diff --git a/jstests/change_streams/change_stream_whitelist.js b/jstests/change_streams/change_stream_whitelist.js index 817dd35fef4..12848e1e9e7 100644 --- a/jstests/change_streams/change_stream_whitelist.js +++ b/jstests/change_streams/change_stream_whitelist.js @@ -5,11 +5,13 @@ (function() { "use strict"; - load('jstests/aggregation/extras/utils.js'); // For assertErrorCode(). + load('jstests/aggregation/extras/utils.js'); // For assertErrorCode. + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + + const coll = assertDropAndRecreateCollection(db, "change_stream_whitelist"); // Bare-bones $changeStream pipeline which will be augmented during tests. const changeStream = [{$changeStream: {}}]; - const coll = db[jsTestName()]; // List of non-$changeStream stages which are explicitly whitelisted. const whitelist = [ diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js index 0f75ad073e9..bba5021e393 100644 --- a/jstests/change_streams/lookup_post_image.js +++ b/jstests/change_streams/lookup_post_image.js @@ -6,12 +6,11 @@ "use strict"; load("jstests/libs/change_stream_util.js"); + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. load("jstests/replsets/libs/two_phase_drops.js"); // For 'TwoPhaseDropCollectionTest'. let cst = new ChangeStreamTest(db); - const coll = db.change_post_image; - - coll.drop(); + const coll = assertDropAndRecreateCollection(db, "change_post_image"); jsTestLog("Testing change streams without 'fullDocument' specified"); // Test that not specifying 'fullDocument' does include a 'fullDocument' in the result for an @@ -124,7 +123,7 @@ collection: coll, pipeline: [ {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}}, - {$match: {operationType: "update"}} + {$match: {operationType: {$ne: "delete"}}} ], aggregateOptions: {cursor: {batchSize: 0}} }); @@ -134,19 +133,35 @@ collection: coll, pipeline: [ {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}}, - {$match: {operationType: "update"}} + {$match: {operationType: {$ne: "delete"}}} ], aggregateOptions: {cursor: {batchSize: 0}} }); + // Retrieve the 'insert' operation from the latter stream. This is necessary on a sharded + // collection so that the documentKey is retrieved before the collection is recreated; + // otherwise, per SERVER-31691, a uassert will occur. + // TODO SERVER-31847: all remaining operations on the old UUID should be visible even if we have + // not retrieved the first oplog entry before the collection is recreated. + latestChange = cst.getOneChange(cursorBeforeDrop); + assert.eq(latestChange.operationType, "insert"); + assert(latestChange.hasOwnProperty("fullDocument")); + assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup 2"}); + // Drop the collection and wait until two-phase drop finishes. - coll.drop(); + assertDropCollection(db, coll.getName()); assert.soon(function() { return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, coll.getName()); }); - // Check the next $changeStream entry; this is the test document inserted above. The collection - // has been dropped, so our attempt to look up the post-image results in a null document. + // Check the next $changeStream entry; this is the test document inserted above. + latestChange = cst.getOneChange(cursor); + assert.eq(latestChange.operationType, "insert"); + assert(latestChange.hasOwnProperty("fullDocument")); + assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup 2"}); + + // The next entry is the 'update' operation. Because the collection has been dropped, our + // attempt to look up the post-image results in a null document. latestChange = cst.getOneChange(cursor); assert.eq(latestChange.operationType, "update"); assert(latestChange.hasOwnProperty("fullDocument")); @@ -166,10 +181,10 @@ // Test that looking up the post image of an update after the collection has been dropped and // created again will result in 'fullDocument' with a value of null. This must be done using // getMore because new cursors cannot be established after a collection drop. - // + // Insert a document with the same _id, verify the change stream won't return it due to // different UUID. - assert.commandWorked(db.createCollection(coll.getName())); + assertCreateCollection(db, coll.getName()); assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"})); // Confirm that the next entry's post-image is null since new collection has a different UUID. @@ -179,19 +194,18 @@ assert.eq(latestChange.fullDocument, null); // Test that invalidate entries don't have 'fullDocument' even if 'updateLookup' is specified. - db.collInvalidate.drop(); - assert.commandWorked(db.createCollection(db.collInvalidate.getName())); + const collInvalidate = assertDropAndRecreateCollection(db, "collInvalidate"); cursor = cst.startWatchingChanges({ - collection: db.collInvalidate, + collection: collInvalidate.getName(), pipeline: [{$changeStream: {fullDocument: "updateLookup"}}], aggregateOptions: {cursor: {batchSize: 0}} }); - assert.writeOK(db.collInvalidate.insert({_id: "testing invalidate"})); - db.collInvalidate.drop(); + assert.writeOK(collInvalidate.insert({_id: "testing invalidate"})); + assertDropCollection(db, collInvalidate.getName()); // Wait until two-phase drop finishes. assert.soon(function() { return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase( - db, db.collInvalidate.getName()); + db, collInvalidate.getName()); }); latestChange = cst.getOneChange(cursor); assert.eq(latestChange.operationType, "insert"); @@ -202,9 +216,7 @@ // TODO(russotto): Can just use "coll" here once read majority is working. // For now, using the old collection results in us reading stale data sometimes. jsTestLog("Testing full document lookup with a real getMore"); - const coll2 = db.real_get_more; - coll2.drop(); - assert.commandWorked(db.createCollection(coll2.getName())); + const coll2 = assertDropAndRecreateCollection(db, "real_get_more"); assert.writeOK(coll2.insert({_id: "getMoreEnabled"})); cursor = cst.startWatchingChanges({ diff --git a/jstests/change_streams/only_wake_getmore_for_relevant_changes.js b/jstests/change_streams/only_wake_getmore_for_relevant_changes.js index f7d909f039f..84f4d0ae979 100644 --- a/jstests/change_streams/only_wake_getmore_for_relevant_changes.js +++ b/jstests/change_streams/only_wake_getmore_for_relevant_changes.js @@ -4,7 +4,8 @@ "use strict"; load('jstests/libs/uuid_util.js'); - load("jstests/libs/fixture_helpers.js"); // For 'FixtureHelpers'. + load("jstests/libs/fixture_helpers.js"); // For 'FixtureHelpers'. + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. /** * Uses a parallel shell to execute the javascript function 'event' at the same time as an @@ -20,11 +21,13 @@ // the getMore to run. To prevent this from happening, the main thread waits for an insert // into "sentinel", to signal that the parallel shell has started and is waiting for the // getMore to appear in currentOp. - const shellSentinelCollection = db.shell_sentinel; - shellSentinelCollection.drop(); + const shellSentinelCollection = assertDropAndRecreateCollection(db, "shell_sentinel"); - const awaitShellDoingEventDuringGetMore = - startParallelShell(` + const port = + (collection.stats().sharded ? collection.getMongo().port + : FixtureHelpers.getPrimaryForNodeHostingDatabase(db).port); + + const awaitShellDoingEventDuringGetMore = startParallelShell(` // Signal that the parallel shell has started. assert.writeOK(db.getCollection("${ shellSentinelCollection.getName() }").insert({})); @@ -34,12 +37,12 @@ assert.soon(function() { op: "getmore", "command.collection": "${collection.getName()}", "originatingCommand.comment": "${identifyingComment}", - }).inprog.length === 1; + }).inprog.length > 0; }); const eventFn = ${ event.toString() }; eventFn();`, - FixtureHelpers.getPrimaryForNodeHostingDatabase(db).port); + port); // Wait for the shell to start. assert.soon(() => shellSentinelCollection.findOne() != null); @@ -104,9 +107,7 @@ eventFn();`, return result; } - const changesCollection = db.changes; - changesCollection.drop(); - assert.commandWorked(db.createCollection(changesCollection.getName())); + const changesCollection = assertDropAndRecreateCollection(db, "changes"); // Start a change stream cursor. const wholeCollectionStreamComment = "change stream on entire collection"; @@ -130,7 +131,6 @@ eventFn();`, event: () => assert.writeOK(db.changes.insert({_id: "wake up"})) }); assert.eq(getMoreResponse.cursor.nextBatch.length, 1); - const changesCollectionUuid = getUUIDFromListCollections(db, changesCollection.getName()); assert.docEq(getMoreResponse.cursor.nextBatch[0], { documentKey: {_id: "wake up"}, fullDocument: {_id: "wake up"}, @@ -140,7 +140,7 @@ eventFn();`, // Test that an insert to an unrelated collection will not cause the change stream to wake up // and return an empty batch before reaching the maxTimeMS. - db.unrelated_collection.drop(); + assertDropCollection(db, "unrelated_collection"); assertEventDoesNotWakeCursor({ collection: changesCollection, awaitDataCursorId: changeCursorId, diff --git a/jstests/change_streams/report_latest_observed_oplog_timestamp.js b/jstests/change_streams/report_latest_observed_oplog_timestamp.js index 6c5b92f0392..e75c50794a1 100644 --- a/jstests/change_streams/report_latest_observed_oplog_timestamp.js +++ b/jstests/change_streams/report_latest_observed_oplog_timestamp.js @@ -4,18 +4,16 @@ (function() { "use strict"; - const testName = "report_latest_observed_oplog_timestamp"; - const cursorCollection = db.getCollection(testName); - const otherCollection = db.getCollection("unrelated_" + testName); + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. - // Drop collections to assure a clean run. Collections may not exist so do not check result. - cursorCollection.drop(); - otherCollection.drop(); + // Drop and recreate collections to assure a clean run. + const testName = "report_latest_observed_oplog_timestamp"; + const cursorCollection = assertDropAndRecreateCollection(db, testName); + const otherCollection = assertDropAndRecreateCollection(db, "unrelated_" + testName); // Get a resume point. jsTestLog("Getting a resume point."); const batchSize = 2; - assert.commandWorked(db.createCollection(cursorCollection.getName())); const firstResponse = assert.commandWorked(cursorCollection.runCommand( {aggregate: testName, pipeline: [{$changeStream: {}}], cursor: {batchSize: batchSize}})); assert.eq(0, firstResponse.cursor.firstBatch.length); diff --git a/jstests/change_streams/required_as_first_stage.js b/jstests/change_streams/required_as_first_stage.js index 3d5bbb1ea6c..d84d7e4c127 100644 --- a/jstests/change_streams/required_as_first_stage.js +++ b/jstests/change_streams/required_as_first_stage.js @@ -1,10 +1,11 @@ // Tests that the $changeStream stage can only be present as the first stage in the pipeline. (function() { "use strict"; - load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. - const coll = db.change_stream_required_as_first_stage; - coll.drop(); + load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + + const coll = assertDropAndRecreateCollection(db, "change_stream_required_as_first_stage"); assertErrorCode(coll, [{$indexStats: {}}, {$changeStream: {}}], 40602); assertErrorCode( diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js index cbaf69fdedf..aea8aa3a1a5 100644 --- a/jstests/libs/change_stream_util.js +++ b/jstests/libs/change_stream_util.js @@ -30,10 +30,14 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { pipeline.push(self.oplogProjection); } + // The 'collection' argument may be either a collection name or DBCollection object. + assert(collection instanceof DBCollection || typeof collection === "string"); + const collName = (collection instanceof DBCollection ? collection.getName() : collection); + let res = assert.commandWorked(_db.runCommand( - Object.merge({aggregate: collection.getName(), pipeline: pipeline}, aggregateOptions))); + Object.merge({aggregate: collName, pipeline: pipeline}, aggregateOptions))); assert.neq(res.cursor.id, 0); - _allCursors.push({db: _db.getName(), coll: collection.getName(), cursorId: res.cursor.id}); + _allCursors.push({db: _db.getName(), coll: collName, cursorId: res.cursor.id}); return res.cursor; }; @@ -41,10 +45,9 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { * Issues a 'getMore' on the provided cursor and returns the cursor returned. */ self.getNextBatch = function(cursor) { - let collection = _db.getMongo().getCollection(cursor.ns); + const collName = cursor.ns.split(/\.(.+)/)[1]; return assert - .commandWorked(_db.runCommand( - {getMore: cursor.id, collection: collection.getName(), batchSize: 1})) + .commandWorked(_db.runCommand({getMore: cursor.id, collection: collName, batchSize: 1})) .cursor; }; diff --git a/jstests/libs/collection_drop_recreate.js b/jstests/libs/collection_drop_recreate.js new file mode 100644 index 00000000000..9d2888ce6d8 --- /dev/null +++ b/jstests/libs/collection_drop_recreate.js @@ -0,0 +1,29 @@ +/** + * Attempts to drop the given collection and asserts either that the drop succeeded or the + * collection did not exist. Avoids automatically recreating the collection in the case of test + * suites where accessing or dropping the collection implicitly recreates it. + */ +function assertDropCollection(db, collName) { + var cmdRes = db.runCommand({drop: collName}); + assert(cmdRes.ok === 1 || cmdRes.code === ErrorCodes.NamespaceNotFound, tojson(cmdRes)); +} + +/** + * Attempts to create a collection with the given name and options, if any, and asserts on failure. + * Returns the newly-created collection on success. When running under a sharded collections + * passthrough, the new collection will be implicitly sharded. + */ +function assertCreateCollection(db, collName, collOpts) { + assert.commandWorked(db.createCollection(collName, collOpts)); + return db.getCollection(collName); +} + +/** + * Attempts to drop a collection with the given name and recreate it with the specified options, if + * any. Asserts if either step fails. Returns the newly-created collection on success. When running + * under a sharded collections passthrough, the new collection will be implicitly sharded. + */ +function assertDropAndRecreateCollection(db, collName, collOpts) { + assertDropCollection(db, collName); + return assertCreateCollection(db, collName, collOpts); +}
\ No newline at end of file diff --git a/jstests/libs/override_methods/implicitly_shard_accessed_collections.js b/jstests/libs/override_methods/implicitly_shard_accessed_collections.js index 3f1f8b24a55..e8ebf477167 100644 --- a/jstests/libs/override_methods/implicitly_shard_accessed_collections.js +++ b/jstests/libs/override_methods/implicitly_shard_accessed_collections.js @@ -45,7 +45,8 @@ assert.commandWorked(res, "enabling sharding on the '" + dbName + "' db failed"); } - res = db.adminCommand({shardCollection: fullName, key: {_id: 'hashed'}}); + res = db.adminCommand( + {shardCollection: fullName, key: {_id: 'hashed'}, collation: {locale: "simple"}}); if (res.ok === 0 && testMayRunDropInParallel) { // We ignore ConflictingOperationInProgress error responses from the // "shardCollection" command if it's possible the test was running a "drop" command @@ -64,9 +65,10 @@ DB.prototype.getCollection = function() { var collection = originalGetCollection.apply(this, arguments); - // If the collection exists, there must have been a previous call to getCollection - // where we sharded the collection so there's no need to do it again. - if (collection.exists()) { + const collStats = this.runCommand({collStats: collection.getName()}); + + // If the collection is already sharded or is non-empty, do not attempt to shard. + if (collStats.sharded || collStats.count > 0) { return collection; } diff --git a/src/mongo/shell/db.js b/src/mongo/shell/db.js index 8ef40760efa..f417f40060d 100644 --- a/src/mongo/shell/db.js +++ b/src/mongo/shell/db.js @@ -325,10 +325,8 @@ var DB; var options = opt || {}; // We have special handling for the 'flags' field, and provide sugar for specific flags. If - // the - // user specifies any flags we send the field in the command. Otherwise, we leave it blank - // and - // use the server's defaults. + // the user specifies any flags we send the field in the command. Otherwise, we leave it + // blank and use the server's defaults. var sendFlags = false; var flags = 0; if (options.usePowerOf2Sizes != undefined) { |