diff options
author | Rishab Joshi <rishab.joshi@mongodb.com> | 2022-09-15 10:27:24 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-15 11:29:18 +0000 |
commit | e6b184b48b2f4ceaff580c98c24e14eac26e2c03 (patch) | |
tree | 27410d5d07867ef6be3026cb69a9a9821e03e254 /jstests/serverless/change_streams | |
parent | 0797ff28efcd7cb954b88658425b7b38c980b605 (diff) | |
download | mongo-e6b184b48b2f4ceaff580c98c24e14eac26e2c03.tar.gz |
SERVER-66641 Introduce multi-tenancy for change collections.
Diffstat (limited to 'jstests/serverless/change_streams')
-rw-r--r-- | jstests/serverless/change_streams/basic_read_from_change_collection.js | 125 | ||||
-rw-r--r-- | jstests/serverless/change_streams/multitenant_read_from_change_collection.js | 158 |
2 files changed, 230 insertions, 53 deletions
diff --git a/jstests/serverless/change_streams/basic_read_from_change_collection.js b/jstests/serverless/change_streams/basic_read_from_change_collection.js index cfe3ab53b88..98679d18c31 100644 --- a/jstests/serverless/change_streams/basic_read_from_change_collection.js +++ b/jstests/serverless/change_streams/basic_read_from_change_collection.js @@ -1,63 +1,82 @@ // Tests that a change stream can be opened on a change collection when one exists, and that an // exception is thrown if we attempt to open a stream while change streams are disabled. // @tags: [ -// featureFlagMongoStore, -// requires_fcv_61, +// requires_fcv_62, // assumes_against_mongod_not_mongos, // ] (function() { "use strict"; -(function runInReplicaSet() { - const replSetTest = new ReplSetTest({nodes: 1}); - - // TODO SERVER-67267 add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and - // 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'. - replSetTest.startSet( - {setParameter: {"failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"})}}); - - replSetTest.initiate(); - - const connection = replSetTest.getPrimary(); - - // Enable change stream such that it creates the change collection. - // TODO SERVER-65950 pass tenant id to the command. - assert.commandWorked( - connection.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true})); - assert.eq(assert.commandWorked(connection.getDB("admin").runCommand({getChangeStreamState: 1})) - .enabled, - true); - - // Insert a document to the 'stockPrice' collection. - const testDb = connection.getDB("test"); - const csCursor = connection.getDB("test").stockPrice.watch([]); - testDb.stockPrice.insert({_id: "mdb", price: 250}); - testDb.stockPrice.insert({_id: "tsla", price: 650}); - - // Verify that the change stream observes the required event. - assert.soon(() => csCursor.hasNext()); - const event1 = csCursor.next(); - assert.eq(event1.documentKey._id, "mdb"); - assert.soon(() => csCursor.hasNext()); - const event2 = csCursor.next(); - assert.eq(event2.documentKey._id, "tsla"); - - // Disable the change stream while the change stream cursor is still opened. - // TODO SERVER-65950 pass tenant id to the command. - assert.commandWorked( - connection.getDB("admin").runCommand({setChangeStreamState: 1, enabled: false})); - assert.eq(assert.commandWorked(connection.getDB("admin").runCommand({getChangeStreamState: 1})) - .enabled, - false); - - // Verify that the cursor throws 'QueryPlanKilled' exception on doing get next. - assert.throwsWithCode(() => assert.soon(() => csCursor.hasNext()), ErrorCodes.QueryPlanKilled); - - // Open a new change stream cursor with change stream disabled state and verify that - // 'ChangeStreamNotEnabled' exception is thrown. - assert.throwsWithCode(() => testDb.stock.watch([]), ErrorCodes.ChangeStreamNotEnabled); - - replSetTest.stopSet(); -})(); +// For ChangeStreamMultitenantReplicaSetTest. +load("jstests/serverless/libs/change_collection_util.js"); +// For assertDropAndRecreateCollection. +load("jstests/libs/collection_drop_recreate.js"); + +// TODO SERVER-69115 Change to a 2-node replica set. +const replSetTest = new ChangeStreamMultitenantReplicaSetTest({nodes: 1}); +const primary = replSetTest.getPrimary(); + +// Hard code tenants id such that the tenant can be identified deterministically. +const tenantId = ObjectId("6303b6bb84305d2266d0b779"); + +// Connection to the replica set primary that is stamped with the tenant id. +const tenantConn = + ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, tenantId); + +// Verify that the change stream observes expected events. +function verifyChangeEvents(csCursor, expectedEvents) { + for (const [expectedOpType, expectedDoc] of expectedEvents) { + assert.soon(() => csCursor.hasNext()); + const event = csCursor.next(); + + assert.eq(event.operationType, expectedOpType, event); + if (event.operationType == "insert") { + assert.eq(event.fullDocument, expectedDoc); + } else if (event.operationType == "drop") { + assert.soon(() => csCursor.hasNext()); + assert.eq(csCursor.isClosed(), true); + } + } +} + +// Enable change stream for the first tenant. +replSetTest.setChangeStreamState(tenantConn, true); + +// Open the change stream cursor. +const testDb = tenantConn.getDB("test"); +const csCursor = testDb.stockPrice.watch([]); + +// Insert documents to the 'stockPrice' collection. +const docs = [{_id: "mdb", price: 250}, {_id: "tsla", price: 650}]; +docs.forEach(doc => assert.commandWorked(testDb.stockPrice.insert(doc))); + +// Drop the stock price collection to invalidate the change stream cursor. +assert(testDb.stockPrice.drop()); + +// Verify that the change stream observes the required event. +verifyChangeEvents(csCursor, [["insert", docs[0]], ["insert", docs[1]], ["drop", []]]); + +// Disable and then enable the change stream. +replSetTest.setChangeStreamState(tenantConn, false); +replSetTest.setChangeStreamState(tenantConn, true); + +// Add a new document to the 'stockPrice' collection and verify that re-enabling the change +// stream works correctly. +const newCsCursor = testDb.stockPrice.watch([]); +const newDocs = [{_id: "goog", price: 2000}]; +newDocs.forEach(doc => assert.commandWorked(testDb.stockPrice.insert(doc))); +verifyChangeEvents(newCsCursor, [["insert", newDocs[0]]]); + +// Disable the change stream while the change stream cursor is still opened. +replSetTest.setChangeStreamState(tenantConn, false); + +// Verify that the cursor throws 'QueryPlanKilled' exception on doing get next. +assert.throwsWithCode(() => assert.soon(() => newCsCursor.hasNext()), ErrorCodes.QueryPlanKilled); + +// Open a new change stream cursor with change stream disabled state and verify that +// 'ChangeStreamNotEnabled' exception is thrown. +assert.throwsWithCode(() => testDb.stock.watch([]), ErrorCodes.ChangeStreamNotEnabled); + +replSetTest.stopSet(); }()); diff --git a/jstests/serverless/change_streams/multitenant_read_from_change_collection.js b/jstests/serverless/change_streams/multitenant_read_from_change_collection.js new file mode 100644 index 00000000000..b10941b7999 --- /dev/null +++ b/jstests/serverless/change_streams/multitenant_read_from_change_collection.js @@ -0,0 +1,158 @@ +// Tests the behaviour of change streams on change collections in an environment with more than one +// active tenant. +// @tags: [ +// requires_fcv_62, +// assumes_against_mongod_not_mongos, +// ] + +(function() { +"use strict"; + +// For ChangeStreamMultitenantReplicaSetTest. +load("jstests/serverless/libs/change_collection_util.js"); +// For assertDropAndRecreateCollection. +load("jstests/libs/collection_drop_recreate.js"); + +// TODO SERVER-69115 Change to a 2-node replica set. +const replSetTest = new ChangeStreamMultitenantReplicaSetTest({nodes: 1}); +const primary = replSetTest.getPrimary(); + +// Hard code tenants ids such that a particular tenant can be identified deterministically. +const firstTenantId = ObjectId("6303b6bb84305d2266d0b779"); +const secondTenantId = ObjectId("7303b6bb84305d2266d0b779"); + +// Connections to the replica set primary that are stamped with their respective tenant ids. +const firstTenantConn = + ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, firstTenantId); +const secondTenantConn = + ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, secondTenantId); + +// Verify that the change stream observes expected events. The method also collects resume tokens +// for each expected change collection and returns those on successful assertion. +function verifyEventsAndGetResumeTokens(csCursor, expectedEvents) { + let resumeTokens = []; + + for (const [expectedOpType, expectedDoc] of expectedEvents) { + assert.soon(() => csCursor.hasNext()); + const event = csCursor.next(); + + assert.eq(event.operationType, expectedOpType, event); + if (event.operationType == "insert") { + assert.eq(event.fullDocument, expectedDoc); + } else if (event.operationType == "drop") { + assert.soon(() => csCursor.hasNext()); + assert.eq(csCursor.isClosed(), true); + } + + resumeTokens.push(csCursor.getResumeToken()); + } + + return resumeTokens; +} + +// Get the 'test' db for both tenants. +const firstTenantTestDb = firstTenantConn.getDB("test"); +const secondTenantTestDb = secondTenantConn.getDB("test"); + +// Recreate the 'stockPrice' collection to delete any old documents. +assertDropAndRecreateCollection(firstTenantTestDb, "stockPrice"); +assertDropAndRecreateCollection(secondTenantTestDb, "stockPrice"); + +// Create a new incarnation of the change collection for the first tenant. +replSetTest.setChangeStreamState(firstTenantConn, false); +replSetTest.setChangeStreamState(firstTenantConn, true); + +// These documents will be inserted in tenants 'stockPrice' collections. +const firstTenantDocs = + [{_id: "mdb", price: 350}, {_id: "goog", price: 2000}, {_id: "nflx", price: 220}]; +const secondTenantDocs = + [{_id: "amzn", price: 3000}, {_id: "tsla", price: 750}, {_id: "aapl", price: 160}]; + +// Open the change stream cursor for the first tenant. +const firstTenantCsCursor = firstTenantTestDb.stockPrice.watch([]); + +// Fetch the latest timestamp before enabling the change stream for the second tenant. +const startAtOperationTime = + primary.getDB("local").oplog.rs.find().sort({ts: -1}).limit(1).next().ts; +assert(startAtOperationTime !== undefined); + +// Now create the change collection for the second tenant. The oplog timestamp associated with the +// second tenant's create change collection will be greater than the 'startAtOperationTime'. +replSetTest.setChangeStreamState(secondTenantConn, false); +replSetTest.setChangeStreamState(secondTenantConn, true); + +// Open the change stream cursor for the second tenant. +const secondTenantCsCursor = secondTenantTestDb.stockPrice.watch([]); + +// Insert documents to both change collections in jumbled fashion. +assert.commandWorked(secondTenantTestDb.stockPrice.insert(secondTenantDocs[0])); +assert.commandWorked(firstTenantTestDb.stockPrice.insert(firstTenantDocs[0])); +assert.commandWorked(firstTenantTestDb.stockPrice.insert(firstTenantDocs[1])); +assert.commandWorked(secondTenantTestDb.stockPrice.insert(secondTenantDocs[1])); +assert.commandWorked(secondTenantTestDb.stockPrice.insert(secondTenantDocs[2])); +assert.commandWorked(firstTenantTestDb.stockPrice.insert(firstTenantDocs[2])); + +// Verify that each change stream emits only the required tenant's change events and that there +// is no leak of events amongst the change streams. Do not consume all events for the first +// tenant as it will be consumed later. +const firstTenantResumeTokens = verifyEventsAndGetResumeTokens( + firstTenantCsCursor, [["insert", firstTenantDocs[0]], ["insert", firstTenantDocs[1]]]); +const secondTenantResumeTokens = verifyEventsAndGetResumeTokens(secondTenantCsCursor, [ + ["insert", secondTenantDocs[0]], + ["insert", secondTenantDocs[1]], + ["insert", secondTenantDocs[2]] +]); + +// Verify that change streams from both tenants can be resumed using their respective resume token. +verifyEventsAndGetResumeTokens( + firstTenantTestDb.stockPrice.watch([], {resumeAfter: firstTenantResumeTokens[0]}), + [["insert", firstTenantDocs[1]], ["insert", firstTenantDocs[2]]]); +verifyEventsAndGetResumeTokens( + secondTenantTestDb.stockPrice.watch([], {resumeAfter: secondTenantResumeTokens[0]}), + [["insert", secondTenantDocs[1]], ["insert", secondTenantDocs[2]]]); + +// Verify that resume tokens cannot be exchanged between tenants change streams. +assert.throwsWithCode( + () => secondTenantTestDb.stockPrice.watch([], {resumeAfter: firstTenantResumeTokens[0]}), + ErrorCodes.ChangeStreamFatalError); +assert.throwsWithCode( + () => firstTenantTestDb.stockPrice.watch([], {resumeAfter: secondTenantResumeTokens[0]}), + ErrorCodes.ChangeStreamFatalError); + +// Verify that the first tenant's change stream can be resumed using the timestamp +// 'startAtOperationTime'. +verifyEventsAndGetResumeTokens( + firstTenantTestDb.stockPrice.watch([], {startAtOperationTime: startAtOperationTime}), [ + ["insert", firstTenantDocs[0]], + ["insert", firstTenantDocs[1]], + ["insert", firstTenantDocs[2]] + ]); + +// Verify that the second tenant's change stream cannot be resumed with the timestamp +// 'startAtOperationTime' and should throw change stream history lost. +assert.throwsWithCode( + () => secondTenantTestDb.stockPrice.watch([], {startAtOperationTime: startAtOperationTime}), + ErrorCodes.ChangeStreamHistoryLost); + +// Ensure that disabling the change stream for the second tenant does not impact the change +// stream of the first tenant. +replSetTest.setChangeStreamState(secondTenantConn, false); + +// The next on the change stream for the second tenant should now throw exception. +assert.throwsWithCode(() => assert.soon(() => secondTenantCsCursor.hasNext()), + ErrorCodes.QueryPlanKilled); + +// The next of the change stream for the first tenant should continue to work. Since we have +// still not consumed all event from the first tenant, the change stream should emit the +// remaining ones. +verifyEventsAndGetResumeTokens(firstTenantCsCursor, [["insert", firstTenantDocs[2]]]); + +// Re-enable the change stream for the second tenant and verify that the change stream cannot be +// resumed using the resume token of previous incarnation of the change stream. +replSetTest.setChangeStreamState(secondTenantConn, true); +assert.throwsWithCode( + () => secondTenantTestDb.stockPrice.watch([], {resumeAfter: secondTenantResumeTokens[0]}), + ErrorCodes.ChangeStreamHistoryLost); + +replSetTest.stopSet(); +}()); |