summaryrefslogtreecommitdiff
path: root/jstests/serverless
diff options
context:
space:
mode:
authorRishab Joshi <rishab.joshi@mongodb.com>2022-08-11 17:00:55 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-08-11 18:34:34 +0000
commit7be22a4e693ed231f217c7670de076dc1960b238 (patch)
tree5e1b3e5901dd9f05e09cf85a6061d4aa3c3f34a9 /jstests/serverless
parentf387fa8cc75b9129fb1d57d302ed424349990d5e (diff)
downloadmongo-7be22a4e693ed231f217c7670de076dc1960b238.tar.gz
SERVER-66631 Implement command to enable and disable the change stream.
Diffstat (limited to 'jstests/serverless')
-rw-r--r--jstests/serverless/basic_write_to_change_collection.js3
-rw-r--r--jstests/serverless/change_stream_state_commands.js282
-rw-r--r--jstests/serverless/change_streams/basic_read_from_change_collection.js24
-rw-r--r--jstests/serverless/initial_sync_change_collection.js4
-rw-r--r--jstests/serverless/write_to_change_collection_in_startup_recovery.js3
5 files changed, 307 insertions, 9 deletions
diff --git a/jstests/serverless/basic_write_to_change_collection.js b/jstests/serverless/basic_write_to_change_collection.js
index 401c8916880..cbed0377517 100644
--- a/jstests/serverless/basic_write_to_change_collection.js
+++ b/jstests/serverless/basic_write_to_change_collection.js
@@ -22,6 +22,9 @@ const primary = replSetTest.getPrimary();
const secondary = replSetTest.getSecondary();
const testDb = primary.getDB("test");
+// Enable the change stream to create the change collection.
+assert.commandWorked(primary.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true}));
+
// Performs writes on the specified collection.
function performWrites(coll) {
const docIds = [1, 2, 3, 4, 5];
diff --git a/jstests/serverless/change_stream_state_commands.js b/jstests/serverless/change_stream_state_commands.js
new file mode 100644
index 00000000000..fe5d38a2a67
--- /dev/null
+++ b/jstests/serverless/change_stream_state_commands.js
@@ -0,0 +1,282 @@
+// Test that the 'setChangeStreamState' and 'getChangeStreamState' commands work as expected in the
+// multi-tenant replica sets environment for various cases.
+// @tags: [
+// featureFlagMongoStore,
+// requires_fcv_61,
+// ]
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js"); // For configureFailPoint.
+load("jstests/serverless/libs/change_collection_util.js"); // For verifyChangeCollectionEntries.
+load('jstests/libs/parallel_shell_helpers.js'); // For funWithArgs.
+
+const replSetTest = new ReplSetTest({nodes: 2});
+
+// TODO SERVER-67267 Add 'featureFlagServerlessChangeStreams' and 'serverless' flags and remove
+// 'failpoint.forceEnableChangeCollectionsMode'.
+replSetTest.startSet({
+ setParameter: {
+ "failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"}),
+ multitenancySupport: true
+ }
+});
+
+replSetTest.initiate();
+
+// Sets the change stream state for the provided tenant id.
+function setChangeStreamState(tenantId, enabled) {
+ assert.commandWorked(replSetTest.getPrimary().getDB("admin").runCommand(
+ {setChangeStreamState: 1, $tenant: tenantId, enabled: enabled}));
+}
+
+// Verifies that the required change stream state is set for the provided tenant id both in the
+// primary and the secondary and the command 'getChangeStreamState' returns the same state.
+function assertChangeStreamState(tenantId, enabled) {
+ assert.eq(assert
+ .commandWorked(replSetTest.getPrimary().getDB("admin").runCommand(
+ {getChangeStreamState: 1, $tenant: tenantId}))
+ .enabled,
+ enabled);
+
+ const primaryColls = replSetTest.getPrimary().getDB("config").getCollectionNames();
+ const secondaryColls = replSetTest.getSecondary().getDB("config").getCollectionNames();
+
+ // Verify that the change collection exists both in the primary and the secondary.
+ assert.eq(primaryColls.includes("system.change_collection"), enabled);
+ assert.eq(secondaryColls.includes("system.change_collection"), enabled);
+
+ // Verify that the pre-images collection exists both in the primary and the secondary.
+ assert.eq(primaryColls.includes("system.preimages"), enabled);
+ assert.eq(secondaryColls.includes("system.preimages"), enabled);
+}
+
+const firstOrgTenantId = ObjectId();
+const secondOrgTenantId = ObjectId();
+
+// Tests that the 'setChangeStreamState' command works for the basic cases.
+(function basicTest() {
+ jsTestLog("Running basic tests");
+
+ // Verify that the 'setChangeStreamState' command cannot be run with db other than the 'admin'
+ // db.
+ assert.commandFailedWithCode(
+ replSetTest.getPrimary().getDB("config").runCommand(
+ {setChangeStreamState: 1, enabled: true, $tenant: firstOrgTenantId}),
+ ErrorCodes.Unauthorized);
+
+ // Verify that the 'getChangeStreamState' command cannot be run with db other than the 'admin'
+ // db.
+ assert.commandFailedWithCode(replSetTest.getPrimary().getDB("config").runCommand(
+ {getChangeStreamState: 1, $tenant: firstOrgTenantId}),
+ ErrorCodes.Unauthorized);
+
+ // Verify that the change stream is enabled for the tenant.
+ setChangeStreamState(firstOrgTenantId, true);
+ assertChangeStreamState(firstOrgTenantId, true);
+
+ // Verify that the change stream is disabled for the tenant.
+ setChangeStreamState(firstOrgTenantId, false);
+ assertChangeStreamState(firstOrgTenantId, false);
+
+ // Verify that enabling change stream multiple times has not side-effects.
+ setChangeStreamState(firstOrgTenantId, true);
+ setChangeStreamState(firstOrgTenantId, true);
+ assertChangeStreamState(firstOrgTenantId, true);
+
+ // Verify that disabling change stream multiple times has not side-effects.
+ setChangeStreamState(firstOrgTenantId, false);
+ setChangeStreamState(firstOrgTenantId, false);
+ assertChangeStreamState(firstOrgTenantId, false);
+})();
+
+// Tests that the 'setChangeStreamState' command tolerates the primary step-down and can
+// successfully resume after the new primary comes up.
+(function resumabilityTest() {
+ jsTestLog("Verifying resumability");
+
+ // Reset the change stream state to disabled before starting the test case.
+ setChangeStreamState(firstOrgTenantId, false);
+ assertChangeStreamState(firstOrgTenantId, false);
+
+ const primary = replSetTest.getPrimary();
+ const secondary = replSetTest.getSecondary();
+
+ // Hang the 'SetChangeStreamStateCoordinator' before processing the command request.
+ const fpHangBeforeCmdProcessor =
+ configureFailPoint(primary, "hangSetChangeStreamStateCoordinatorBeforeCommandProcessor");
+
+ // While the failpoint is active, issue a request to enable change stream. This command will
+ // hang at the fail point.
+ const shellReturn = startParallelShell(() => {
+ db.getSiblingDB("admin").runCommand({setChangeStreamState: 1, enabled: true});
+ }, primary.port);
+
+ // Wait until the fail point is hit.
+ fpHangBeforeCmdProcessor.wait();
+
+ // Verify that the change stream is still disabled at this point.
+ assertChangeStreamState(firstOrgTenantId, false);
+
+ // Force primary to step down such that the secondary gets elected as a new leader.
+ assert.commandWorked(primary.adminCommand({replSetStepDown: 60, force: true}));
+
+ // The hung command at the point must have been interrupted and shell must have returned the
+ // error code.
+ shellReturn();
+
+ // Wait until the secondary becomes the new primary.
+ replSetTest.waitForState(secondary, ReplSetTest.State.PRIMARY);
+
+ // Disable the fail point as it is no longer needed.
+ fpHangBeforeCmdProcessor.off();
+
+ // Get the new primary and the secondary.
+ const newPrimary = replSetTest.getPrimary();
+
+ // Verify that the new primary resumed the command and change stream is now enabled.
+ assert.soon(() => {
+ const collNames = newPrimary.getDB("config").getCollectionNames();
+ return collNames.includes("system.change_collection") &&
+ collNames.includes("system.preimages");
+ });
+ assertChangeStreamState(firstOrgTenantId, true);
+})();
+
+// Tests that the 'setChangeStreamState' command does not allow parallel non-identical requests from
+// the same tenant.
+(function parallelNonIdenticalRequestsSameTenantTest() {
+ jsTestLog("Verifying parallel non-identical requests from the same tenant");
+
+ // Reset the change stream state to disabled before starting the test case.
+ setChangeStreamState(firstOrgTenantId, false);
+ assertChangeStreamState(firstOrgTenantId, false);
+
+ const primary = replSetTest.getPrimary();
+
+ // Hang the 'SetChangeStreamStateCoordinator' before processing the command request.
+ const fpHangBeforeCmdProcessor =
+ configureFailPoint(primary, "hangSetChangeStreamStateCoordinatorBeforeCommandProcessor");
+
+ // While the failpoint is active, issue a request to enable change stream for the tenant. This
+ // command will hang at the fail point.
+ const shellReturn = startParallelShell(
+ funWithArgs((firstOrgTenantId) => {
+ assert.commandWorked(db.getSiblingDB("admin").runCommand(
+ {setChangeStreamState: 1, $tenant: firstOrgTenantId, enabled: true}));
+ }, firstOrgTenantId), primary.port);
+
+ // Wait until the fail point is hit.
+ fpHangBeforeCmdProcessor.wait();
+
+ // While the first command is still hung, issue a request to disable the change stream for the
+ // same tenants. This request should bail out with 'ConflictingOperationInProgress' exception.
+ assert.throwsWithCode(() => setChangeStreamState(firstOrgTenantId, false),
+ ErrorCodes.ConflictingOperationInProgress);
+
+ // Turn off the fail point.
+ fpHangBeforeCmdProcessor.off();
+
+ // Wait for the shell to return.
+ shellReturn();
+
+ // Verify that the first command has enabled the change stream now.
+ assertChangeStreamState(firstOrgTenantId, true);
+})();
+
+// Tests that the 'setChangeStreamState' command allows parallel identical requests from the same
+// tenant.
+(function parallelIdenticalRequestsSameTenantTest() {
+ jsTestLog("Verifying parallel identical requests from the same tenant");
+
+ // Reset the change stream state to disabled before starting the test case.
+ setChangeStreamState(firstOrgTenantId, false);
+ assertChangeStreamState(firstOrgTenantId, false);
+
+ const primary = replSetTest.getPrimary();
+
+ // Hang the 'SetChangeStreamStateCoordinator' before processing the command request.
+ const fpHangBeforeCmdProcessor =
+ configureFailPoint(primary, "hangSetChangeStreamStateCoordinatorBeforeCommandProcessor");
+
+ const shellFn = (firstOrgTenantId) => {
+ assert.commandWorked(db.getSiblingDB("admin").runCommand(
+ {setChangeStreamState: 1, $tenant: firstOrgTenantId, enabled: true}));
+ };
+
+ // While the failpoint is active, issue a request to enable change stream for the tenant. This
+ // command will hang at the fail point.
+ const shellReturn1 = startParallelShell(funWithArgs(shellFn, firstOrgTenantId), primary.port);
+
+ // Wait for the fail point to be hit.
+ fpHangBeforeCmdProcessor.wait();
+
+ // Issue another request to enable the change stream from the same tenant. This should not throw
+ // any exception. We will not wait for the fail point because the execution of the same request
+ // is already in progress and this request will wait on the completion of the previous
+ // enablement request.
+ const shellReturn2 = startParallelShell(funWithArgs(shellFn, firstOrgTenantId), primary.port);
+
+ // Turn off the fail point.
+ fpHangBeforeCmdProcessor.off();
+
+ // Wait for shells to return.
+ shellReturn1();
+ shellReturn2();
+
+ // Verify that the first command has enabled the change stream now.
+ assertChangeStreamState(firstOrgTenantId, true);
+})();
+
+// Tests that parallel requests from different tenants do not interfere with each other and can
+// complete successfully.
+(function parallelRequestsDifferentTenantsTest() {
+ jsTestLog("Verifying parallel requests from different tenants");
+
+ // Reset the change stream state to disable before starting the test case.
+ setChangeStreamState(firstOrgTenantId, false);
+ assertChangeStreamState(firstOrgTenantId, false);
+ setChangeStreamState(secondOrgTenantId, false);
+ assertChangeStreamState(secondOrgTenantId, false);
+
+ const primary = replSetTest.getPrimary();
+
+ // Hang the 'SetChangeStreamStateCoordinator' before processing the command request.
+ const fpHangBeforeCmdProcessor =
+ configureFailPoint(primary, "hangSetChangeStreamStateCoordinatorBeforeCommandProcessor");
+
+ // Enable the change stream for the tenant 'firstOrgTenantId' in parallel.
+ const firstTenantShellReturn = startParallelShell(
+ funWithArgs((firstOrgTenantId) => {
+ assert.commandWorked(db.getSiblingDB("admin").runCommand(
+ {setChangeStreamState: 1, $tenant: firstOrgTenantId, enabled: true}));
+ }, firstOrgTenantId), primary.port);
+
+ // Wait until the above request hits the fail point.
+ fpHangBeforeCmdProcessor.wait({timesEntered: 1});
+
+ // While the first request from the tenant 'firstOrgTenantId' is hung, issue another request but
+ // with the tenant 'secondOrgTenantId'.
+ const secondTenantShellReturn = startParallelShell(
+ funWithArgs((secondOrgTenantId) => {
+ assert.commandWorked(db.getSiblingDB("admin").runCommand(
+ {setChangeStreamState: 1, $tenant: secondOrgTenantId, enabled: true}));
+ }, secondOrgTenantId), primary.port);
+
+ // The request from the 'secondOrgTenantId' will also hang.
+ fpHangBeforeCmdProcessor.wait({timesEntered: 2});
+
+ // Now that both the request have hit the fail point, disable it.
+ fpHangBeforeCmdProcessor.off();
+
+ // Wait for both shells to return.
+ firstTenantShellReturn();
+ secondTenantShellReturn();
+
+ // Verify that the change stream state for both tenants is now enabled.
+ assertChangeStreamState(firstOrgTenantId, true);
+ assertChangeStreamState(secondOrgTenantId, true);
+})();
+
+replSetTest.stopSet();
+}());
diff --git a/jstests/serverless/change_streams/basic_read_from_change_collection.js b/jstests/serverless/change_streams/basic_read_from_change_collection.js
index 6c2edc1da4d..cfe3ab53b88 100644
--- a/jstests/serverless/change_streams/basic_read_from_change_collection.js
+++ b/jstests/serverless/change_streams/basic_read_from_change_collection.js
@@ -9,25 +9,26 @@
(function() {
"use strict";
-// TODO SERVER-66631 replace this with change stream disablement command. Extend the test cases for
-// enablement/disablement combinations.
-function disableChangeStream(connection) {
- const configDB = connection.getDB("config");
- assert(configDB.system.change_collection.drop());
-}
-
(function runInReplicaSet() {
const replSetTest = new ReplSetTest({nodes: 1});
// TODO SERVER-67267 add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and
// 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'.
replSetTest.startSet(
- {setParameter: "failpoint.forceEnableChangeCollectionsMode=" + tojson({mode: "alwaysOn"})});
+ {setParameter: {"failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"})}});
replSetTest.initiate();
const connection = replSetTest.getPrimary();
+ // Enable change stream such that it creates the change collection.
+ // TODO SERVER-65950 pass tenant id to the command.
+ assert.commandWorked(
+ connection.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true}));
+ assert.eq(assert.commandWorked(connection.getDB("admin").runCommand({getChangeStreamState: 1}))
+ .enabled,
+ true);
+
// Insert a document to the 'stockPrice' collection.
const testDb = connection.getDB("test");
const csCursor = connection.getDB("test").stockPrice.watch([]);
@@ -43,7 +44,12 @@ function disableChangeStream(connection) {
assert.eq(event2.documentKey._id, "tsla");
// Disable the change stream while the change stream cursor is still opened.
- disableChangeStream(connection);
+ // TODO SERVER-65950 pass tenant id to the command.
+ assert.commandWorked(
+ connection.getDB("admin").runCommand({setChangeStreamState: 1, enabled: false}));
+ assert.eq(assert.commandWorked(connection.getDB("admin").runCommand({getChangeStreamState: 1}))
+ .enabled,
+ false);
// Verify that the cursor throws 'QueryPlanKilled' exception on doing get next.
assert.throwsWithCode(() => assert.soon(() => csCursor.hasNext()), ErrorCodes.QueryPlanKilled);
diff --git a/jstests/serverless/initial_sync_change_collection.js b/jstests/serverless/initial_sync_change_collection.js
index 81dd18b3a93..8910c742a50 100644
--- a/jstests/serverless/initial_sync_change_collection.js
+++ b/jstests/serverless/initial_sync_change_collection.js
@@ -23,6 +23,10 @@ replSetTest.startSet(
replSetTest.initiate();
const primary = replSetTest.getPrimary();
+
+// Enable the change stream to create the change collection.
+assert.commandWorked(primary.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true}));
+
const primaryChangeColl = primary.getDB("config").system.change_collection;
const mdbStockPriceDoc = {
diff --git a/jstests/serverless/write_to_change_collection_in_startup_recovery.js b/jstests/serverless/write_to_change_collection_in_startup_recovery.js
index a14b5e28600..b0950abd057 100644
--- a/jstests/serverless/write_to_change_collection_in_startup_recovery.js
+++ b/jstests/serverless/write_to_change_collection_in_startup_recovery.js
@@ -23,6 +23,9 @@ replSetTest.initiate();
let primary = replSetTest.getPrimary();
+// Enable the change stream to create the change collection.
+assert.commandWorked(primary.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true}));
+
// Insert a document to the collection and then capture the corresponding oplog timestamp. This
// timestamp will be the start timestamp beyond (inclusive) which we will validate the oplog and the
// change collection entries.