summaryrefslogtreecommitdiff
path: root/jstests/serverless/change_streams
diff options
context:
space:
mode:
authorRishab Joshi <rishab.joshi@mongodb.com>2022-09-15 10:27:24 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-15 11:29:18 +0000
commite6b184b48b2f4ceaff580c98c24e14eac26e2c03 (patch)
tree27410d5d07867ef6be3026cb69a9a9821e03e254 /jstests/serverless/change_streams
parent0797ff28efcd7cb954b88658425b7b38c980b605 (diff)
downloadmongo-e6b184b48b2f4ceaff580c98c24e14eac26e2c03.tar.gz
SERVER-66641 Introduce multi-tenancy for change collections.
Diffstat (limited to 'jstests/serverless/change_streams')
-rw-r--r--jstests/serverless/change_streams/basic_read_from_change_collection.js125
-rw-r--r--jstests/serverless/change_streams/multitenant_read_from_change_collection.js158
2 files changed, 230 insertions, 53 deletions
diff --git a/jstests/serverless/change_streams/basic_read_from_change_collection.js b/jstests/serverless/change_streams/basic_read_from_change_collection.js
index cfe3ab53b88..98679d18c31 100644
--- a/jstests/serverless/change_streams/basic_read_from_change_collection.js
+++ b/jstests/serverless/change_streams/basic_read_from_change_collection.js
@@ -1,63 +1,82 @@
// Tests that a change stream can be opened on a change collection when one exists, and that an
// exception is thrown if we attempt to open a stream while change streams are disabled.
// @tags: [
-// featureFlagMongoStore,
-// requires_fcv_61,
+// requires_fcv_62,
// assumes_against_mongod_not_mongos,
// ]
(function() {
"use strict";
-(function runInReplicaSet() {
- const replSetTest = new ReplSetTest({nodes: 1});
-
- // TODO SERVER-67267 add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and
- // 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'.
- replSetTest.startSet(
- {setParameter: {"failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"})}});
-
- replSetTest.initiate();
-
- const connection = replSetTest.getPrimary();
-
- // Enable change stream such that it creates the change collection.
- // TODO SERVER-65950 pass tenant id to the command.
- assert.commandWorked(
- connection.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true}));
- assert.eq(assert.commandWorked(connection.getDB("admin").runCommand({getChangeStreamState: 1}))
- .enabled,
- true);
-
- // Insert a document to the 'stockPrice' collection.
- const testDb = connection.getDB("test");
- const csCursor = connection.getDB("test").stockPrice.watch([]);
- testDb.stockPrice.insert({_id: "mdb", price: 250});
- testDb.stockPrice.insert({_id: "tsla", price: 650});
-
- // Verify that the change stream observes the required event.
- assert.soon(() => csCursor.hasNext());
- const event1 = csCursor.next();
- assert.eq(event1.documentKey._id, "mdb");
- assert.soon(() => csCursor.hasNext());
- const event2 = csCursor.next();
- assert.eq(event2.documentKey._id, "tsla");
-
- // Disable the change stream while the change stream cursor is still opened.
- // TODO SERVER-65950 pass tenant id to the command.
- assert.commandWorked(
- connection.getDB("admin").runCommand({setChangeStreamState: 1, enabled: false}));
- assert.eq(assert.commandWorked(connection.getDB("admin").runCommand({getChangeStreamState: 1}))
- .enabled,
- false);
-
- // Verify that the cursor throws 'QueryPlanKilled' exception on doing get next.
- assert.throwsWithCode(() => assert.soon(() => csCursor.hasNext()), ErrorCodes.QueryPlanKilled);
-
- // Open a new change stream cursor with change stream disabled state and verify that
- // 'ChangeStreamNotEnabled' exception is thrown.
- assert.throwsWithCode(() => testDb.stock.watch([]), ErrorCodes.ChangeStreamNotEnabled);
-
- replSetTest.stopSet();
-})();
+// For ChangeStreamMultitenantReplicaSetTest.
+load("jstests/serverless/libs/change_collection_util.js");
+// For assertDropAndRecreateCollection.
+load("jstests/libs/collection_drop_recreate.js");
+
+// TODO SERVER-69115 Change to a 2-node replica set.
+const replSetTest = new ChangeStreamMultitenantReplicaSetTest({nodes: 1});
+const primary = replSetTest.getPrimary();
+
+// Hard code tenants id such that the tenant can be identified deterministically.
+const tenantId = ObjectId("6303b6bb84305d2266d0b779");
+
+// Connection to the replica set primary that is stamped with the tenant id.
+const tenantConn =
+ ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, tenantId);
+
+// Verify that the change stream observes expected events.
+function verifyChangeEvents(csCursor, expectedEvents) {
+ for (const [expectedOpType, expectedDoc] of expectedEvents) {
+ assert.soon(() => csCursor.hasNext());
+ const event = csCursor.next();
+
+ assert.eq(event.operationType, expectedOpType, event);
+ if (event.operationType == "insert") {
+ assert.eq(event.fullDocument, expectedDoc);
+ } else if (event.operationType == "drop") {
+ assert.soon(() => csCursor.hasNext());
+ assert.eq(csCursor.isClosed(), true);
+ }
+ }
+}
+
+// Enable change stream for the first tenant.
+replSetTest.setChangeStreamState(tenantConn, true);
+
+// Open the change stream cursor.
+const testDb = tenantConn.getDB("test");
+const csCursor = testDb.stockPrice.watch([]);
+
+// Insert documents to the 'stockPrice' collection.
+const docs = [{_id: "mdb", price: 250}, {_id: "tsla", price: 650}];
+docs.forEach(doc => assert.commandWorked(testDb.stockPrice.insert(doc)));
+
+// Drop the stock price collection to invalidate the change stream cursor.
+assert(testDb.stockPrice.drop());
+
+// Verify that the change stream observes the required event.
+verifyChangeEvents(csCursor, [["insert", docs[0]], ["insert", docs[1]], ["drop", []]]);
+
+// Disable and then enable the change stream.
+replSetTest.setChangeStreamState(tenantConn, false);
+replSetTest.setChangeStreamState(tenantConn, true);
+
+// Add a new document to the 'stockPrice' collection and verify that re-enabling the change
+// stream works correctly.
+const newCsCursor = testDb.stockPrice.watch([]);
+const newDocs = [{_id: "goog", price: 2000}];
+newDocs.forEach(doc => assert.commandWorked(testDb.stockPrice.insert(doc)));
+verifyChangeEvents(newCsCursor, [["insert", newDocs[0]]]);
+
+// Disable the change stream while the change stream cursor is still opened.
+replSetTest.setChangeStreamState(tenantConn, false);
+
+// Verify that the cursor throws 'QueryPlanKilled' exception on doing get next.
+assert.throwsWithCode(() => assert.soon(() => newCsCursor.hasNext()), ErrorCodes.QueryPlanKilled);
+
+// Open a new change stream cursor with change stream disabled state and verify that
+// 'ChangeStreamNotEnabled' exception is thrown.
+assert.throwsWithCode(() => testDb.stock.watch([]), ErrorCodes.ChangeStreamNotEnabled);
+
+replSetTest.stopSet();
}());
diff --git a/jstests/serverless/change_streams/multitenant_read_from_change_collection.js b/jstests/serverless/change_streams/multitenant_read_from_change_collection.js
new file mode 100644
index 00000000000..b10941b7999
--- /dev/null
+++ b/jstests/serverless/change_streams/multitenant_read_from_change_collection.js
@@ -0,0 +1,158 @@
+// Tests the behaviour of change streams on change collections in an environment with more than one
+// active tenant.
+// @tags: [
+// requires_fcv_62,
+// assumes_against_mongod_not_mongos,
+// ]
+
+(function() {
+"use strict";
+
+// For ChangeStreamMultitenantReplicaSetTest.
+load("jstests/serverless/libs/change_collection_util.js");
+// For assertDropAndRecreateCollection.
+load("jstests/libs/collection_drop_recreate.js");
+
+// TODO SERVER-69115 Change to a 2-node replica set.
+const replSetTest = new ChangeStreamMultitenantReplicaSetTest({nodes: 1});
+const primary = replSetTest.getPrimary();
+
+// Hard code tenants ids such that a particular tenant can be identified deterministically.
+const firstTenantId = ObjectId("6303b6bb84305d2266d0b779");
+const secondTenantId = ObjectId("7303b6bb84305d2266d0b779");
+
+// Connections to the replica set primary that are stamped with their respective tenant ids.
+const firstTenantConn =
+ ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, firstTenantId);
+const secondTenantConn =
+ ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, secondTenantId);
+
+// Verify that the change stream observes expected events. The method also collects resume tokens
+// for each expected change collection and returns those on successful assertion.
+function verifyEventsAndGetResumeTokens(csCursor, expectedEvents) {
+ let resumeTokens = [];
+
+ for (const [expectedOpType, expectedDoc] of expectedEvents) {
+ assert.soon(() => csCursor.hasNext());
+ const event = csCursor.next();
+
+ assert.eq(event.operationType, expectedOpType, event);
+ if (event.operationType == "insert") {
+ assert.eq(event.fullDocument, expectedDoc);
+ } else if (event.operationType == "drop") {
+ assert.soon(() => csCursor.hasNext());
+ assert.eq(csCursor.isClosed(), true);
+ }
+
+ resumeTokens.push(csCursor.getResumeToken());
+ }
+
+ return resumeTokens;
+}
+
+// Get the 'test' db for both tenants.
+const firstTenantTestDb = firstTenantConn.getDB("test");
+const secondTenantTestDb = secondTenantConn.getDB("test");
+
+// Recreate the 'stockPrice' collection to delete any old documents.
+assertDropAndRecreateCollection(firstTenantTestDb, "stockPrice");
+assertDropAndRecreateCollection(secondTenantTestDb, "stockPrice");
+
+// Create a new incarnation of the change collection for the first tenant.
+replSetTest.setChangeStreamState(firstTenantConn, false);
+replSetTest.setChangeStreamState(firstTenantConn, true);
+
+// These documents will be inserted in tenants 'stockPrice' collections.
+const firstTenantDocs =
+ [{_id: "mdb", price: 350}, {_id: "goog", price: 2000}, {_id: "nflx", price: 220}];
+const secondTenantDocs =
+ [{_id: "amzn", price: 3000}, {_id: "tsla", price: 750}, {_id: "aapl", price: 160}];
+
+// Open the change stream cursor for the first tenant.
+const firstTenantCsCursor = firstTenantTestDb.stockPrice.watch([]);
+
+// Fetch the latest timestamp before enabling the change stream for the second tenant.
+const startAtOperationTime =
+ primary.getDB("local").oplog.rs.find().sort({ts: -1}).limit(1).next().ts;
+assert(startAtOperationTime !== undefined);
+
+// Now create the change collection for the second tenant. The oplog timestamp associated with the
+// second tenant's create change collection will be greater than the 'startAtOperationTime'.
+replSetTest.setChangeStreamState(secondTenantConn, false);
+replSetTest.setChangeStreamState(secondTenantConn, true);
+
+// Open the change stream cursor for the second tenant.
+const secondTenantCsCursor = secondTenantTestDb.stockPrice.watch([]);
+
+// Insert documents to both change collections in jumbled fashion.
+assert.commandWorked(secondTenantTestDb.stockPrice.insert(secondTenantDocs[0]));
+assert.commandWorked(firstTenantTestDb.stockPrice.insert(firstTenantDocs[0]));
+assert.commandWorked(firstTenantTestDb.stockPrice.insert(firstTenantDocs[1]));
+assert.commandWorked(secondTenantTestDb.stockPrice.insert(secondTenantDocs[1]));
+assert.commandWorked(secondTenantTestDb.stockPrice.insert(secondTenantDocs[2]));
+assert.commandWorked(firstTenantTestDb.stockPrice.insert(firstTenantDocs[2]));
+
+// Verify that each change stream emits only the required tenant's change events and that there
+// is no leak of events amongst the change streams. Do not consume all events for the first
+// tenant as it will be consumed later.
+const firstTenantResumeTokens = verifyEventsAndGetResumeTokens(
+ firstTenantCsCursor, [["insert", firstTenantDocs[0]], ["insert", firstTenantDocs[1]]]);
+const secondTenantResumeTokens = verifyEventsAndGetResumeTokens(secondTenantCsCursor, [
+ ["insert", secondTenantDocs[0]],
+ ["insert", secondTenantDocs[1]],
+ ["insert", secondTenantDocs[2]]
+]);
+
+// Verify that change streams from both tenants can be resumed using their respective resume token.
+verifyEventsAndGetResumeTokens(
+ firstTenantTestDb.stockPrice.watch([], {resumeAfter: firstTenantResumeTokens[0]}),
+ [["insert", firstTenantDocs[1]], ["insert", firstTenantDocs[2]]]);
+verifyEventsAndGetResumeTokens(
+ secondTenantTestDb.stockPrice.watch([], {resumeAfter: secondTenantResumeTokens[0]}),
+ [["insert", secondTenantDocs[1]], ["insert", secondTenantDocs[2]]]);
+
+// Verify that resume tokens cannot be exchanged between tenants change streams.
+assert.throwsWithCode(
+ () => secondTenantTestDb.stockPrice.watch([], {resumeAfter: firstTenantResumeTokens[0]}),
+ ErrorCodes.ChangeStreamFatalError);
+assert.throwsWithCode(
+ () => firstTenantTestDb.stockPrice.watch([], {resumeAfter: secondTenantResumeTokens[0]}),
+ ErrorCodes.ChangeStreamFatalError);
+
+// Verify that the first tenant's change stream can be resumed using the timestamp
+// 'startAtOperationTime'.
+verifyEventsAndGetResumeTokens(
+ firstTenantTestDb.stockPrice.watch([], {startAtOperationTime: startAtOperationTime}), [
+ ["insert", firstTenantDocs[0]],
+ ["insert", firstTenantDocs[1]],
+ ["insert", firstTenantDocs[2]]
+ ]);
+
+// Verify that the second tenant's change stream cannot be resumed with the timestamp
+// 'startAtOperationTime' and should throw change stream history lost.
+assert.throwsWithCode(
+ () => secondTenantTestDb.stockPrice.watch([], {startAtOperationTime: startAtOperationTime}),
+ ErrorCodes.ChangeStreamHistoryLost);
+
+// Ensure that disabling the change stream for the second tenant does not impact the change
+// stream of the first tenant.
+replSetTest.setChangeStreamState(secondTenantConn, false);
+
+// The next on the change stream for the second tenant should now throw exception.
+assert.throwsWithCode(() => assert.soon(() => secondTenantCsCursor.hasNext()),
+ ErrorCodes.QueryPlanKilled);
+
+// The next of the change stream for the first tenant should continue to work. Since we have
+// still not consumed all event from the first tenant, the change stream should emit the
+// remaining ones.
+verifyEventsAndGetResumeTokens(firstTenantCsCursor, [["insert", firstTenantDocs[2]]]);
+
+// Re-enable the change stream for the second tenant and verify that the change stream cannot be
+// resumed using the resume token of previous incarnation of the change stream.
+replSetTest.setChangeStreamState(secondTenantConn, true);
+assert.throwsWithCode(
+ () => secondTenantTestDb.stockPrice.watch([], {resumeAfter: secondTenantResumeTokens[0]}),
+ ErrorCodes.ChangeStreamHistoryLost);
+
+replSetTest.stopSet();
+}());