summaryrefslogtreecommitdiff
path: root/jstests/serverless/libs/change_collection_util.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/serverless/libs/change_collection_util.js')
-rw-r--r--jstests/serverless/libs/change_collection_util.js127
1 files changed, 104 insertions, 23 deletions
diff --git a/jstests/serverless/libs/change_collection_util.js b/jstests/serverless/libs/change_collection_util.js
index 4026ea84f81..f9b8a9c6846 100644
--- a/jstests/serverless/libs/change_collection_util.js
+++ b/jstests/serverless/libs/change_collection_util.js
@@ -1,29 +1,44 @@
// Contains functions for testing the change collections.
-// Verifies that the oplog and change collection entries are the same for the specified start and
-// end duration of the oplog timestamp.
-function verifyChangeCollectionEntries(connection, startOplogTimestamp, endOplogTimestamp) {
+// Verifies that the oplog and change collection entries are the same for the provided tenant
+// 'tenantId' for the specified timestamp window:- (startOplogTimestamp, endOplogTimestamp].
+function verifyChangeCollectionEntries(
+ connection, startOplogTimestamp, endOplogTimestamp, tenantId) {
+ // Fetch the oplog documents for the provided tenant for the specified timestamp window. Note
+ // that the startOplogTimestamp is expected to be just before the first write, while the
+ // endOplogTimestamp is expected to be the timestamp of the final write in the test.
const oplogColl = connection.getDB("local").oplog.rs;
- const changeColl = connection.getDB("config").system.change_collection;
+ const oplogEntries = oplogColl
+ .find({
+ $and: [
+ {ts: {$gt: startOplogTimestamp}},
+ {ts: {$lte: endOplogTimestamp}},
+ {tid: tenantId}
+ ]
+ })
+ .toArray();
- // Fetch all oplog and change collection entries for the duration: [startOplogTimestamp,
- // endOplogTimestamp].
- const oplogEntries =
- oplogColl.find({$and: [{ts: {$gte: startOplogTimestamp}}, {ts: {$lte: endOplogTimestamp}}]})
- .toArray();
+ // Fetch all documents from the tenant's change collection for the specified timestamp window.
+ const changeColl =
+ ChangeStreamMultitenantReplicaSetTest.getTenantConnection(connection.host, tenantId)
+ .getDB("config")
+ .system.change_collection;
const changeCollectionEntries =
changeColl
- .find({$and: [{_id: {$gte: startOplogTimestamp}}, {_id: {$lte: endOplogTimestamp}}]})
+ .find({$and: [{_id: {$gt: startOplogTimestamp}}, {_id: {$lte: endOplogTimestamp}}]})
.toArray();
- assert.eq(
- oplogEntries.length,
- changeCollectionEntries.length,
- "Number of entries in the oplog and the change collection is not the same. Oplog has total " +
- oplogEntries.length + " entries , change collection has total " +
- changeCollectionEntries.length + " entries" +
- "change collection entries " + tojson(changeCollectionEntries));
+ // Verify that the number of documents returned by the oplog and the tenant's change collection
+ // are exactly the same.
+ assert.eq(oplogEntries.length,
+ changeCollectionEntries.length,
+ "Number of entries in the oplog and the change collection with tenantId: " +
+ tenantId + " is not the same. Oplog has total " + oplogEntries.length +
+ " entries , change collection has total " + changeCollectionEntries.length +
+ " entries, change collection entries " + tojson(changeCollectionEntries));
+ // Verify that the documents in the change collection are exactly the same as the oplog for a
+ // particular tenant.
for (let idx = 0; idx < oplogEntries.length; idx++) {
const oplogEntry = oplogEntries[idx];
const changeCollectionEntry = changeCollectionEntries[idx];
@@ -32,16 +47,82 @@ function verifyChangeCollectionEntries(connection, startOplogTimestamp, endOplog
assert(changeCollectionEntry.hasOwnProperty("_id"));
assert.eq(timestampCmp(changeCollectionEntry._id, oplogEntry.ts),
0,
- "Change collection '_id' field: " + tojson(changeCollectionEntry._id) +
+ "Change collection with tenantId: " + tenantId +
+ " '_id' field: " + tojson(changeCollectionEntry._id) +
" is not same as the oplog 'ts' field: " + tojson(oplogEntry.ts));
delete changeCollectionEntry["_id"];
// Verify that the oplog and change collecton entry (after removing the '_id') field are
// the same.
- assert.eq(
- oplogEntry,
- changeCollectionEntry,
- "Oplog and change collection entries are not same. Oplog entry: " + tojson(oplogEntry) +
- ", change collection entry: " + tojson(changeCollectionEntry));
+ assert.eq(oplogEntry,
+ changeCollectionEntry,
+ "Oplog and change collection with tenantId: " + tenantId +
+ " entries are not same. Oplog entry: " + tojson(oplogEntry) +
+ ", change collection entry: " + tojson(changeCollectionEntry));
+ }
+}
+
+// A class that sets up the multitenant environment to enable change collections on the replica set.
+// This class also provides helpers that are commonly used when working with change collections.
+class ChangeStreamMultitenantReplicaSetTest extends ReplSetTest {
+ constructor(config) {
+ // Instantiate the 'ReplSetTest'.
+ super(config);
+
+ // Start and initialize the replica set.
+ // TODO SERVER-67267 Add 'serverless' flag.
+ const setParameter = Object.assign({}, config.setParameter || {}, {
+ featureFlagServerlessChangeStreams: true,
+ multitenancySupport: true,
+ featureFlagMongoStore: true,
+ featureFlagRequireTenantID: true
+ });
+ this.startSet({setParameter: setParameter});
+ this.initiate();
+
+ // Create a root user within the multitenant environment to enable passing '$tenant' to
+ // commands.
+ assert.commandWorked(this.getPrimary().getDB("admin").runCommand(
+ {createUser: "root", pwd: "pwd", roles: ["root"]}));
+ }
+
+ // Returns a connection to the 'hostAddr' with 'tenantId' stamped to it for the created user.
+ static getTenantConnection(hostAddr, tenantId, createUser = {
+ user: ObjectId().str,
+ roles: [{role: 'readWriteAnyDatabase', db: 'admin'}]
+ }) {
+ const tokenConn = new Mongo(hostAddr);
+
+ // Login to the root user with 'ActionType::useTenant' such that the '$tenant' can be
+ // used.
+ assert(tokenConn.getDB("admin").auth("root", "pwd"));
+
+ // Create the user with the provided attributes.
+ assert.commandWorked(tokenConn.getDB("$external").runCommand({
+ createUser: createUser.user,
+ '$tenant': tenantId,
+ roles: createUser.roles
+ }));
+
+ // Set the provided tenant id into the security token for the user.
+ tokenConn._setSecurityToken(
+ _createSecurityToken({user: createUser.user, db: '$external', tenant: tenantId}));
+
+ // Logout the root user to avoid multiple authentication.
+ tokenConn.getDB("admin").logout();
+
+ return tokenConn;
+ }
+
+ // Sets the change stream state for the provided tenant connection.
+ setChangeStreamState(tenantConn, enabled) {
+ assert.commandWorked(
+ tenantConn.getDB("admin").runCommand({setChangeStreamState: 1, enabled: enabled}));
+ }
+
+ // Returns the change stream state for the provided tenant connection.
+ getChangeStreamState(tenantConn) {
+ return assert.commandWorked(tenantConn.getDB("admin").runCommand({getChangeStreamState: 1}))
+ .enabled;
}
}