diff options
Diffstat (limited to 'jstests/serverless/basic_write_to_change_collection.js')
-rw-r--r-- | jstests/serverless/basic_write_to_change_collection.js | 194 |
1 files changed, 147 insertions, 47 deletions
diff --git a/jstests/serverless/basic_write_to_change_collection.js b/jstests/serverless/basic_write_to_change_collection.js index cbed0377517..e67b299fdac 100644 --- a/jstests/serverless/basic_write_to_change_collection.js +++ b/jstests/serverless/basic_write_to_change_collection.js @@ -2,82 +2,182 @@ // modification operations. // @tags: [ // featureFlagMongoStore, -// requires_fcv_61, +// requires_fcv_62, // ] (function() { "use strict"; -load("jstests/serverless/libs/change_collection_util.js"); // For verifyChangeCollectionEntries. +// For verifyChangeCollectionEntries and ChangeStreamMultitenantReplicaSetTest. +load("jstests/serverless/libs/change_collection_util.js"); +// For funWithArgs. +load('jstests/libs/parallel_shell_helpers.js'); -const replSetTest = new ReplSetTest({nodes: 2}); - -// TODO SERVER-67267 add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and -// 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'. -replSetTest.startSet( - {setParameter: "failpoint.forceEnableChangeCollectionsMode=" + tojson({mode: "alwaysOn"})}); - -replSetTest.initiate(); +// TODO SERVER-69115 Change to a 2-node replica set. +const replSetTest = new ChangeStreamMultitenantReplicaSetTest({nodes: 1}); const primary = replSetTest.getPrimary(); const secondary = replSetTest.getSecondary(); + const testDb = primary.getDB("test"); -// Enable the change stream to create the change collection. -assert.commandWorked(primary.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true})); +// Hard code tenants ids such that a particular tenant can be identified deterministically. +const firstTenantId = ObjectId("6303b6bb84305d2266d0b779"); +const secondTenantId = ObjectId("7303b6bb84305d2266d0b779"); -// Performs writes on the specified collection. -function performWrites(coll) { - const docIds = [1, 2, 3, 4, 5]; +// Connections to the replica set primary that are stamped with their respective tenant ids. +const firstTenantConn = + ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, firstTenantId); +const secondTenantConn = + ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, secondTenantId); + +// Enable the change stream state such that change collections are created for both tenants. +replSetTest.setChangeStreamState(firstTenantConn, true); +replSetTest.setChangeStreamState(secondTenantConn, true); + +// Performs writes on the specified collection 'coll' such that the corresponding oplog entries are +// captured by the tenant's change collection. +function performWrites(coll, docIds) { docIds.forEach(docId => assert.commandWorked(coll.insert({_id: docId}))); docIds.forEach( docId => assert.commandWorked(coll.update({_id: docId}, {$set: {annotate: "updated"}}))); } -// Test the change collection entries with the oplog by performing some basic writes. -(function testBasicWritesInChangeCollection() { +// Retrieve the last timestamp from the oplog. +function getLatestTimestamp() { const oplogColl = primary.getDB("local").oplog.rs; - const startOplogTimestamp = oplogColl.find().toArray().at(-1).ts; - assert(startOplogTimestamp != undefined); - - performWrites(testDb.stock); - assert(testDb.stock.drop()); + const oplogTimestamp = oplogColl.find().sort({ts: -1}).limit(1).next().ts; + assert(oplogTimestamp !== undefined); + return oplogTimestamp; +} - const endOplogTimestamp = oplogColl.find().toArray().at(-1).ts; - assert(endOplogTimestamp !== undefined); +// Test that writes to two different change collections are isolated and that each change collection +// captures only the relevant oplog entries associated with the corresponding tenant. +(function testWritesWithMultipleTenants() { + jsTestLog("Testing writes on change collections with multiple tenants."); + + // A helper shell function to perform write for the specified 'tenantId'. + function shellFn(hostAddr, collName, tenantId, performWrites) { + load("jstests/serverless/libs/change_collection_util.js"); + + const tenantConn = + ChangeStreamMultitenantReplicaSetTest.getTenantConnection(hostAddr, tenantId); + + const docIds = Array.from({length: 300}, (_, index) => index); + performWrites(tenantConn.getDB("test").getCollection(collName), docIds); + + assert(tenantConn.getDB("test").getCollection(collName).drop()); + } + + const startOplogTimestamp = getLatestTimestamp(); + + // Perform writes for the first tenant in a different shell. + const firstTenantShellReturn = + startParallelShell(funWithArgs(shellFn, + primary.host, + "testWritesWithMultipleTenants_firstTenant", + firstTenantId, + performWrites), + primary.port); + + // Perform writes to the second tenant parallely with the first tenant. + const secondTenantShellReturn = + startParallelShell(funWithArgs(shellFn, + primary.host, + "testWritesWithMultipleTenants_secondTenant", + secondTenantId, + performWrites), + primary.port); + + // Wait for both shells to return. + firstTenantShellReturn(); + secondTenantShellReturn(); + + const endOplogTimestamp = getLatestTimestamp(); assert(timestampCmp(endOplogTimestamp, startOplogTimestamp) > 0); - // Wait for the replication to finish. - replSetTest.awaitReplication(); + // Verify that both change collections captured their respective tenant's oplog entries in + // the primary. + verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp, firstTenantId); + verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp, secondTenantId); - // Verify that the change collection entries are the same as the oplog in the primary and the - // secondary node. - verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp); - verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp); + // TODO SERVER-69115 Uncomment this. + /** + //Wait for the replication to finish. + replSetTest.awaitReplication(); + // Verify that both change collections captured their respective tenant's oplog entries in + // the secondary. + verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp, firstTenantId); + verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp, + secondTenantId); + */ })(); -// Test the change collection entries with the oplog by performing writes in a transaction. -(function testWritesinChangeCollectionWithTrasactions() { - const oplogColl = primary.getDB("local").oplog.rs; - const startOplogTimestamp = oplogColl.find().toArray().at(-1).ts; - assert(startOplogTimestamp != undefined); +// Test that transactional writes to two different change collections are isolated and that each +// change collection captures only the relevant 'applyOps' oplog entries associated with the +// corresponding tenant. +(function testTransactionalWritesWithMultipleTenants() { + jsTestLog("Testing transactional writes on change collections with multiple tenants."); - const session = testDb.getMongo().startSession(); - const sessionDb = session.getDatabase(testDb.getName()); - session.startTransaction(); - performWrites(sessionDb.test); - session.commitTransaction_forTesting(); + // A helper shell function to perform transactional write for the specified 'tenantId'. + function shellFn(hostAddr, collName, tenantId, performWrites) { + load("jstests/serverless/libs/change_collection_util.js"); - const endOplogTimestamp = oplogColl.find().toArray().at(-1).ts; - assert(endOplogTimestamp != undefined); + const tenantConn = + ChangeStreamMultitenantReplicaSetTest.getTenantConnection(hostAddr, tenantId); + + const session = tenantConn.getDB("test").getMongo().startSession(); + const sessionDb = session.getDatabase("test"); + + session.startTransaction(); + + const docIds = Array.from({length: 300}, (_, index) => index); + performWrites(sessionDb.getCollection(collName), docIds); + + session.commitTransaction_forTesting(); + } + + const startOplogTimestamp = getLatestTimestamp(); + + // Perform writes within a transaction for the first tenant. + const firstTenantShellReturn = + startParallelShell(funWithArgs(shellFn, + primary.host, + "testTransactionalWritesWithMultipleTenants_firstTenant", + firstTenantId, + performWrites), + primary.port); + + // Perform parallel writes within a transaction for the second tenant. + const secondTenantShellReturn = + startParallelShell(funWithArgs(shellFn, + primary.host, + "testTransactionalWritesWithMultipleTenants_secondTenant", + secondTenantId, + performWrites), + primary.port); + + // Wait for shells to return. + firstTenantShellReturn(); + secondTenantShellReturn(); + + const endOplogTimestamp = getLatestTimestamp(); assert(timestampCmp(endOplogTimestamp, startOplogTimestamp) > 0); + // Verify that both change collections captured their respective tenant's 'applyOps' oplog + // entries in the primary. + verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp, firstTenantId); + verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp, secondTenantId); + + // TODO SERVER-69115 Uncomment this. + /** // Wait for the replication to finish. replSetTest.awaitReplication(); - - // Verify that the change collection entries are the same as the oplog in the primary and the - // secondary node for the applyOps. - verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp); - verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp); + // Verify that both change collections captured their respective tenant's 'applyOps' oplog + // entries in the secondary. + verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp, firstTenantId); + verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp, + secondTenantId); + */ })(); replSetTest.stopSet(); |