summaryrefslogtreecommitdiff
path: root/jstests/serverless/basic_write_to_change_collection.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/serverless/basic_write_to_change_collection.js')
-rw-r--r--jstests/serverless/basic_write_to_change_collection.js194
1 files changed, 147 insertions, 47 deletions
diff --git a/jstests/serverless/basic_write_to_change_collection.js b/jstests/serverless/basic_write_to_change_collection.js
index cbed0377517..e67b299fdac 100644
--- a/jstests/serverless/basic_write_to_change_collection.js
+++ b/jstests/serverless/basic_write_to_change_collection.js
@@ -2,82 +2,182 @@
// modification operations.
// @tags: [
// featureFlagMongoStore,
-// requires_fcv_61,
+// requires_fcv_62,
// ]
(function() {
"use strict";
-load("jstests/serverless/libs/change_collection_util.js"); // For verifyChangeCollectionEntries.
+// For verifyChangeCollectionEntries and ChangeStreamMultitenantReplicaSetTest.
+load("jstests/serverless/libs/change_collection_util.js");
+// For funWithArgs.
+load('jstests/libs/parallel_shell_helpers.js');
-const replSetTest = new ReplSetTest({nodes: 2});
-
-// TODO SERVER-67267 add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and
-// 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'.
-replSetTest.startSet(
- {setParameter: "failpoint.forceEnableChangeCollectionsMode=" + tojson({mode: "alwaysOn"})});
-
-replSetTest.initiate();
+// TODO SERVER-69115 Change to a 2-node replica set.
+const replSetTest = new ChangeStreamMultitenantReplicaSetTest({nodes: 1});
const primary = replSetTest.getPrimary();
const secondary = replSetTest.getSecondary();
+
const testDb = primary.getDB("test");
-// Enable the change stream to create the change collection.
-assert.commandWorked(primary.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true}));
+// Hard code tenants ids such that a particular tenant can be identified deterministically.
+const firstTenantId = ObjectId("6303b6bb84305d2266d0b779");
+const secondTenantId = ObjectId("7303b6bb84305d2266d0b779");
-// Performs writes on the specified collection.
-function performWrites(coll) {
- const docIds = [1, 2, 3, 4, 5];
+// Connections to the replica set primary that are stamped with their respective tenant ids.
+const firstTenantConn =
+ ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, firstTenantId);
+const secondTenantConn =
+ ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, secondTenantId);
+
+// Enable the change stream state such that change collections are created for both tenants.
+replSetTest.setChangeStreamState(firstTenantConn, true);
+replSetTest.setChangeStreamState(secondTenantConn, true);
+
+// Performs writes on the specified collection 'coll' such that the corresponding oplog entries are
+// captured by the tenant's change collection.
+function performWrites(coll, docIds) {
docIds.forEach(docId => assert.commandWorked(coll.insert({_id: docId})));
docIds.forEach(
docId => assert.commandWorked(coll.update({_id: docId}, {$set: {annotate: "updated"}})));
}
-// Test the change collection entries with the oplog by performing some basic writes.
-(function testBasicWritesInChangeCollection() {
+// Retrieve the last timestamp from the oplog.
+function getLatestTimestamp() {
const oplogColl = primary.getDB("local").oplog.rs;
- const startOplogTimestamp = oplogColl.find().toArray().at(-1).ts;
- assert(startOplogTimestamp != undefined);
-
- performWrites(testDb.stock);
- assert(testDb.stock.drop());
+ const oplogTimestamp = oplogColl.find().sort({ts: -1}).limit(1).next().ts;
+ assert(oplogTimestamp !== undefined);
+ return oplogTimestamp;
+}
- const endOplogTimestamp = oplogColl.find().toArray().at(-1).ts;
- assert(endOplogTimestamp !== undefined);
+// Test that writes to two different change collections are isolated and that each change collection
+// captures only the relevant oplog entries associated with the corresponding tenant.
+(function testWritesWithMultipleTenants() {
+ jsTestLog("Testing writes on change collections with multiple tenants.");
+
+ // A helper shell function to perform write for the specified 'tenantId'.
+ function shellFn(hostAddr, collName, tenantId, performWrites) {
+ load("jstests/serverless/libs/change_collection_util.js");
+
+ const tenantConn =
+ ChangeStreamMultitenantReplicaSetTest.getTenantConnection(hostAddr, tenantId);
+
+ const docIds = Array.from({length: 300}, (_, index) => index);
+ performWrites(tenantConn.getDB("test").getCollection(collName), docIds);
+
+ assert(tenantConn.getDB("test").getCollection(collName).drop());
+ }
+
+ const startOplogTimestamp = getLatestTimestamp();
+
+ // Perform writes for the first tenant in a different shell.
+ const firstTenantShellReturn =
+ startParallelShell(funWithArgs(shellFn,
+ primary.host,
+ "testWritesWithMultipleTenants_firstTenant",
+ firstTenantId,
+ performWrites),
+ primary.port);
+
+ // Perform writes to the second tenant parallely with the first tenant.
+ const secondTenantShellReturn =
+ startParallelShell(funWithArgs(shellFn,
+ primary.host,
+ "testWritesWithMultipleTenants_secondTenant",
+ secondTenantId,
+ performWrites),
+ primary.port);
+
+ // Wait for both shells to return.
+ firstTenantShellReturn();
+ secondTenantShellReturn();
+
+ const endOplogTimestamp = getLatestTimestamp();
assert(timestampCmp(endOplogTimestamp, startOplogTimestamp) > 0);
- // Wait for the replication to finish.
- replSetTest.awaitReplication();
+ // Verify that both change collections captured their respective tenant's oplog entries in
+ // the primary.
+ verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp, firstTenantId);
+ verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp, secondTenantId);
- // Verify that the change collection entries are the same as the oplog in the primary and the
- // secondary node.
- verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp);
- verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp);
+ // TODO SERVER-69115 Uncomment this.
+ /**
+ //Wait for the replication to finish.
+ replSetTest.awaitReplication();
+ // Verify that both change collections captured their respective tenant's oplog entries in
+ // the secondary.
+ verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp, firstTenantId);
+ verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp,
+ secondTenantId);
+ */
})();
-// Test the change collection entries with the oplog by performing writes in a transaction.
-(function testWritesinChangeCollectionWithTrasactions() {
- const oplogColl = primary.getDB("local").oplog.rs;
- const startOplogTimestamp = oplogColl.find().toArray().at(-1).ts;
- assert(startOplogTimestamp != undefined);
+// Test that transactional writes to two different change collections are isolated and that each
+// change collection captures only the relevant 'applyOps' oplog entries associated with the
+// corresponding tenant.
+(function testTransactionalWritesWithMultipleTenants() {
+ jsTestLog("Testing transactional writes on change collections with multiple tenants.");
- const session = testDb.getMongo().startSession();
- const sessionDb = session.getDatabase(testDb.getName());
- session.startTransaction();
- performWrites(sessionDb.test);
- session.commitTransaction_forTesting();
+ // A helper shell function to perform transactional write for the specified 'tenantId'.
+ function shellFn(hostAddr, collName, tenantId, performWrites) {
+ load("jstests/serverless/libs/change_collection_util.js");
- const endOplogTimestamp = oplogColl.find().toArray().at(-1).ts;
- assert(endOplogTimestamp != undefined);
+ const tenantConn =
+ ChangeStreamMultitenantReplicaSetTest.getTenantConnection(hostAddr, tenantId);
+
+ const session = tenantConn.getDB("test").getMongo().startSession();
+ const sessionDb = session.getDatabase("test");
+
+ session.startTransaction();
+
+ const docIds = Array.from({length: 300}, (_, index) => index);
+ performWrites(sessionDb.getCollection(collName), docIds);
+
+ session.commitTransaction_forTesting();
+ }
+
+ const startOplogTimestamp = getLatestTimestamp();
+
+ // Perform writes within a transaction for the first tenant.
+ const firstTenantShellReturn =
+ startParallelShell(funWithArgs(shellFn,
+ primary.host,
+ "testTransactionalWritesWithMultipleTenants_firstTenant",
+ firstTenantId,
+ performWrites),
+ primary.port);
+
+ // Perform parallel writes within a transaction for the second tenant.
+ const secondTenantShellReturn =
+ startParallelShell(funWithArgs(shellFn,
+ primary.host,
+ "testTransactionalWritesWithMultipleTenants_secondTenant",
+ secondTenantId,
+ performWrites),
+ primary.port);
+
+ // Wait for shells to return.
+ firstTenantShellReturn();
+ secondTenantShellReturn();
+
+ const endOplogTimestamp = getLatestTimestamp();
assert(timestampCmp(endOplogTimestamp, startOplogTimestamp) > 0);
+ // Verify that both change collections captured their respective tenant's 'applyOps' oplog
+ // entries in the primary.
+ verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp, firstTenantId);
+ verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp, secondTenantId);
+
+ // TODO SERVER-69115 Uncomment this.
+ /**
// Wait for the replication to finish.
replSetTest.awaitReplication();
-
- // Verify that the change collection entries are the same as the oplog in the primary and the
- // secondary node for the applyOps.
- verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp);
- verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp);
+ // Verify that both change collections captured their respective tenant's 'applyOps' oplog
+ // entries in the secondary.
+ verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp, firstTenantId);
+ verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp,
+ secondTenantId);
+ */
})();
replSetTest.stopSet();