summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRishab Joshi <rishab.joshi@mongodb.com>2022-09-15 10:27:24 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-15 11:29:18 +0000
commite6b184b48b2f4ceaff580c98c24e14eac26e2c03 (patch)
tree27410d5d07867ef6be3026cb69a9a9821e03e254
parent0797ff28efcd7cb954b88658425b7b38c980b605 (diff)
downloadmongo-e6b184b48b2f4ceaff580c98c24e14eac26e2c03.tar.gz
SERVER-66641 Introduce multi-tenancy for change collections.
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml9
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml4
-rw-r--r--buildscripts/resmokelib/testing/hooks/enable_change_stream.py2
-rw-r--r--etc/evergreen.yml2
-rw-r--r--jstests/auth/change_stream_change_collection_role_auth.js10
-rw-r--r--jstests/libs/override_methods/override_fixtures_changestream_multitenancy.js17
-rw-r--r--jstests/serverless/basic_write_to_change_collection.js194
-rw-r--r--jstests/serverless/change_collection_expired_document_remover.js305
-rw-r--r--jstests/serverless/change_collection_server_stats.js41
-rw-r--r--jstests/serverless/change_stream_state_commands.js19
-rw-r--r--jstests/serverless/change_streams/basic_read_from_change_collection.js125
-rw-r--r--jstests/serverless/change_streams/multitenant_read_from_change_collection.js158
-rw-r--r--jstests/serverless/initial_sync_change_collection.js23
-rw-r--r--jstests/serverless/libs/change_collection_util.js127
-rw-r--r--jstests/serverless/sharded_change_stream_state_commands.js6
-rw-r--r--jstests/serverless/write_to_change_collection_in_startup_recovery.js26
-rw-r--r--src/mongo/db/SConscript22
-rw-r--r--src/mongo/db/catalog/drop_collection.cpp4
-rw-r--r--src/mongo/db/change_collection_expired_change_remover_test.cpp18
-rw-r--r--src/mongo/db/change_collection_expired_documents_remover.cpp29
-rw-r--r--src/mongo/db/change_collection_expired_documents_remover.h3
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.cpp113
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.h21
-rw-r--r--src/mongo/db/change_stream_pre_images_collection_manager.cpp26
-rw-r--r--src/mongo/db/change_stream_pre_images_collection_manager.h7
-rw-r--r--src/mongo/db/change_stream_serverless_helpers.cpp101
-rw-r--r--src/mongo/db/change_stream_serverless_helpers.h71
-rw-r--r--src/mongo/db/change_streams_cluster_parameter.idl4
-rw-r--r--src/mongo/db/commands/SConscript1
-rw-r--r--src/mongo/db/commands/change_stream_state_command.cpp46
-rw-r--r--src/mongo/db/commands/dbcommands.cpp4
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp18
-rw-r--r--src/mongo/db/commands/set_cluster_parameter_command.cpp4
-rw-r--r--src/mongo/db/commands/txn_cmds.cpp8
-rw-r--r--src/mongo/db/mongod_main.cpp6
-rw-r--r--src/mongo/db/namespace_string.cpp12
-rw-r--r--src/mongo/db/namespace_string.h8
-rw-r--r--src/mongo/db/op_observer/op_observer_impl_test.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp4
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp2
-rw-r--r--src/mongo/db/query/query_knobs.idl9
-rw-r--r--src/mongo/db/repl/SConscript3
-rw-r--r--src/mongo/db/repl/oplog.cpp3
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.cpp6
-rw-r--r--src/mongo/db/repl/oplog_applier_impl_test.cpp8
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp3
-rw-r--r--src/mongo/db/set_change_stream_state_coordinator.cpp50
-rw-r--r--src/mongo/db/stats/SConscript1
-rw-r--r--src/mongo/db/stats/change_collection_server_status.cpp3
49 files changed, 1201 insertions, 491 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml
index 867a6acc7f1..16a0397b526 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml
@@ -30,7 +30,10 @@ executor:
enableMajorityReadConcern: ''
# Enable causal consistency for change streams suites using 1 node replica sets. See
# change_streams.yml for detailed explanation.
- #TODO SERVER-67267 load 'inject_tenant_prefix.js'.
+ # TODO SERVER-67267 Load 'inject_tenant_prefix.js', add 'multitenancySupport',
+ # 'featureFlagMongoStore' and 'featureFlagRequireTenantID' flags. The
+ # 'inject_tenant_prefix' should also be called for the
+ # 'enable_change_stream.py' file.
eval: >-
var testingReplication = true;
load('jstests/libs/override_methods/set_read_and_write_concerns.js');
@@ -52,6 +55,8 @@ executor:
set_parameters:
enableTestCommands: 1
featureFlagServerlessChangeStreams: true
+ internalChangeStreamUseTenantIdForTesting: true
failpoint.assertChangeStreamNssCollection:
"{'mode': 'alwaysOn', 'data': {'collectionName': 'system.change_collection'}}"
- num_nodes: 2
+ # TODO SERVER-69115 Set number of nodes to 2 after oplog application is working properly.
+ num_nodes: 1
diff --git a/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml
index 804045fde38..9a9ef01d75d 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml
@@ -63,7 +63,7 @@ executor:
shard_options:
mongod_options:
set_parameters:
- # TODO SERVER-67267 check if the feature flag can be removed.
- failpoint.forceEnableChangeCollectionsMode: "{mode: 'alwaysOn'}"
+ featureFlagServerlessChangeStreams: true
+ internalChangeStreamUseTenantIdForTesting: true
failpoint.assertChangeStreamNssCollection:
"{'mode': 'alwaysOn', 'data': {'collectionName': 'system.change_collection'}}"
diff --git a/buildscripts/resmokelib/testing/hooks/enable_change_stream.py b/buildscripts/resmokelib/testing/hooks/enable_change_stream.py
index 501648e73b8..cafefff9370 100644
--- a/buildscripts/resmokelib/testing/hooks/enable_change_stream.py
+++ b/buildscripts/resmokelib/testing/hooks/enable_change_stream.py
@@ -48,7 +48,7 @@ class EnableChangeStream(interface.Hook):
@staticmethod
def _set_change_stream_state(connection, enabled):
- # TODO SERVER-65950 create change collection for the tenant.
+ # TODO SERVER-67267 Enable command overrides to use security token.
client = connection.get_primary().mongo_client()
client.get_database("admin").command({"setChangeStreamState": 1, "enabled": enabled})
diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index d1f7f1cc5ff..740fbc61c8e 100644
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -1417,7 +1417,7 @@ buildvariants:
- name: change_streams_multitenant_passthrough
- name: change_streams_multitenant_sharded_collections_passthrough
distros:
- - rhel80-medium # TODO SERVER-65950 remove the distro.
+ - rhel80-medium # TODO SERVER-67543 remove the distro.
- name: .misc_js
- name: .clustered_collections
- name: .concurrency !.large !.ubsan !.no_txns !.debug_only
diff --git a/jstests/auth/change_stream_change_collection_role_auth.js b/jstests/auth/change_stream_change_collection_role_auth.js
index 1eecbfebd02..60b86a8d823 100644
--- a/jstests/auth/change_stream_change_collection_role_auth.js
+++ b/jstests/auth/change_stream_change_collection_role_auth.js
@@ -160,11 +160,15 @@ function removeChangeCollectionDoc(authDb) {
const replSetTest =
new ReplSetTest({name: "shard", nodes: 1, useHostName: true, waitForKeys: false});
-// TODO SERVER-67267: Add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and
-// 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'.
+// TODO SERVER-67267: Add 'serverless' flags.
replSetTest.startSet({
keyFile: keyFile,
- setParameter: {"failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"})}
+ setParameter: {
+ featureFlagServerlessChangeStreams: true,
+ multitenancySupport: true,
+ featureFlagMongoStore: true,
+ internalChangeStreamUseTenantIdForTesting: true,
+ }
});
replSetTest.initiate();
const primary = replSetTest.getPrimary();
diff --git a/jstests/libs/override_methods/override_fixtures_changestream_multitenancy.js b/jstests/libs/override_methods/override_fixtures_changestream_multitenancy.js
index a086fbc3005..c9b713bda0c 100644
--- a/jstests/libs/override_methods/override_fixtures_changestream_multitenancy.js
+++ b/jstests/libs/override_methods/override_fixtures_changestream_multitenancy.js
@@ -22,20 +22,13 @@ ReplSetTest = function(opts) {
let setParameter = {};
- // The 'setParameter' that should be merged with 'newOpts' for the sharded-cluster and the
- // replica-set.
- if (newOptions.hasOwnProperty("shardsvr")) {
- setParameter = {
- // TODO SERVER-67267 check if 'forceEnableChangeCollectionsMode' can be removed.
- "failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"}),
- "failpoint.assertChangeStreamNssCollection": fpAssertChangeStreamNssColl
- };
- } else if (!newOptions.hasOwnProperty("configsvr")) {
- // This is the case for the replica-set. A change collection does not exist in the
- // config server.
+ // A change collection does not exist in the config server, do not set any change collection
+ // related parameter.
+ if (!newOptions.hasOwnProperty("configsvr")) {
setParameter = {
featureFlagServerlessChangeStreams: true,
- "failpoint.assertChangeStreamNssCollection": fpAssertChangeStreamNssColl
+ internalChangeStreamUseTenantIdForTesting: true,
+ "failpoint.assertChangeStreamNssCollection": fpAssertChangeStreamNssColl,
};
}
diff --git a/jstests/serverless/basic_write_to_change_collection.js b/jstests/serverless/basic_write_to_change_collection.js
index cbed0377517..e67b299fdac 100644
--- a/jstests/serverless/basic_write_to_change_collection.js
+++ b/jstests/serverless/basic_write_to_change_collection.js
@@ -2,82 +2,182 @@
// modification operations.
// @tags: [
// featureFlagMongoStore,
-// requires_fcv_61,
+// requires_fcv_62,
// ]
(function() {
"use strict";
-load("jstests/serverless/libs/change_collection_util.js"); // For verifyChangeCollectionEntries.
+// For verifyChangeCollectionEntries and ChangeStreamMultitenantReplicaSetTest.
+load("jstests/serverless/libs/change_collection_util.js");
+// For funWithArgs.
+load('jstests/libs/parallel_shell_helpers.js');
-const replSetTest = new ReplSetTest({nodes: 2});
-
-// TODO SERVER-67267 add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and
-// 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'.
-replSetTest.startSet(
- {setParameter: "failpoint.forceEnableChangeCollectionsMode=" + tojson({mode: "alwaysOn"})});
-
-replSetTest.initiate();
+// TODO SERVER-69115 Change to a 2-node replica set.
+const replSetTest = new ChangeStreamMultitenantReplicaSetTest({nodes: 1});
const primary = replSetTest.getPrimary();
const secondary = replSetTest.getSecondary();
+
const testDb = primary.getDB("test");
-// Enable the change stream to create the change collection.
-assert.commandWorked(primary.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true}));
+// Hard code tenants ids such that a particular tenant can be identified deterministically.
+const firstTenantId = ObjectId("6303b6bb84305d2266d0b779");
+const secondTenantId = ObjectId("7303b6bb84305d2266d0b779");
-// Performs writes on the specified collection.
-function performWrites(coll) {
- const docIds = [1, 2, 3, 4, 5];
+// Connections to the replica set primary that are stamped with their respective tenant ids.
+const firstTenantConn =
+ ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, firstTenantId);
+const secondTenantConn =
+ ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, secondTenantId);
+
+// Enable the change stream state such that change collections are created for both tenants.
+replSetTest.setChangeStreamState(firstTenantConn, true);
+replSetTest.setChangeStreamState(secondTenantConn, true);
+
+// Performs writes on the specified collection 'coll' such that the corresponding oplog entries are
+// captured by the tenant's change collection.
+function performWrites(coll, docIds) {
docIds.forEach(docId => assert.commandWorked(coll.insert({_id: docId})));
docIds.forEach(
docId => assert.commandWorked(coll.update({_id: docId}, {$set: {annotate: "updated"}})));
}
-// Test the change collection entries with the oplog by performing some basic writes.
-(function testBasicWritesInChangeCollection() {
+// Retrieve the last timestamp from the oplog.
+function getLatestTimestamp() {
const oplogColl = primary.getDB("local").oplog.rs;
- const startOplogTimestamp = oplogColl.find().toArray().at(-1).ts;
- assert(startOplogTimestamp != undefined);
-
- performWrites(testDb.stock);
- assert(testDb.stock.drop());
+ const oplogTimestamp = oplogColl.find().sort({ts: -1}).limit(1).next().ts;
+ assert(oplogTimestamp !== undefined);
+ return oplogTimestamp;
+}
- const endOplogTimestamp = oplogColl.find().toArray().at(-1).ts;
- assert(endOplogTimestamp !== undefined);
+// Test that writes to two different change collections are isolated and that each change collection
+// captures only the relevant oplog entries associated with the corresponding tenant.
+(function testWritesWithMultipleTenants() {
+ jsTestLog("Testing writes on change collections with multiple tenants.");
+
+ // A helper shell function to perform write for the specified 'tenantId'.
+ function shellFn(hostAddr, collName, tenantId, performWrites) {
+ load("jstests/serverless/libs/change_collection_util.js");
+
+ const tenantConn =
+ ChangeStreamMultitenantReplicaSetTest.getTenantConnection(hostAddr, tenantId);
+
+ const docIds = Array.from({length: 300}, (_, index) => index);
+ performWrites(tenantConn.getDB("test").getCollection(collName), docIds);
+
+ assert(tenantConn.getDB("test").getCollection(collName).drop());
+ }
+
+ const startOplogTimestamp = getLatestTimestamp();
+
+ // Perform writes for the first tenant in a different shell.
+ const firstTenantShellReturn =
+ startParallelShell(funWithArgs(shellFn,
+ primary.host,
+ "testWritesWithMultipleTenants_firstTenant",
+ firstTenantId,
+ performWrites),
+ primary.port);
+
+ // Perform writes to the second tenant parallely with the first tenant.
+ const secondTenantShellReturn =
+ startParallelShell(funWithArgs(shellFn,
+ primary.host,
+ "testWritesWithMultipleTenants_secondTenant",
+ secondTenantId,
+ performWrites),
+ primary.port);
+
+ // Wait for both shells to return.
+ firstTenantShellReturn();
+ secondTenantShellReturn();
+
+ const endOplogTimestamp = getLatestTimestamp();
assert(timestampCmp(endOplogTimestamp, startOplogTimestamp) > 0);
- // Wait for the replication to finish.
- replSetTest.awaitReplication();
+ // Verify that both change collections captured their respective tenant's oplog entries in
+ // the primary.
+ verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp, firstTenantId);
+ verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp, secondTenantId);
- // Verify that the change collection entries are the same as the oplog in the primary and the
- // secondary node.
- verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp);
- verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp);
+ // TODO SERVER-69115 Uncomment this.
+ /**
+ //Wait for the replication to finish.
+ replSetTest.awaitReplication();
+ // Verify that both change collections captured their respective tenant's oplog entries in
+ // the secondary.
+ verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp, firstTenantId);
+ verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp,
+ secondTenantId);
+ */
})();
-// Test the change collection entries with the oplog by performing writes in a transaction.
-(function testWritesinChangeCollectionWithTrasactions() {
- const oplogColl = primary.getDB("local").oplog.rs;
- const startOplogTimestamp = oplogColl.find().toArray().at(-1).ts;
- assert(startOplogTimestamp != undefined);
+// Test that transactional writes to two different change collections are isolated and that each
+// change collection captures only the relevant 'applyOps' oplog entries associated with the
+// corresponding tenant.
+(function testTransactionalWritesWithMultipleTenants() {
+ jsTestLog("Testing transactional writes on change collections with multiple tenants.");
- const session = testDb.getMongo().startSession();
- const sessionDb = session.getDatabase(testDb.getName());
- session.startTransaction();
- performWrites(sessionDb.test);
- session.commitTransaction_forTesting();
+ // A helper shell function to perform transactional write for the specified 'tenantId'.
+ function shellFn(hostAddr, collName, tenantId, performWrites) {
+ load("jstests/serverless/libs/change_collection_util.js");
- const endOplogTimestamp = oplogColl.find().toArray().at(-1).ts;
- assert(endOplogTimestamp != undefined);
+ const tenantConn =
+ ChangeStreamMultitenantReplicaSetTest.getTenantConnection(hostAddr, tenantId);
+
+ const session = tenantConn.getDB("test").getMongo().startSession();
+ const sessionDb = session.getDatabase("test");
+
+ session.startTransaction();
+
+ const docIds = Array.from({length: 300}, (_, index) => index);
+ performWrites(sessionDb.getCollection(collName), docIds);
+
+ session.commitTransaction_forTesting();
+ }
+
+ const startOplogTimestamp = getLatestTimestamp();
+
+ // Perform writes within a transaction for the first tenant.
+ const firstTenantShellReturn =
+ startParallelShell(funWithArgs(shellFn,
+ primary.host,
+ "testTransactionalWritesWithMultipleTenants_firstTenant",
+ firstTenantId,
+ performWrites),
+ primary.port);
+
+ // Perform parallel writes within a transaction for the second tenant.
+ const secondTenantShellReturn =
+ startParallelShell(funWithArgs(shellFn,
+ primary.host,
+ "testTransactionalWritesWithMultipleTenants_secondTenant",
+ secondTenantId,
+ performWrites),
+ primary.port);
+
+ // Wait for shells to return.
+ firstTenantShellReturn();
+ secondTenantShellReturn();
+
+ const endOplogTimestamp = getLatestTimestamp();
assert(timestampCmp(endOplogTimestamp, startOplogTimestamp) > 0);
+ // Verify that both change collections captured their respective tenant's 'applyOps' oplog
+ // entries in the primary.
+ verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp, firstTenantId);
+ verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp, secondTenantId);
+
+ // TODO SERVER-69115 Uncomment this.
+ /**
// Wait for the replication to finish.
replSetTest.awaitReplication();
-
- // Verify that the change collection entries are the same as the oplog in the primary and the
- // secondary node for the applyOps.
- verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp);
- verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp);
+ // Verify that both change collections captured their respective tenant's 'applyOps' oplog
+ // entries in the secondary.
+ verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp, firstTenantId);
+ verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp,
+ secondTenantId);
+ */
})();
replSetTest.stopSet();
diff --git a/jstests/serverless/change_collection_expired_document_remover.js b/jstests/serverless/change_collection_expired_document_remover.js
index 9c76d84c7bc..09e6b0a912d 100644
--- a/jstests/serverless/change_collection_expired_document_remover.js
+++ b/jstests/serverless/change_collection_expired_document_remover.js
@@ -1,52 +1,47 @@
/**
* Tests the change collection periodic remover job.
*
- * @tags: [requires_fcv_61]
+ * @tags: [requires_fcv_62]
*/
(function() {
"use strict";
-load("jstests/libs/fail_point_util.js"); // For configureFailPoint.
-load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection.
+// For configureFailPoint.
+load("jstests/libs/fail_point_util.js");
+// For assertDropAndRecreateCollection.
+load("jstests/libs/collection_drop_recreate.js");
+// For ChangeStreamMultitenantReplicaSetTest.
+load("jstests/serverless/libs/change_collection_util.js");
-const kExpiredChangeRemovalJobSleepSeconds = 5;
+const getTenantConnection = ChangeStreamMultitenantReplicaSetTest.getTenantConnection;
+
+// Sleep interval in seconds for the change collection remover job.
+const kExpiredRemovalJobSleepSeconds = 5;
+// Number of seconds after which the documents in change collections will be expired.
const kExpireAfterSeconds = 1;
+// Number of seconds to sleep before inserting the next batch of documents in collections.
const kSleepBetweenWritesSeconds = 5;
+// Millisecond(s) that can be added to the wall time to advance it marginally.
const kSafetyMarginMillis = 1;
-const rst = new ReplSetTest({nodes: 2});
-
-// TODO SERVER-67267: Add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and
-// 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'.
-rst.startSet({
- setParameter: {
- "failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"}),
- changeCollectionRemoverJobSleepSeconds: kExpiredChangeRemovalJobSleepSeconds
- }
+// TODO SERVER-69115 Change to a 2-node replica set.
+const replSet = new ChangeStreamMultitenantReplicaSetTest({
+ nodes: 1,
+ setParameter:
+ {changeCollectionExpiredDocumentsRemoverJobSleepSeconds: kExpiredRemovalJobSleepSeconds}
});
-rst.initiate();
-
-const primary = rst.getPrimary();
-const secondary = rst.getSecondary();
-const testDb = primary.getDB(jsTestName());
-
-// Enable change streams to ensure the creation of change collections.
-assert.commandWorked(primary.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true}));
-// Set the 'expireAfterSeconds' to 'kExpireAfterSeconds'.
-assert.commandWorked(primary.getDB("admin").runCommand(
- {setClusterParameter: {changeStreams: {expireAfterSeconds: kExpireAfterSeconds}}}));
+const primary = replSet.getPrimary();
-// TODO SERVER-65950 Extend the test case to account for multi-tenancy.
-const primaryChangeCollection = primary.getDB("config").system.change_collection;
-const secondaryChangeCollection = secondary.getDB("config").system.change_collection;
+// TODO SERVER-69115 Uncomment this code.
+// const secondary = replSet.getSecondary();
// Assert that the change collection contains all documents in 'expectedRetainedDocs' and no
-// document in 'expectedDeletedDocs' for the collection 'testColl'.
+// document in 'expectedDeletedDocs' for the collection 'stocksColl'.
function assertChangeCollectionDocuments(
- changeColl, testColl, expectedDeletedDocs, expectedRetainedDocs) {
- const collNss = `${testDb.getName()}.${testColl.getName()}`;
+ changeColl, stocksColl, expectedDeletedDocs, expectedRetainedDocs) {
+ const collNss = `${stocksTestDb.getName()}.${stocksColl.getName()}`;
const pipeline = (collectionEntries) => [{$match: {op: "i", ns: collNss}},
{$replaceRoot: {"newRoot": "$o"}},
{$match: {$or: collectionEntries}}];
@@ -72,80 +67,180 @@ function getDocumentOperationTime(doc) {
return oplogEntry.wall.getTime();
}
-(function testOnlyExpiredDocumentsDeleted() {
- assertDropAndRecreateCollection(testDb, "stocks");
- const testColl = testDb.stocks;
-
- // Wait until the remover job hangs.
- let fpHangBeforeRemovingDocs = configureFailPoint(primary, "hangBeforeRemovingExpiredChanges");
- fpHangBeforeRemovingDocs.wait();
-
- // Insert 5 documents.
- const expiredDocuments = [
- {_id: "aapl", price: 140},
- {_id: "dis", price: 100},
- {_id: "nflx", price: 185},
- {_id: "baba", price: 66},
- {_id: "amc", price: 185}
- ];
-
- assert.commandWorked(testColl.insertMany(expiredDocuments));
- assertChangeCollectionDocuments(primaryChangeCollection,
- testColl,
- /* expectedDeletedDocs */[],
- /* expectedRetainedDocs */ expiredDocuments);
- const lastExpiredDocumentTime = getDocumentOperationTime(expiredDocuments.at(-1));
-
- // Sleep for 'kSleepBetweenWritesSeconds' duration such that the next batch of inserts
- // has a sufficient delay in their wall time relative to the previous batch.
- sleep(kSleepBetweenWritesSeconds * 1000);
-
- // Insert 5 more documents.
- const nonExpiredDocuments = [
- {_id: "wmt", price: 11},
- {_id: "coin", price: 23},
- {_id: "ddog", price: 15},
- {_id: "goog", price: 199},
- {_id: "tsla", price: 12}
- ];
-
- assert.commandWorked(testColl.insertMany(nonExpiredDocuments));
- assertChangeCollectionDocuments(primaryChangeCollection,
- testColl,
- /* expectedDeletedDocs */[],
- /* expectedRetainedDocs */ nonExpiredDocuments);
-
- // Calculate the 'currentWallTime' such that only the first batch of inserted documents
- // should be expired, ie.: 'lastExpiredDocumentTime' + 'kExpireAfterSeconds' <
- // 'currentWallTime' < 'firstNonExpiredDocument'
- const currentWallTime =
- new Date(lastExpiredDocumentTime + kExpireAfterSeconds * 1000 + kSafetyMarginMillis);
- const fpInjectWallTime = configureFailPoint(
- primary, "injectCurrentWallTimeForRemovingExpiredDocuments", {currentWallTime});
-
- // Unblock the change collection remover job such that it picks up on the injected
- // 'currentWallTime'.
- fpHangBeforeRemovingDocs.off();
-
- // Wait until the remover job has retrieved the injected 'currentWallTime' and reset the first
- // failpoint.
- fpInjectWallTime.wait();
-
- // Wait for a complete cycle of the TTL job.
- fpHangBeforeRemovingDocs = configureFailPoint(primary, "hangBeforeRemovingExpiredChanges");
- fpHangBeforeRemovingDocs.wait();
-
- // Assert that the first 5 documents got deleted, but the later 5 documents did not.
- assertChangeCollectionDocuments(
- primaryChangeCollection, testColl, expiredDocuments, nonExpiredDocuments);
-
- // Wait for the replication to complete and assert that the expired documents also have been
- // deleted from the secondary.
- rst.awaitReplication();
- assertChangeCollectionDocuments(
- secondaryChangeCollection, testColl, expiredDocuments, nonExpiredDocuments);
- fpHangBeforeRemovingDocs.off();
-})();
+// Hard code a tenant ids such that tenants can be identified deterministically.
+const stocksTenantId = ObjectId("6303b6bb84305d2266d0b779");
+const citiesTenantId = ObjectId("7303b6bb84305d2266d0b779");
+const notUsedTenantId = ObjectId("8303b6bb84305d2266d0b779");
+
+// Create connections to the primary such that they have respective tenant ids stamped.
+const stocksTenantConnPrimary = getTenantConnection(primary.host, stocksTenantId);
+const citiesTenantConnPrimary = getTenantConnection(primary.host, citiesTenantId);
-rst.stopSet();
+// Create a tenant connection associated with 'notUsedTenantId' such that only the tenant id exists
+// in the replica set but no corresponding change collection exists. The purging job should safely
+// ignore this tenant without any side-effects.
+const notUsedTenantConnPrimary = getTenantConnection(primary.host, notUsedTenantId);
+
+// TODO SERVER-69115 Uncomment this code and use tenants connections to the secondary.
+/**
+const stocksTenantConnSecondary = getTenantConnection(secondary.host, stocksTenantId);
+const citiesTenantConnSecondary = getTenantConnection(secondary.host, citiesTenantId);
+*/
+
+// TODO SERVER-69115 Uncomment this code and fetch tenants change collection on the secondary.
+/**
+const stocksChangeCollectionSecondary =
+stocksTenantConnSecondary.getDB("config").system.change_collection; const
+citiesChangeCollectionSecondary =
+citiesTenantConnSecondary.getDB("config").system.change_collection;
+*/
+
+// Enable change streams for both tenants.
+replSet.setChangeStreamState(stocksTenantConnPrimary, true);
+replSet.setChangeStreamState(citiesTenantConnPrimary, true);
+
+// Verify change streams state for all tenants.
+assert.eq(replSet.getChangeStreamState(stocksTenantConnPrimary), true);
+assert.eq(replSet.getChangeStreamState(citiesTenantConnPrimary), true);
+assert.eq(replSet.getChangeStreamState(notUsedTenantConnPrimary), false);
+
+// Get tenants respective change collections.
+const stocksChangeCollectionPrimary =
+ stocksTenantConnPrimary.getDB("config").system.change_collection;
+const citiesChangeCollectionPrimary =
+ citiesTenantConnPrimary.getDB("config").system.change_collection;
+
+// Set the 'expireAfterSeconds' to 'kExpireAfterSeconds'.
+// TODO SERVER-69511 Use tenants connections instead of 'primary' to set 'expireAfterSeconds'.
+assert.commandWorked(primary.getDB("admin").runCommand(
+ {setClusterParameter: {changeStreams: {expireAfterSeconds: kExpireAfterSeconds}}}));
+
+// Get tenants respective collections for testing.
+const stocksTestDb = stocksTenantConnPrimary.getDB(jsTestName());
+const citiesTestDb = citiesTenantConnPrimary.getDB(jsTestName());
+const notUsedTestDb = notUsedTenantConnPrimary.getDB(jsTestName());
+
+const stocksColl = assertDropAndRecreateCollection(stocksTestDb, "stocks");
+const citiesColl = assertDropAndRecreateCollection(citiesTestDb, "cities");
+const notUsedColl = assertDropAndRecreateCollection(notUsedTestDb, "notUsed");
+
+// Wait until the remover job hangs.
+let fpHangBeforeRemovingDocs = configureFailPoint(primary, "hangBeforeRemovingExpiredChanges");
+fpHangBeforeRemovingDocs.wait();
+
+// Insert 5 documents to the 'stocks' collection owned by the 'stocksTenantId' that should be
+// deleted.
+const stocksExpiredDocuments = [
+ {_id: "aapl", price: 140},
+ {_id: "dis", price: 100},
+ {_id: "nflx", price: 185},
+ {_id: "baba", price: 66},
+ {_id: "amc", price: 185}
+];
+
+// Insert 4 documents to the 'cities' collection owned by the 'citiesTenantId' that should be
+// deleted.
+const citiesExpiredDocuments = [
+ {_id: "toronto", area_km2: 630},
+ {_id: "singapore ", area_km2: 728},
+ {_id: "london", area_km2: 1572},
+ {_id: "tokyo", area_km2: 2194}
+];
+
+assert.commandWorked(stocksColl.insertMany(stocksExpiredDocuments));
+assertChangeCollectionDocuments(stocksChangeCollectionPrimary,
+ stocksColl,
+ /* expectedDeletedDocs */[],
+ /* expectedRetainedDocs */ stocksExpiredDocuments);
+
+assert.commandWorked(citiesColl.insertMany(citiesExpiredDocuments));
+assertChangeCollectionDocuments(citiesChangeCollectionPrimary,
+ citiesColl,
+ /* expectedDeletedDocs */[],
+ /* expectedRetainedDocs */ citiesExpiredDocuments);
+
+// Insert 2 documents to the 'notUsed' collection such that the associated tenant becomes visible to
+// the mongoD. The documents in these collections will not be consumed by the change stream.
+const notUsedDocuments =
+ [{_id: "cricket_bat", since_years: 2}, {_id: "tennis_racket", since_years: 2}];
+assert.commandWorked(notUsedColl.insertMany(notUsedDocuments));
+
+// All document before and inclusive this wall time will be deleted by the purging job.
+const lastExpiredDocumentTime = getDocumentOperationTime(citiesExpiredDocuments.at(-1));
+
+// Sleep for 'kSleepBetweenWritesSeconds' duration such that the next batch of inserts
+// has a sufficient delay in their wall time relative to the previous batch.
+sleep(kSleepBetweenWritesSeconds * 1000);
+
+// Insert 5 documents to the 'stocks' collection owned by the 'stocksTenantId' that should not be
+// deleted.
+const stocksNonExpiredDocuments = [
+ {_id: "wmt", price: 11},
+ {_id: "coin", price: 23},
+ {_id: "ddog", price: 15},
+ {_id: "goog", price: 199},
+ {_id: "tsla", price: 12}
+];
+
+// Insert 4 documents to the 'cities' collection owned by the 'citiesTenantId' that should not be
+// deleted.
+const citiesNonExpiredDocuments = [
+ {_id: "dublin", area_km2: 117},
+ {_id: "new york", area_km2: 783},
+ {_id: "hong kong", area_km2: 1114},
+ {_id: "sydney", area_km2: 12386}
+];
+
+assert.commandWorked(stocksColl.insertMany(stocksNonExpiredDocuments));
+assertChangeCollectionDocuments(stocksChangeCollectionPrimary,
+ stocksColl,
+ /* expectedDeletedDocs */[],
+ /* expectedRetainedDocs */ stocksNonExpiredDocuments);
+
+assert.commandWorked(citiesColl.insertMany(citiesNonExpiredDocuments));
+assertChangeCollectionDocuments(citiesChangeCollectionPrimary,
+ citiesColl,
+ /* expectedDeletedDocs */[],
+ /* expectedRetainedDocs */ citiesNonExpiredDocuments);
+
+// Calculate the 'currentWallTime' such that only the first batch of inserted documents
+// should be expired, ie.: 'lastExpiredDocumentTime' + 'kExpireAfterSeconds' <
+// 'currentWallTime' < first-non-expired-document.
+const currentWallTime =
+ new Date(lastExpiredDocumentTime + kExpireAfterSeconds * 1000 + kSafetyMarginMillis);
+const fpInjectWallTime = configureFailPoint(
+ primary, "injectCurrentWallTimeForRemovingExpiredDocuments", {currentWallTime});
+
+// Unblock the change collection remover job such that it picks up on the injected
+// 'currentWallTime'.
+fpHangBeforeRemovingDocs.off();
+
+// Wait until the remover job has retrieved the injected 'currentWallTime' and reset the first
+// failpoint.
+fpInjectWallTime.wait();
+
+// Wait for a complete cycle of the TTL job.
+fpHangBeforeRemovingDocs = configureFailPoint(primary, "hangBeforeRemovingExpiredChanges");
+fpHangBeforeRemovingDocs.wait();
+
+// Assert that only required documents are retained in change collections.
+assertChangeCollectionDocuments(
+ stocksChangeCollectionPrimary, stocksColl, stocksExpiredDocuments, stocksNonExpiredDocuments);
+assertChangeCollectionDocuments(
+ citiesChangeCollectionPrimary, citiesColl, citiesExpiredDocuments, citiesNonExpiredDocuments);
+
+// TODO SERVER-69115 Uncomment this code block.
+/**
+// Wait for the replication to complete and assert that the expired documents also have been
+// deleted from the secondary.
+replSet.awaitReplication();
+assertChangeCollectionDocuments(stocksChangeCollectionSecondary,
+stocksColl, stocksExpiredDocuments,stocksNonExpiredDocuments);
+assertChangeCollectionDocuments(citiesChangeCollectionSecondary,
+citiesColl, citiesExpiredDocuments, citiesNonExpiredDocuments);
+*/
+
+fpHangBeforeRemovingDocs.off();
+
+replSet.stopSet();
})();
diff --git a/jstests/serverless/change_collection_server_stats.js b/jstests/serverless/change_collection_server_stats.js
index a5992259d78..12f8803847c 100644
--- a/jstests/serverless/change_collection_server_stats.js
+++ b/jstests/serverless/change_collection_server_stats.js
@@ -5,34 +5,44 @@
(function() {
'use strict';
-load("jstests/libs/fail_point_util.js");
+// For verifyGetDiagnosticData.
load('jstests/libs/ftdc.js');
+// For ChangeStreamMultitenantReplicaSetTest.
+load("jstests/serverless/libs/change_collection_util.js");
-const kExpiredChangeRemovalJobSleepSeconds = 5;
+const kExpiredChangeRemovalJobSleepSeconds = 1;
const kExpireAfterSeconds = 1;
-const rst = new ReplSetTest({nodes: 1});
-rst.startSet({
- setParameter: {
- featureFlagServerlessChangeStreams: 1,
- changeCollectionRemoverJobSleepSeconds: kExpiredChangeRemovalJobSleepSeconds,
- }
+const replicaSet = new ChangeStreamMultitenantReplicaSetTest({
+ nodes: 1,
+ changeCollectionExpiredDocumentsRemoverJobSleepSeconds: kExpiredChangeRemovalJobSleepSeconds
});
-rst.initiate();
-const primary = rst.getPrimary();
+const primary = replicaSet.getPrimary();
const adminDb = primary.getDB('admin');
-const testDb = primary.getDB(jsTestName());
-const changeCollection = primary.getDB("config").system.change_collection;
+
+// Hard code the tenant id such that the tenant can be identified deterministically.
+const tenantId = ObjectId("6303b6bb84305d2266d0b779");
+
+// Connection to the replica set primary that are stamped with their respective tenant ids.
+const tenantConn =
+ ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, tenantId);
+
+const testDb = tenantConn.getDB(jsTestName());
// Enable change streams to ensure the creation of change collections if run in serverless mode.
-assert.commandWorked(adminDb.runCommand({setChangeStreamState: 1, enabled: true}));
+assert.commandWorked(
+ tenantConn.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true}));
+
+const changeCollection = tenantConn.getDB("config").system.change_collection;
+
assert.soon(() => {
// Ensure that server status diagnostics is collecting change collection statistics.
const serverStatusDiagnostics = verifyGetDiagnosticData(adminDb).serverStatus;
return serverStatusDiagnostics.hasOwnProperty('changeCollections') &&
serverStatusDiagnostics.changeCollections.hasOwnProperty('purgingJob');
});
+
const diagnosticsBeforeTestCollInsertions =
verifyGetDiagnosticData(adminDb).serverStatus.changeCollections.purgingJob;
@@ -53,6 +63,7 @@ const estimatedToBeRemovedDocsSize = changeCollection.find()
assert.gt(estimatedToBeRemovedDocsSize, 0);
// Set the 'expireAfterSeconds' to 'kExpireAfterSeconds'.
+// TODO SERVER-69511 Use 'tenantConn' instead of 'primary' to set the 'expireAfterSeconds'.
assert.commandWorked(adminDb.runCommand(
{setClusterParameter: {changeStreams: {expireAfterSeconds: kExpireAfterSeconds}}}));
@@ -71,7 +82,7 @@ assert.soon(() => {
diagnosticsBeforeTestCollInsertions.totalPass &&
diagnosticsAfterTestCollInsertions.scannedCollections >
diagnosticsBeforeTestCollInsertions.scannedCollections &&
- diagnosticsAfterTestCollInsertions.bytesDeleted >
+ diagnosticsAfterTestCollInsertions.bytesDeleted >=
diagnosticsBeforeTestCollInsertions.bytesDeleted + estimatedToBeRemovedDocsSize &&
diagnosticsAfterTestCollInsertions.docsDeleted >
diagnosticsBeforeTestCollInsertions.docsDeleted + numberOfDocuments - 1 &&
@@ -81,5 +92,5 @@ assert.soon(() => {
diagnosticsBeforeTestCollInsertions.timeElapsedMillis;
});
-rst.stopSet();
+replicaSet.stopSet();
}());
diff --git a/jstests/serverless/change_stream_state_commands.js b/jstests/serverless/change_stream_state_commands.js
index fe5d38a2a67..373bad342b5 100644
--- a/jstests/serverless/change_stream_state_commands.js
+++ b/jstests/serverless/change_stream_state_commands.js
@@ -1,24 +1,25 @@
// Test that the 'setChangeStreamState' and 'getChangeStreamState' commands work as expected in the
// multi-tenant replica sets environment for various cases.
// @tags: [
-// featureFlagMongoStore,
-// requires_fcv_61,
+// requires_fcv_62,
+// __TEMPORARILY_DISABLED__
// ]
(function() {
"use strict";
-load("jstests/libs/fail_point_util.js"); // For configureFailPoint.
-load("jstests/serverless/libs/change_collection_util.js"); // For verifyChangeCollectionEntries.
-load('jstests/libs/parallel_shell_helpers.js'); // For funWithArgs.
+load("jstests/libs/fail_point_util.js"); // For configureFailPoint.
+load('jstests/libs/parallel_shell_helpers.js'); // For funWithArgs.
const replSetTest = new ReplSetTest({nodes: 2});
-// TODO SERVER-67267 Add 'featureFlagServerlessChangeStreams' and 'serverless' flags and remove
-// 'failpoint.forceEnableChangeCollectionsMode'.
+// TODO SERVER-67267 Add 'serverless' flag.
+// TODO SERVER-68947 Add 'featureFlagRequireTenantID' flag.
+// TODO SERVER-69115 Remove '__TEMPORARILY_DISABLED__'
replSetTest.startSet({
setParameter: {
- "failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"}),
- multitenancySupport: true
+ featureFlagServerlessChangeStreams: true,
+ multitenancySupport: true,
+ featureFlagMongoStore: true
}
});
diff --git a/jstests/serverless/change_streams/basic_read_from_change_collection.js b/jstests/serverless/change_streams/basic_read_from_change_collection.js
index cfe3ab53b88..98679d18c31 100644
--- a/jstests/serverless/change_streams/basic_read_from_change_collection.js
+++ b/jstests/serverless/change_streams/basic_read_from_change_collection.js
@@ -1,63 +1,82 @@
// Tests that a change stream can be opened on a change collection when one exists, and that an
// exception is thrown if we attempt to open a stream while change streams are disabled.
// @tags: [
-// featureFlagMongoStore,
-// requires_fcv_61,
+// requires_fcv_62,
// assumes_against_mongod_not_mongos,
// ]
(function() {
"use strict";
-(function runInReplicaSet() {
- const replSetTest = new ReplSetTest({nodes: 1});
-
- // TODO SERVER-67267 add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and
- // 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'.
- replSetTest.startSet(
- {setParameter: {"failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"})}});
-
- replSetTest.initiate();
-
- const connection = replSetTest.getPrimary();
-
- // Enable change stream such that it creates the change collection.
- // TODO SERVER-65950 pass tenant id to the command.
- assert.commandWorked(
- connection.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true}));
- assert.eq(assert.commandWorked(connection.getDB("admin").runCommand({getChangeStreamState: 1}))
- .enabled,
- true);
-
- // Insert a document to the 'stockPrice' collection.
- const testDb = connection.getDB("test");
- const csCursor = connection.getDB("test").stockPrice.watch([]);
- testDb.stockPrice.insert({_id: "mdb", price: 250});
- testDb.stockPrice.insert({_id: "tsla", price: 650});
-
- // Verify that the change stream observes the required event.
- assert.soon(() => csCursor.hasNext());
- const event1 = csCursor.next();
- assert.eq(event1.documentKey._id, "mdb");
- assert.soon(() => csCursor.hasNext());
- const event2 = csCursor.next();
- assert.eq(event2.documentKey._id, "tsla");
-
- // Disable the change stream while the change stream cursor is still opened.
- // TODO SERVER-65950 pass tenant id to the command.
- assert.commandWorked(
- connection.getDB("admin").runCommand({setChangeStreamState: 1, enabled: false}));
- assert.eq(assert.commandWorked(connection.getDB("admin").runCommand({getChangeStreamState: 1}))
- .enabled,
- false);
-
- // Verify that the cursor throws 'QueryPlanKilled' exception on doing get next.
- assert.throwsWithCode(() => assert.soon(() => csCursor.hasNext()), ErrorCodes.QueryPlanKilled);
-
- // Open a new change stream cursor with change stream disabled state and verify that
- // 'ChangeStreamNotEnabled' exception is thrown.
- assert.throwsWithCode(() => testDb.stock.watch([]), ErrorCodes.ChangeStreamNotEnabled);
-
- replSetTest.stopSet();
-})();
+// For ChangeStreamMultitenantReplicaSetTest.
+load("jstests/serverless/libs/change_collection_util.js");
+// For assertDropAndRecreateCollection.
+load("jstests/libs/collection_drop_recreate.js");
+
+// TODO SERVER-69115 Change to a 2-node replica set.
+const replSetTest = new ChangeStreamMultitenantReplicaSetTest({nodes: 1});
+const primary = replSetTest.getPrimary();
+
+// Hard code tenants id such that the tenant can be identified deterministically.
+const tenantId = ObjectId("6303b6bb84305d2266d0b779");
+
+// Connection to the replica set primary that is stamped with the tenant id.
+const tenantConn =
+ ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, tenantId);
+
+// Verify that the change stream observes expected events.
+function verifyChangeEvents(csCursor, expectedEvents) {
+ for (const [expectedOpType, expectedDoc] of expectedEvents) {
+ assert.soon(() => csCursor.hasNext());
+ const event = csCursor.next();
+
+ assert.eq(event.operationType, expectedOpType, event);
+ if (event.operationType == "insert") {
+ assert.eq(event.fullDocument, expectedDoc);
+ } else if (event.operationType == "drop") {
+ assert.soon(() => csCursor.hasNext());
+ assert.eq(csCursor.isClosed(), true);
+ }
+ }
+}
+
+// Enable change stream for the first tenant.
+replSetTest.setChangeStreamState(tenantConn, true);
+
+// Open the change stream cursor.
+const testDb = tenantConn.getDB("test");
+const csCursor = testDb.stockPrice.watch([]);
+
+// Insert documents to the 'stockPrice' collection.
+const docs = [{_id: "mdb", price: 250}, {_id: "tsla", price: 650}];
+docs.forEach(doc => assert.commandWorked(testDb.stockPrice.insert(doc)));
+
+// Drop the stock price collection to invalidate the change stream cursor.
+assert(testDb.stockPrice.drop());
+
+// Verify that the change stream observes the required event.
+verifyChangeEvents(csCursor, [["insert", docs[0]], ["insert", docs[1]], ["drop", []]]);
+
+// Disable and then enable the change stream.
+replSetTest.setChangeStreamState(tenantConn, false);
+replSetTest.setChangeStreamState(tenantConn, true);
+
+// Add a new document to the 'stockPrice' collection and verify that re-enabling the change
+// stream works correctly.
+const newCsCursor = testDb.stockPrice.watch([]);
+const newDocs = [{_id: "goog", price: 2000}];
+newDocs.forEach(doc => assert.commandWorked(testDb.stockPrice.insert(doc)));
+verifyChangeEvents(newCsCursor, [["insert", newDocs[0]]]);
+
+// Disable the change stream while the change stream cursor is still opened.
+replSetTest.setChangeStreamState(tenantConn, false);
+
+// Verify that the cursor throws 'QueryPlanKilled' exception on doing get next.
+assert.throwsWithCode(() => assert.soon(() => newCsCursor.hasNext()), ErrorCodes.QueryPlanKilled);
+
+// Open a new change stream cursor with change stream disabled state and verify that
+// 'ChangeStreamNotEnabled' exception is thrown.
+assert.throwsWithCode(() => testDb.stock.watch([]), ErrorCodes.ChangeStreamNotEnabled);
+
+replSetTest.stopSet();
}());
diff --git a/jstests/serverless/change_streams/multitenant_read_from_change_collection.js b/jstests/serverless/change_streams/multitenant_read_from_change_collection.js
new file mode 100644
index 00000000000..b10941b7999
--- /dev/null
+++ b/jstests/serverless/change_streams/multitenant_read_from_change_collection.js
@@ -0,0 +1,158 @@
+// Tests the behaviour of change streams on change collections in an environment with more than one
+// active tenant.
+// @tags: [
+// requires_fcv_62,
+// assumes_against_mongod_not_mongos,
+// ]
+
+(function() {
+"use strict";
+
+// For ChangeStreamMultitenantReplicaSetTest.
+load("jstests/serverless/libs/change_collection_util.js");
+// For assertDropAndRecreateCollection.
+load("jstests/libs/collection_drop_recreate.js");
+
+// TODO SERVER-69115 Change to a 2-node replica set.
+const replSetTest = new ChangeStreamMultitenantReplicaSetTest({nodes: 1});
+const primary = replSetTest.getPrimary();
+
+// Hard code tenants ids such that a particular tenant can be identified deterministically.
+const firstTenantId = ObjectId("6303b6bb84305d2266d0b779");
+const secondTenantId = ObjectId("7303b6bb84305d2266d0b779");
+
+// Connections to the replica set primary that are stamped with their respective tenant ids.
+const firstTenantConn =
+ ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, firstTenantId);
+const secondTenantConn =
+ ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, secondTenantId);
+
+// Verify that the change stream observes expected events. The method also collects resume tokens
+// for each expected change collection and returns those on successful assertion.
+function verifyEventsAndGetResumeTokens(csCursor, expectedEvents) {
+ let resumeTokens = [];
+
+ for (const [expectedOpType, expectedDoc] of expectedEvents) {
+ assert.soon(() => csCursor.hasNext());
+ const event = csCursor.next();
+
+ assert.eq(event.operationType, expectedOpType, event);
+ if (event.operationType == "insert") {
+ assert.eq(event.fullDocument, expectedDoc);
+ } else if (event.operationType == "drop") {
+ assert.soon(() => csCursor.hasNext());
+ assert.eq(csCursor.isClosed(), true);
+ }
+
+ resumeTokens.push(csCursor.getResumeToken());
+ }
+
+ return resumeTokens;
+}
+
+// Get the 'test' db for both tenants.
+const firstTenantTestDb = firstTenantConn.getDB("test");
+const secondTenantTestDb = secondTenantConn.getDB("test");
+
+// Recreate the 'stockPrice' collection to delete any old documents.
+assertDropAndRecreateCollection(firstTenantTestDb, "stockPrice");
+assertDropAndRecreateCollection(secondTenantTestDb, "stockPrice");
+
+// Create a new incarnation of the change collection for the first tenant.
+replSetTest.setChangeStreamState(firstTenantConn, false);
+replSetTest.setChangeStreamState(firstTenantConn, true);
+
+// These documents will be inserted in tenants 'stockPrice' collections.
+const firstTenantDocs =
+ [{_id: "mdb", price: 350}, {_id: "goog", price: 2000}, {_id: "nflx", price: 220}];
+const secondTenantDocs =
+ [{_id: "amzn", price: 3000}, {_id: "tsla", price: 750}, {_id: "aapl", price: 160}];
+
+// Open the change stream cursor for the first tenant.
+const firstTenantCsCursor = firstTenantTestDb.stockPrice.watch([]);
+
+// Fetch the latest timestamp before enabling the change stream for the second tenant.
+const startAtOperationTime =
+ primary.getDB("local").oplog.rs.find().sort({ts: -1}).limit(1).next().ts;
+assert(startAtOperationTime !== undefined);
+
+// Now create the change collection for the second tenant. The oplog timestamp associated with the
+// second tenant's create change collection will be greater than the 'startAtOperationTime'.
+replSetTest.setChangeStreamState(secondTenantConn, false);
+replSetTest.setChangeStreamState(secondTenantConn, true);
+
+// Open the change stream cursor for the second tenant.
+const secondTenantCsCursor = secondTenantTestDb.stockPrice.watch([]);
+
+// Insert documents to both change collections in jumbled fashion.
+assert.commandWorked(secondTenantTestDb.stockPrice.insert(secondTenantDocs[0]));
+assert.commandWorked(firstTenantTestDb.stockPrice.insert(firstTenantDocs[0]));
+assert.commandWorked(firstTenantTestDb.stockPrice.insert(firstTenantDocs[1]));
+assert.commandWorked(secondTenantTestDb.stockPrice.insert(secondTenantDocs[1]));
+assert.commandWorked(secondTenantTestDb.stockPrice.insert(secondTenantDocs[2]));
+assert.commandWorked(firstTenantTestDb.stockPrice.insert(firstTenantDocs[2]));
+
+// Verify that each change stream emits only the required tenant's change events and that there
+// is no leak of events amongst the change streams. Do not consume all events for the first
+// tenant as it will be consumed later.
+const firstTenantResumeTokens = verifyEventsAndGetResumeTokens(
+ firstTenantCsCursor, [["insert", firstTenantDocs[0]], ["insert", firstTenantDocs[1]]]);
+const secondTenantResumeTokens = verifyEventsAndGetResumeTokens(secondTenantCsCursor, [
+ ["insert", secondTenantDocs[0]],
+ ["insert", secondTenantDocs[1]],
+ ["insert", secondTenantDocs[2]]
+]);
+
+// Verify that change streams from both tenants can be resumed using their respective resume token.
+verifyEventsAndGetResumeTokens(
+ firstTenantTestDb.stockPrice.watch([], {resumeAfter: firstTenantResumeTokens[0]}),
+ [["insert", firstTenantDocs[1]], ["insert", firstTenantDocs[2]]]);
+verifyEventsAndGetResumeTokens(
+ secondTenantTestDb.stockPrice.watch([], {resumeAfter: secondTenantResumeTokens[0]}),
+ [["insert", secondTenantDocs[1]], ["insert", secondTenantDocs[2]]]);
+
+// Verify that resume tokens cannot be exchanged between tenants change streams.
+assert.throwsWithCode(
+ () => secondTenantTestDb.stockPrice.watch([], {resumeAfter: firstTenantResumeTokens[0]}),
+ ErrorCodes.ChangeStreamFatalError);
+assert.throwsWithCode(
+ () => firstTenantTestDb.stockPrice.watch([], {resumeAfter: secondTenantResumeTokens[0]}),
+ ErrorCodes.ChangeStreamFatalError);
+
+// Verify that the first tenant's change stream can be resumed using the timestamp
+// 'startAtOperationTime'.
+verifyEventsAndGetResumeTokens(
+ firstTenantTestDb.stockPrice.watch([], {startAtOperationTime: startAtOperationTime}), [
+ ["insert", firstTenantDocs[0]],
+ ["insert", firstTenantDocs[1]],
+ ["insert", firstTenantDocs[2]]
+ ]);
+
+// Verify that the second tenant's change stream cannot be resumed with the timestamp
+// 'startAtOperationTime' and should throw change stream history lost.
+assert.throwsWithCode(
+ () => secondTenantTestDb.stockPrice.watch([], {startAtOperationTime: startAtOperationTime}),
+ ErrorCodes.ChangeStreamHistoryLost);
+
+// Ensure that disabling the change stream for the second tenant does not impact the change
+// stream of the first tenant.
+replSetTest.setChangeStreamState(secondTenantConn, false);
+
+// The next on the change stream for the second tenant should now throw exception.
+assert.throwsWithCode(() => assert.soon(() => secondTenantCsCursor.hasNext()),
+ ErrorCodes.QueryPlanKilled);
+
+// The next of the change stream for the first tenant should continue to work. Since we have
+// still not consumed all event from the first tenant, the change stream should emit the
+// remaining ones.
+verifyEventsAndGetResumeTokens(firstTenantCsCursor, [["insert", firstTenantDocs[2]]]);
+
+// Re-enable the change stream for the second tenant and verify that the change stream cannot be
+// resumed using the resume token of previous incarnation of the change stream.
+replSetTest.setChangeStreamState(secondTenantConn, true);
+assert.throwsWithCode(
+ () => secondTenantTestDb.stockPrice.watch([], {resumeAfter: secondTenantResumeTokens[0]}),
+ ErrorCodes.ChangeStreamHistoryLost);
+
+replSetTest.stopSet();
+}());
diff --git a/jstests/serverless/initial_sync_change_collection.js b/jstests/serverless/initial_sync_change_collection.js
index 6f3b0128ef6..3d111307dd9 100644
--- a/jstests/serverless/initial_sync_change_collection.js
+++ b/jstests/serverless/initial_sync_change_collection.js
@@ -2,9 +2,8 @@
// and when the initial sync has completed the change collection and oplog entries are exactly same
// in the new secondary.
// @tags: [
-// featureFlagServerlessChangeStreams,
-// featureFlagMongoStore,
-// requires_fcv_61,
+// requires_fcv_62,
+// __TEMPORARILY_DISABLED__
// ]
//
(function() {
@@ -15,10 +14,16 @@ load("jstests/serverless/libs/change_collection_util.js"); // For verifyChangeC
const replSetTest = new ReplSetTest({nodes: 1});
-// TODO SERVER-67267 add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and
-// 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'.
-replSetTest.startSet(
- {setParameter: "failpoint.forceEnableChangeCollectionsMode=" + tojson({mode: "alwaysOn"})});
+// TODO SERVER-67267 Add 'serverless' flag.
+// TODO SERVER-69115 Add 'featureFlagRequireTenantID' flag and remove '__TEMPORARILY_DISABLED__'
+// tag and replace 'ReplSetTest' with 'ChangeStreamMultitenantReplicaSetTest'.
+replSetTest.startSet({
+ setParameter: {
+ featureFlagServerlessChangeStreams: true,
+ multitenancySupport: true,
+ featureFlagMongoStore: true
+ }
+});
replSetTest.initiate();
@@ -44,7 +49,9 @@ const secondary = replSetTest.add({
setParameter: {
// Hang after the data cloning phase is completed.
"failpoint.initialSyncHangAfterDataCloning": tojson({mode: "alwaysOn"}),
- "failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"})
+ featureFlagServerlessChangeStreams: true,
+ multitenancySupport: true,
+ featureFlagMongoStore: true
}
});
diff --git a/jstests/serverless/libs/change_collection_util.js b/jstests/serverless/libs/change_collection_util.js
index 4026ea84f81..f9b8a9c6846 100644
--- a/jstests/serverless/libs/change_collection_util.js
+++ b/jstests/serverless/libs/change_collection_util.js
@@ -1,29 +1,44 @@
// Contains functions for testing the change collections.
-// Verifies that the oplog and change collection entries are the same for the specified start and
-// end duration of the oplog timestamp.
-function verifyChangeCollectionEntries(connection, startOplogTimestamp, endOplogTimestamp) {
+// Verifies that the oplog and change collection entries are the same for the provided tenant
+// 'tenantId' for the specified timestamp window:- (startOplogTimestamp, endOplogTimestamp].
+function verifyChangeCollectionEntries(
+ connection, startOplogTimestamp, endOplogTimestamp, tenantId) {
+ // Fetch the oplog documents for the provided tenant for the specified timestamp window. Note
+ // that the startOplogTimestamp is expected to be just before the first write, while the
+ // endOplogTimestamp is expected to be the timestamp of the final write in the test.
const oplogColl = connection.getDB("local").oplog.rs;
- const changeColl = connection.getDB("config").system.change_collection;
+ const oplogEntries = oplogColl
+ .find({
+ $and: [
+ {ts: {$gt: startOplogTimestamp}},
+ {ts: {$lte: endOplogTimestamp}},
+ {tid: tenantId}
+ ]
+ })
+ .toArray();
- // Fetch all oplog and change collection entries for the duration: [startOplogTimestamp,
- // endOplogTimestamp].
- const oplogEntries =
- oplogColl.find({$and: [{ts: {$gte: startOplogTimestamp}}, {ts: {$lte: endOplogTimestamp}}]})
- .toArray();
+ // Fetch all documents from the tenant's change collection for the specified timestamp window.
+ const changeColl =
+ ChangeStreamMultitenantReplicaSetTest.getTenantConnection(connection.host, tenantId)
+ .getDB("config")
+ .system.change_collection;
const changeCollectionEntries =
changeColl
- .find({$and: [{_id: {$gte: startOplogTimestamp}}, {_id: {$lte: endOplogTimestamp}}]})
+ .find({$and: [{_id: {$gt: startOplogTimestamp}}, {_id: {$lte: endOplogTimestamp}}]})
.toArray();
- assert.eq(
- oplogEntries.length,
- changeCollectionEntries.length,
- "Number of entries in the oplog and the change collection is not the same. Oplog has total " +
- oplogEntries.length + " entries , change collection has total " +
- changeCollectionEntries.length + " entries" +
- "change collection entries " + tojson(changeCollectionEntries));
+ // Verify that the number of documents returned by the oplog and the tenant's change collection
+ // are exactly the same.
+ assert.eq(oplogEntries.length,
+ changeCollectionEntries.length,
+ "Number of entries in the oplog and the change collection with tenantId: " +
+ tenantId + " is not the same. Oplog has total " + oplogEntries.length +
+ " entries , change collection has total " + changeCollectionEntries.length +
+ " entries, change collection entries " + tojson(changeCollectionEntries));
+ // Verify that the documents in the change collection are exactly the same as the oplog for a
+ // particular tenant.
for (let idx = 0; idx < oplogEntries.length; idx++) {
const oplogEntry = oplogEntries[idx];
const changeCollectionEntry = changeCollectionEntries[idx];
@@ -32,16 +47,82 @@ function verifyChangeCollectionEntries(connection, startOplogTimestamp, endOplog
assert(changeCollectionEntry.hasOwnProperty("_id"));
assert.eq(timestampCmp(changeCollectionEntry._id, oplogEntry.ts),
0,
- "Change collection '_id' field: " + tojson(changeCollectionEntry._id) +
+ "Change collection with tenantId: " + tenantId +
+ " '_id' field: " + tojson(changeCollectionEntry._id) +
" is not same as the oplog 'ts' field: " + tojson(oplogEntry.ts));
delete changeCollectionEntry["_id"];
// Verify that the oplog and change collecton entry (after removing the '_id') field are
// the same.
- assert.eq(
- oplogEntry,
- changeCollectionEntry,
- "Oplog and change collection entries are not same. Oplog entry: " + tojson(oplogEntry) +
- ", change collection entry: " + tojson(changeCollectionEntry));
+ assert.eq(oplogEntry,
+ changeCollectionEntry,
+ "Oplog and change collection with tenantId: " + tenantId +
+ " entries are not same. Oplog entry: " + tojson(oplogEntry) +
+ ", change collection entry: " + tojson(changeCollectionEntry));
+ }
+}
+
+// A class that sets up the multitenant environment to enable change collections on the replica set.
+// This class also provides helpers that are commonly used when working with change collections.
+class ChangeStreamMultitenantReplicaSetTest extends ReplSetTest {
+ constructor(config) {
+ // Instantiate the 'ReplSetTest'.
+ super(config);
+
+ // Start and initialize the replica set.
+ // TODO SERVER-67267 Add 'serverless' flag.
+ const setParameter = Object.assign({}, config.setParameter || {}, {
+ featureFlagServerlessChangeStreams: true,
+ multitenancySupport: true,
+ featureFlagMongoStore: true,
+ featureFlagRequireTenantID: true
+ });
+ this.startSet({setParameter: setParameter});
+ this.initiate();
+
+ // Create a root user within the multitenant environment to enable passing '$tenant' to
+ // commands.
+ assert.commandWorked(this.getPrimary().getDB("admin").runCommand(
+ {createUser: "root", pwd: "pwd", roles: ["root"]}));
+ }
+
+ // Returns a connection to the 'hostAddr' with 'tenantId' stamped to it for the created user.
+ static getTenantConnection(hostAddr, tenantId, createUser = {
+ user: ObjectId().str,
+ roles: [{role: 'readWriteAnyDatabase', db: 'admin'}]
+ }) {
+ const tokenConn = new Mongo(hostAddr);
+
+ // Login to the root user with 'ActionType::useTenant' such that the '$tenant' can be
+ // used.
+ assert(tokenConn.getDB("admin").auth("root", "pwd"));
+
+ // Create the user with the provided attributes.
+ assert.commandWorked(tokenConn.getDB("$external").runCommand({
+ createUser: createUser.user,
+ '$tenant': tenantId,
+ roles: createUser.roles
+ }));
+
+ // Set the provided tenant id into the security token for the user.
+ tokenConn._setSecurityToken(
+ _createSecurityToken({user: createUser.user, db: '$external', tenant: tenantId}));
+
+ // Logout the root user to avoid multiple authentication.
+ tokenConn.getDB("admin").logout();
+
+ return tokenConn;
+ }
+
+ // Sets the change stream state for the provided tenant connection.
+ setChangeStreamState(tenantConn, enabled) {
+ assert.commandWorked(
+ tenantConn.getDB("admin").runCommand({setChangeStreamState: 1, enabled: enabled}));
+ }
+
+ // Returns the change stream state for the provided tenant connection.
+ getChangeStreamState(tenantConn) {
+ return assert.commandWorked(tenantConn.getDB("admin").runCommand({getChangeStreamState: 1}))
+ .enabled;
}
}
diff --git a/jstests/serverless/sharded_change_stream_state_commands.js b/jstests/serverless/sharded_change_stream_state_commands.js
index 541ce64bf5a..83102fe0e8e 100644
--- a/jstests/serverless/sharded_change_stream_state_commands.js
+++ b/jstests/serverless/sharded_change_stream_state_commands.js
@@ -9,11 +9,7 @@
const shardingTest = new ShardingTest({
shards: 2,
- other: {
- configOptions: {
- setParameter: {"failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"})}
- }
- }
+ other: {configOptions: {setParameter: {featureFlagServerlessChangeStreams: true}}}
});
// TODO SERVER-68341 Implement tests for mongoQ and ensure that the change collection is not enabled
diff --git a/jstests/serverless/write_to_change_collection_in_startup_recovery.js b/jstests/serverless/write_to_change_collection_in_startup_recovery.js
index b0950abd057..bbf7cd0a259 100644
--- a/jstests/serverless/write_to_change_collection_in_startup_recovery.js
+++ b/jstests/serverless/write_to_change_collection_in_startup_recovery.js
@@ -1,9 +1,8 @@
// Tests that replaying the oplog entries during the startup recovery also writes to the change
// collection.
// @tags: [
-// featureFlagServerlessChangeStreams,
-// multiversion_incompatible,
-// featureFlagMongoStore,
+// requires_fcv_62,
+// __TEMPORARILY_DISABLED__
// ]
(function() {
@@ -14,10 +13,16 @@ load("jstests/serverless/libs/change_collection_util.js"); // For verifyChangeC
const replSetTest = new ReplSetTest({nodes: 1});
-// TODO SERVER-67267 add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and
-// 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'.
-replSetTest.startSet(
- {setParameter: "failpoint.forceEnableChangeCollectionsMode=" + tojson({mode: "alwaysOn"})});
+// TODO SERVER-67267 Add 'serverless' flag.
+// TODO SERVER-69115 Add 'featureFlagRequireTenantID' flag and remove '__TEMPORARILY_DISABLED__'
+// tag and replace 'ReplSetTest' with 'ChangeStreamMultitenantReplicaSetTest'.
+replSetTest.startSet({
+ setParameter: {
+ featureFlagServerlessChangeStreams: true,
+ multitenancySupport: true,
+ featureFlagMongoStore: true
+ }
+});
replSetTest.initiate();
@@ -84,7 +89,11 @@ MongoRunner.stopMongod(standalone, null, {noCleanData: true, skipValidation: tru
// Start the replica set primary with the same db path.
replSetTest.start(primary, {
noCleanData: true,
- setParameter: "failpoint.forceEnableChangeCollectionsMode=" + tojson({mode: "alwaysOn"})
+ setParameter: {
+ featureFlagServerlessChangeStreams: true,
+ multitenancySupport: true,
+ featureFlagMongoStore: true
+ }
});
primary = replSetTest.getPrimary();
@@ -105,6 +114,7 @@ assert(endTimestamp !== undefined);
// Verify that the oplog and the change collection entries between the ['startTimestamp',
// 'endTimestamp'] window are exactly same and in the same order.
+// TODO SERVER-69115 Pass the tenant id to the 'verifyChangeCollectionEntries'.
verifyChangeCollectionEntries(primary, startTimestamp, endTimestamp);
replSetTest.stopSet();
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 57c4feedb00..c0514e4dd44 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -511,7 +511,9 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
'$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_serverless_helpers',
'$BUILD_DIR/mongo/db/change_stream_state',
+ '$BUILD_DIR/mongo/db/concurrency/exception_util',
'$BUILD_DIR/mongo/db/dbdirectclient',
'$BUILD_DIR/mongo/db/repl/primary_only_service',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
@@ -524,6 +526,21 @@ env.Library(
)
env.Library(
+ target='change_stream_serverless_helpers',
+ source=[
+ 'change_stream_serverless_helpers.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/catalog/collection',
+ '$BUILD_DIR/mongo/db/catalog/collection_catalog',
+ '$BUILD_DIR/mongo/db/query/query_knobs',
+ '$BUILD_DIR/mongo/db/server_base',
+ '$BUILD_DIR/mongo/db/server_options',
+ '$BUILD_DIR/mongo/idl/feature_flag',
+ ],
+)
+
+env.Library(
target='change_stream_change_collection_manager',
source=[
'change_stream_change_collection_manager.cpp',
@@ -532,9 +549,10 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/catalog_helpers',
'$BUILD_DIR/mongo/db/catalog/clustered_collection_options',
'$BUILD_DIR/mongo/db/catalog/collection_crud',
- '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_serverless_helpers',
'$BUILD_DIR/mongo/db/concurrency/exception_util',
'$BUILD_DIR/mongo/db/dbhelpers',
+ '$BUILD_DIR/mongo/db/server_feature_flags',
'$BUILD_DIR/mongo/db/service_context',
],
)
@@ -546,6 +564,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_serverless_helpers',
'$BUILD_DIR/mongo/db/change_streams_cluster_parameter',
'$BUILD_DIR/mongo/db/query_exec',
'$BUILD_DIR/mongo/util/periodic_runner',
@@ -2515,6 +2534,7 @@ if wiredtiger:
'$BUILD_DIR/mongo/db/catalog/local_oplog_info',
'$BUILD_DIR/mongo/db/change_collection_expired_change_remover',
'$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_serverless_helpers',
'$BUILD_DIR/mongo/db/change_streams_cluster_parameter',
'$BUILD_DIR/mongo/db/commands/create_command',
'$BUILD_DIR/mongo/db/mongohasher',
diff --git a/src/mongo/db/catalog/drop_collection.cpp b/src/mongo/db/catalog/drop_collection.cpp
index 80ff680df43..5e9089fce0c 100644
--- a/src/mongo/db/catalog/drop_collection.cpp
+++ b/src/mongo/db/catalog/drop_collection.cpp
@@ -233,7 +233,7 @@ Status _abortIndexBuildsAndDrop(OperationContext* opCtx,
if (dropIfUUIDNotMatching && collectionUUID == *dropIfUUIDNotMatching) {
return Status::OK();
}
- const NamespaceStringOrUUID dbAndUUID{coll->ns().db().toString(), coll->uuid()};
+ const NamespaceStringOrUUID dbAndUUID{coll->ns().dbName(), coll->uuid()};
const int numIndexes = coll->getIndexCatalog()->numIndexesTotal(opCtx);
while (true) {
@@ -254,7 +254,7 @@ Status _abortIndexBuildsAndDrop(OperationContext* opCtx,
<< collectionUUID << ") is being dropped");
// Take an exclusive lock to finish the collection drop.
- optionalAutoDb.emplace(opCtx, startingNss.db(), MODE_IX);
+ optionalAutoDb.emplace(opCtx, startingNss.dbName(), MODE_IX);
collLock.emplace(opCtx, dbAndUUID, MODE_X);
// Abandon the snapshot as the index catalog will compare the in-memory state to the
diff --git a/src/mongo/db/change_collection_expired_change_remover_test.cpp b/src/mongo/db/change_collection_expired_change_remover_test.cpp
index e3ee3c411f2..8b8777e0d23 100644
--- a/src/mongo/db/change_collection_expired_change_remover_test.cpp
+++ b/src/mongo/db/change_collection_expired_change_remover_test.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/catalog/catalog_test_fixture.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/change_stream_change_collection_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/exec/document_value/document_value_test_util.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/query/plan_executor.h"
@@ -54,7 +55,8 @@ namespace mongo {
class ChangeCollectionExpiredChangeRemoverTest : public CatalogTestFixture {
protected:
ChangeCollectionExpiredChangeRemoverTest()
- : CatalogTestFixture(Options{}.useMockClock(true)), _tenantId(OID::gen()) {
+ : CatalogTestFixture(Options{}.useMockClock(true)),
+ _tenantId(change_stream_serverless_helpers::getTenantIdForTesting()) {
ChangeStreamChangeCollectionManager::create(getServiceContext());
}
@@ -67,7 +69,7 @@ protected:
}
void insertDocumentToChangeCollection(OperationContext* opCtx,
- boost::optional<TenantId> tenantId,
+ const TenantId& tenantId,
const BSONObj& obj) {
const auto wallTime = now();
Timestamp timestamp{wallTime};
@@ -78,6 +80,7 @@ protected:
oplogEntry.setNss(NamespaceString::makeChangeCollectionNSS(tenantId));
oplogEntry.setObject(obj);
oplogEntry.setWallClockTime(wallTime);
+
auto oplogEntryBson = oplogEntry.toBSON();
RecordData recordData{oplogEntryBson.objdata(), oplogEntryBson.objsize()};
@@ -112,8 +115,7 @@ protected:
return entries;
}
- void dropAndRecreateChangeCollection(OperationContext* opCtx,
- boost::optional<TenantId> tenantId) {
+ void dropAndRecreateChangeCollection(OperationContext* opCtx, const TenantId& tenantId) {
auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
changeCollectionManager.dropChangeCollection(opCtx, tenantId);
changeCollectionManager.createChangeCollection(opCtx, tenantId);
@@ -136,11 +138,13 @@ protected:
opCtx, &*changeCollection, maxRecordIdBound);
}
- const boost::optional<TenantId> _tenantId;
+ const TenantId _tenantId;
+ boost::optional<ChangeStreamChangeCollectionManager> _changeCollectionManager;
+
RAIIServerParameterControllerForTest featureFlagController{"featureFlagServerlessChangeStreams",
true};
-
- boost::optional<ChangeStreamChangeCollectionManager> _changeCollectionManager;
+ RAIIServerParameterControllerForTest queryKnobController{
+ "internalChangeStreamUseTenantIdForTesting", true};
};
// Tests that the last expired focument retrieved is the expected one.
diff --git a/src/mongo/db/change_collection_expired_documents_remover.cpp b/src/mongo/db/change_collection_expired_documents_remover.cpp
index 80a816945be..d84fb21b672 100644
--- a/src/mongo/db/change_collection_expired_documents_remover.cpp
+++ b/src/mongo/db/change_collection_expired_documents_remover.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/catalog_raii.h"
#include "mongo/db/change_stream_change_collection_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/change_streams_cluster_parameter_gen.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/replication_coordinator.h"
@@ -56,25 +57,31 @@ MONGO_FAIL_POINT_DEFINE(injectCurrentWallTimeForRemovingExpiredDocuments);
namespace {
-// TODO SERVER-61822 Provide the real implementation after 'listDatabasesForAllTenants' is
-// available.
-std::vector<boost::optional<TenantId>> getAllTenants() {
- return {boost::none};
+change_stream_serverless_helpers::TenantSet getConfigDbTenants(OperationContext* opCtx) {
+ auto tenantIds = change_stream_serverless_helpers::getConfigDbTenants(opCtx);
+ if (auto testTenantId = change_stream_serverless_helpers::resolveTenantId(boost::none)) {
+ tenantIds.insert(*testTenantId);
+ }
+
+ return tenantIds;
}
-boost::optional<int64_t> getExpireAfterSeconds(boost::optional<TenantId> tid) {
+boost::optional<int64_t> getExpireAfterSeconds(const TenantId& tenantId) {
auto* clusterParameters = ServerParameterSet::getClusterParameterSet();
auto* changeStreamsParam =
clusterParameters->get<ClusterParameterWithStorage<ChangeStreamsClusterParameterStorage>>(
"changeStreams");
- return changeStreamsParam->getValue(tid).getExpireAfterSeconds();
+
+ // TODO SERVER-69511 Pass 'tenantId' instead of 'boost::none'. Move this function to
+ // 'change_stream_serverless_helpers'.
+ return changeStreamsParam->getValue(boost::none).getExpireAfterSeconds();
}
void removeExpiredDocuments(Client* client) {
// TODO SERVER-66717 Remove this logic from this method. Due to the delay in the feature flag
// activation it was placed here. The remover job should ultimately be initialized at the mongod
// startup when launched in serverless mode.
- if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ if (!change_stream_serverless_helpers::isChangeCollectionsModeActive()) {
return;
}
@@ -98,7 +105,7 @@ void removeExpiredDocuments(Client* client) {
long long maxStartWallTime = 0;
auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx.get());
- for (const auto& tenantId : getAllTenants()) {
+ for (const auto& tenantId : getConfigDbTenants(opCtx.get())) {
auto expiredAfterSeconds = getExpireAfterSeconds(tenantId);
invariant(expiredAfterSeconds);
@@ -169,13 +176,13 @@ void removeExpiredDocuments(Client* client) {
/**
* Defines a periodic background job to remove expired documents from change collections.
- * The job will run every 'changeCollectionRemoverJobSleepSeconds', as defined in the cluster
- * parameter.
+ * The job will run every 'changeCollectionExpiredDocumentsRemoverJobSleepSeconds', as defined in
+ * the cluster parameter.
*/
class ChangeCollectionExpiredDocumentsRemover {
public:
ChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext) {
- const auto period = Seconds{gChangeCollectionRemoverJobSleepSeconds.load()};
+ const auto period = Seconds{gChangeCollectionExpiredDocumentsRemoverJobSleepSeconds.load()};
_jobAnchor = serviceContext->getPeriodicRunner()->makeJob(
{"ChangeCollectionExpiredDocumentsRemover", removeExpiredDocuments, period});
_jobAnchor.start();
diff --git a/src/mongo/db/change_collection_expired_documents_remover.h b/src/mongo/db/change_collection_expired_documents_remover.h
index bf9e36ae1f4..3ce5fc1ef94 100644
--- a/src/mongo/db/change_collection_expired_documents_remover.h
+++ b/src/mongo/db/change_collection_expired_documents_remover.h
@@ -35,7 +35,8 @@ namespace mongo {
/**
* Starts a periodic background job to remove expired documents from change collections. The job
- * will run every 'changeCollectionRemoverJobSleepSeconds' as defined in the cluster parameter.
+ * will run every 'changeCollectionExpiredDocumentsRemoverJobSleepSeconds' as defined in the cluster
+ * parameter.
*/
void startChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext);
diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp
index ca2735db03c..eee89bf9e07 100644
--- a/src/mongo/db/change_stream_change_collection_manager.cpp
+++ b/src/mongo/db/change_stream_change_collection_manager.cpp
@@ -38,7 +38,7 @@
#include "mongo/db/catalog/create_collection.h"
#include "mongo/db/catalog/drop_collection.h"
#include "mongo/db/catalog_raii.h"
-#include "mongo/db/change_stream_pre_images_collection_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/multitenancy_gen.h"
#include "mongo/db/namespace_string.h"
@@ -46,29 +46,16 @@
#include "mongo/db/repl/apply_ops_command_info.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry_gen.h"
+#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/db/server_options.h"
#include "mongo/logv2/log.h"
namespace mongo {
-
-// Sharded clusters do not support serverless mode at present, but this failpoint allows us to
-// nonetheless test the behaviour of change collections in a sharded environment.
-MONGO_FAIL_POINT_DEFINE(forceEnableChangeCollectionsMode);
-
namespace {
const auto getChangeCollectionManager =
ServiceContext::declareDecoration<boost::optional<ChangeStreamChangeCollectionManager>>();
/**
- * Returns the list of all tenant ids in the replica set.
- * TODO SERVER-61822 Provide the real implementation after 'listDatabasesForAllTenants' is
- * available.
- */
-std::vector<boost::optional<TenantId>> getAllTenants() {
- return {boost::none};
-}
-
-/**
* Creates a Document object from the supplied oplog entry, performs necessary modifications to it
* and then returns it as a BSON object.
*/
@@ -88,13 +75,15 @@ class ChangeCollectionsWriter {
public:
explicit ChangeCollectionsWriter(const AutoGetChangeCollection::AccessMode& accessMode)
: _accessMode{accessMode} {}
+
/**
* Adds the insert statement for the provided tenant that will be written to the change
* collection when the 'write()' method is called.
*/
- void add(const TenantId& tenantId, InsertStatement insertStatement) {
- if (_shouldAddEntry(insertStatement)) {
- _tenantStatementsMap[tenantId].push_back(std::move(insertStatement));
+ void add(InsertStatement insertStatement) {
+ if (auto tenantId = _extractTenantId(insertStatement);
+ tenantId && _shouldAddEntry(insertStatement)) {
+ _tenantStatementsMap[*tenantId].push_back(std::move(insertStatement));
}
}
@@ -104,14 +93,12 @@ public:
*/
Status write(OperationContext* opCtx, OpDebug* opDebug) {
for (auto&& [tenantId, insertStatements] : _tenantStatementsMap) {
- AutoGetChangeCollection tenantChangeCollection(
- opCtx, _accessMode, boost::none /* tenantId */);
+ AutoGetChangeCollection tenantChangeCollection(opCtx, _accessMode, tenantId);
// The change collection does not exist for a particular tenant because either the
// change collection is not enabled or is in the process of enablement. Ignore this
// insert for now.
- // TODO: SERVER-65950 move this check before inserting to the map
- // 'tenantToInsertStatements'.
+ // TODO SERVER-67170 Move this check before inserting to the map.
if (!tenantChangeCollection) {
continue;
}
@@ -127,9 +114,9 @@ public:
false /* fromMigrate */);
if (!status.isOK()) {
return Status(status.code(),
- str::stream()
- << "Write to change collection: " << tenantChangeCollection->ns()
- << "failed, reason: " << status.reason());
+ str::stream() << "Write to change collection: "
+ << tenantChangeCollection->ns().toStringWithTenantId()
+ << "failed, reason: " << status.reason());
}
}
@@ -137,12 +124,31 @@ public:
}
private:
+ boost::optional<TenantId> _extractTenantId(const InsertStatement& insertStatement) {
+ // Parse the oplog entry to fetch the tenant id from 'tid' field. The oplog entry will not
+ // written to the change collection if 'tid' field is missing.
+ auto& oplogDoc = insertStatement.doc;
+ if (auto tidFieldElem = oplogDoc.getField(repl::OplogEntry::kTidFieldName)) {
+ return TenantId{Value(tidFieldElem).getOid()};
+ }
+
+ if (MONGO_unlikely(internalChangeStreamUseTenantIdForTesting.load())) {
+ return change_stream_serverless_helpers::getTenantIdForTesting();
+ }
+
+ return boost::none;
+ }
+
bool _shouldAddEntry(const InsertStatement& insertStatement) {
auto& oplogDoc = insertStatement.doc;
- // TODO SERVER-65950 retreive tenant from the oplog.
// TODO SERVER-67170 avoid inspecting the oplog BSON object.
if (auto nssFieldElem = oplogDoc[repl::OplogEntry::kNssFieldName]) {
+ // Avoid writing entry with empty 'ns' field, for eg. 'periodic noop' entry.
+ if (nssFieldElem.String().empty()) {
+ return false;
+ }
+
if (nssFieldElem.String() == "config.$cmd"_sd) {
if (auto objectFieldElem = oplogDoc[repl::OplogEntry::kObjectFieldName]) {
// The oplog entry might be a drop command on the change collection. Check if
@@ -225,40 +231,8 @@ void ChangeStreamChangeCollectionManager::create(ServiceContext* service) {
getChangeCollectionManager(service).emplace(service);
}
-bool ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive() {
- // A change collection must not be enabled on the config server.
- if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
- return false;
- }
-
- // If the force fail point is enabled then declare the change collection mode as active.
- if (MONGO_unlikely(forceEnableChangeCollectionsMode.shouldFail())) {
- return true;
- }
-
- // TODO SERVER-67267 guard with 'multitenancySupport' and 'isServerless' flag.
- return serverGlobalParams.featureCompatibility.isVersionInitialized() &&
- feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled(
- serverGlobalParams.featureCompatibility);
-}
-
-bool ChangeStreamChangeCollectionManager::hasChangeCollection(
- OperationContext* opCtx, boost::optional<TenantId> tenantId) const {
- auto catalog = CollectionCatalog::get(opCtx);
- return static_cast<bool>(catalog->lookupCollectionByNamespace(
- opCtx, NamespaceString::makeChangeCollectionNSS(tenantId)));
-}
-
-bool ChangeStreamChangeCollectionManager::isChangeStreamEnabled(
- OperationContext* opCtx, boost::optional<TenantId> tenantId) const {
- // A change stream in the serverless is declared as enabled if both the change collection and
- // the pre-images collection exist for the provided tenant.
- return isChangeCollectionsModeActive() && hasChangeCollection(opCtx, tenantId) &&
- ChangeStreamPreImagesCollectionManager::hasPreImagesCollection(opCtx, tenantId);
-}
-
-void ChangeStreamChangeCollectionManager::createChangeCollection(
- OperationContext* opCtx, boost::optional<TenantId> tenantId) {
+void ChangeStreamChangeCollectionManager::createChangeCollection(OperationContext* opCtx,
+ const TenantId& tenantId) {
// Make the change collection clustered by '_id'. The '_id' field will have the same value as
// the 'ts' field of the oplog.
CollectionOptions changeCollectionOptions;
@@ -268,13 +242,13 @@ void ChangeStreamChangeCollectionManager::createChangeCollection(
const auto status = createCollection(opCtx, changeCollNss, changeCollectionOptions, BSONObj());
uassert(status.code(),
- str::stream() << "Failed to create change collection: " << changeCollNss
- << causedBy(status.reason()),
+ str::stream() << "Failed to create change collection: "
+ << changeCollNss.toStringWithTenantId() << causedBy(status.reason()),
status.isOK() || status.code() == ErrorCodes::NamespaceExists);
}
void ChangeStreamChangeCollectionManager::dropChangeCollection(OperationContext* opCtx,
- boost::optional<TenantId> tenantId) {
+ const TenantId& tenantId) {
DropReply dropReply;
const auto changeCollNss = NamespaceString::makeChangeCollectionNSS(tenantId);
@@ -284,8 +258,8 @@ void ChangeStreamChangeCollectionManager::dropChangeCollection(OperationContext*
&dropReply,
DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops);
uassert(status.code(),
- str::stream() << "Failed to drop change collection: " << changeCollNss
- << causedBy(status.reason()),
+ str::stream() << "Failed to drop change collection: "
+ << changeCollNss.toStringWithTenantId() << causedBy(status.reason()),
status.isOK() || status.code() == ErrorCodes::NamespaceNotFound);
}
@@ -310,9 +284,7 @@ void ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
// tenant.
auto changeCollDoc = createChangeCollectionEntryFromOplog(record.data.toBson());
- // TODO SERVER-65950 replace 'TenantId::kSystemTenantId' with the tenant id.
changeCollectionsWriter.add(
- TenantId::kSystemTenantId,
InsertStatement{std::move(changeCollDoc), ts, repl::OpTime::kUninitializedTerm});
}
@@ -352,11 +324,8 @@ Status ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
auto changeCollDoc = createChangeCollectionEntryFromOplog(oplogDoc);
- // TODO SERVER-65950 replace 'TenantId::kSystemTenantId' with the tenant id.
- changeCollectionsWriter.add(TenantId::kSystemTenantId,
- InsertStatement{std::move(changeCollDoc),
- oplogSlot.getTimestamp(),
- oplogSlot.getTerm()});
+ changeCollectionsWriter.add(InsertStatement{
+ std::move(changeCollDoc), oplogSlot.getTimestamp(), oplogSlot.getTerm()});
}
// Write documents to change collections.
diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h
index 49ff64d635b..82d9fc01590 100644
--- a/src/mongo/db/change_stream_change_collection_manager.h
+++ b/src/mongo/db/change_stream_change_collection_manager.h
@@ -113,34 +113,19 @@ public:
static ChangeStreamChangeCollectionManager& get(OperationContext* opCtx);
/**
- * Returns true if the server is configured such that change collections can be used to record
- * oplog entries; ie, we are running in a Serverless context. Returns false otherwise.
- */
- static bool isChangeCollectionsModeActive();
-
- /**
- * Returns true if the change collection is present for the specified tenant, false otherwise.
- */
- bool hasChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId) const;
-
- /**
* Returns true if the change stream is enabled for the provided tenant, false otherwise.
*/
bool isChangeStreamEnabled(OperationContext* opCtx, boost::optional<TenantId> tenantId) const;
/**
* Creates a change collection for the specified tenant, if it doesn't exist.
- *
- * TODO: SERVER-65950 make tenantId field mandatory.
*/
- void createChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId);
+ void createChangeCollection(OperationContext* opCtx, const TenantId& tenantId);
/**
* Deletes the change collection for the specified tenant, if it already exist.
- *
- * TODO: SERVER-65950 make tenantId field mandatory.
*/
- void dropChangeCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId);
+ void dropChangeCollection(OperationContext* opCtx, const TenantId& tenantId);
/**
* Inserts documents to change collections. The parameter 'oplogRecords' is a vector of oplog
@@ -152,8 +137,6 @@ public:
*
* Failure in insertion to any change collection will result in a fatal exception and will bring
* down the node.
- *
- * TODO: SERVER-65950 make tenantId field mandatory.
*/
void insertDocumentsToChangeCollection(OperationContext* opCtx,
const std::vector<Record>& oplogRecords,
diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.cpp b/src/mongo/db/change_stream_pre_images_collection_manager.cpp
index 3b82e7a7c46..25a12d7636a 100644
--- a/src/mongo/db/change_stream_pre_images_collection_manager.cpp
+++ b/src/mongo/db/change_stream_pre_images_collection_manager.cpp
@@ -35,7 +35,9 @@
#include "mongo/db/catalog/create_collection.h"
#include "mongo/db/catalog/drop_collection.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/change_stream_options_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/concurrency/lock_manager_defs.h"
#include "mongo/db/concurrency/locker.h"
@@ -47,7 +49,6 @@
#include "mongo/logv2/log.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/idle_thread_block.h"
-#include "mongo/util/fail_point.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
@@ -118,7 +119,8 @@ void ChangeStreamPreImagesCollectionManager::createPreImagesCollection(
opCtx, preImagesCollectionNamespace, preImagesCollectionOptions, BSONObj());
uassert(status.code(),
str::stream() << "Failed to create the pre-images collection: "
- << preImagesCollectionNamespace.coll() << causedBy(status.reason()),
+ << preImagesCollectionNamespace.toStringWithTenantId()
+ << causedBy(status.reason()),
status.isOK() || status.code() == ErrorCodes::NamespaceExists);
}
@@ -133,7 +135,8 @@ void ChangeStreamPreImagesCollectionManager::dropPreImagesCollection(
DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops);
uassert(status.code(),
str::stream() << "Failed to drop the pre-images collection: "
- << preImagesCollectionNamespace.coll() << causedBy(status.reason()),
+ << preImagesCollectionNamespace.toStringWithTenantId()
+ << causedBy(status.reason()),
status.isOK() || status.code() == ErrorCodes::NamespaceNotFound);
}
@@ -148,11 +151,13 @@ void ChangeStreamPreImagesCollectionManager::insertPreImage(OperationContext* op
<< preImage.getId().getApplyOpsIndex(),
preImage.getId().getApplyOpsIndex() >= 0);
+ // TODO SERVER-66642 Consider using internal test-tenant id if applicable.
+ const auto preImagesCollectionNamespace = NamespaceString::makePreImageCollectionNSS(tenantId);
+
// This lock acquisition can block on a stronger lock held by another operation modifying
// the pre-images collection. There are no known cases where an operation holding an
// exclusive lock on the pre-images collection also waits for oplog visibility.
AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState());
- const auto preImagesCollectionNamespace = NamespaceString::makePreImageCollectionNSS(tenantId);
AutoGetCollection preImagesCollectionRaii(
opCtx, preImagesCollectionNamespace, LockMode::MODE_IX);
auto& changeStreamPreImagesCollection = preImagesCollectionRaii.getCollection();
@@ -173,13 +178,6 @@ void ChangeStreamPreImagesCollectionManager::insertPreImage(OperationContext* op
uassertStatusOK(insertionStatus);
}
-bool ChangeStreamPreImagesCollectionManager::hasPreImagesCollection(
- OperationContext* opCtx, boost::optional<TenantId> tenantId) {
- auto catalog = CollectionCatalog::get(opCtx);
- return static_cast<bool>(catalog->lookupCollectionByNamespace(
- opCtx, NamespaceString::makePreImageCollectionNSS(tenantId)));
-}
-
namespace {
RecordId toRecordId(ChangeStreamPreImageId id) {
return record_id_helpers::keyForElem(
@@ -408,8 +406,9 @@ void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTim
// Acquire intent-exclusive lock on the pre-images collection. Early exit if the collection
// doesn't exist.
+ // TODO SERVER-66642 Account for multitenancy.
AutoGetCollection autoColl(
- opCtx.get(), NamespaceString::kChangeStreamPreImagesNamespace, MODE_IX);
+ opCtx.get(), NamespaceString::makePreImageCollectionNSS(boost::none), MODE_IX);
const auto& preImagesColl = autoColl.getCollection();
if (!preImagesColl) {
return;
@@ -436,11 +435,12 @@ void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTim
change_stream_pre_image_helpers::getPreImageExpirationTime(
opCtx.get(), currentTimeForTimeBasedExpiration));
+ // TODO SERVER-66642 Account for multitenancy.
for (const auto& collectionRange : expiredPreImages) {
writeConflictRetry(
opCtx.get(),
"ChangeStreamExpiredPreImagesRemover",
- NamespaceString::kChangeStreamPreImagesNamespace.ns(),
+ NamespaceString::makePreImageCollectionNSS(boost::none).ns(),
[&] {
auto params = std::make_unique<DeleteStageParams>();
params->isMulti = true;
diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.h b/src/mongo/db/change_stream_pre_images_collection_manager.h
index dede0e38c96..75efb28c22d 100644
--- a/src/mongo/db/change_stream_pre_images_collection_manager.h
+++ b/src/mongo/db/change_stream_pre_images_collection_manager.h
@@ -89,13 +89,6 @@ public:
const ChangeStreamPreImage& preImage);
/**
- * Returns true if the pre-images collection exists, false otherwise. If 'tenantId' is provided
- * then the pre-images collection associated with that tenant will be checked for existence,
- * otherwise the default pre-images collection will be checked.
- */
- static bool hasPreImagesCollection(OperationContext* opCtx, boost::optional<TenantId> tenantId);
-
- /**
* Scans the system pre-images collection and deletes the expired pre-images from it.
*/
static void performExpiredChangeStreamPreImagesRemovalPass(Client* client);
diff --git a/src/mongo/db/change_stream_serverless_helpers.cpp b/src/mongo/db/change_stream_serverless_helpers.cpp
new file mode 100644
index 00000000000..0577894e397
--- /dev/null
+++ b/src/mongo/db/change_stream_serverless_helpers.cpp
@@ -0,0 +1,101 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
+
+#include "mongo/db/change_stream_serverless_helpers.h"
+
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/server_feature_flags_gen.h"
+#include "mongo/db/server_options.h"
+
+namespace mongo {
+namespace change_stream_serverless_helpers {
+
+bool isChangeCollectionsModeActive() {
+ // A change collection must not be enabled on the config server.
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+ return false;
+ }
+
+ // TODO SERVER-67267 guard with 'multitenancySupport' and 'isServerless' flag.
+ return serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+ feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled(
+ serverGlobalParams.featureCompatibility);
+}
+
+bool isChangeStreamEnabled(OperationContext* opCtx, const TenantId& tenantId) {
+ auto catalog = CollectionCatalog::get(opCtx);
+
+ // A change stream in the serverless is declared as enabled if both the change collection and
+ // the pre-images collection exist for the provided tenant.
+ // TODO SERVER-66643 Pass 'tenantId' to the pre-images collection.
+ return isChangeCollectionsModeActive() &&
+ static_cast<bool>(catalog->lookupCollectionByNamespace(
+ opCtx, NamespaceString::makeChangeCollectionNSS(tenantId))) &&
+ static_cast<bool>(catalog->lookupCollectionByNamespace(
+ opCtx, NamespaceString::makePreImageCollectionNSS(boost::none)));
+}
+
+const TenantId& getTenantIdForTesting() {
+ static const TenantId kTestTenantId(
+ OID("00000000" /* timestamp */
+ "0000000000" /* process id */
+ "000000" /* counter */));
+
+ return kTestTenantId;
+}
+
+boost::optional<TenantId> resolveTenantId(boost::optional<TenantId> tenantId) {
+ if (tenantId) {
+ return tenantId;
+ } else if (MONGO_unlikely(internalChangeStreamUseTenantIdForTesting.load())) {
+ return getTenantIdForTesting();
+ }
+
+ return tenantId;
+}
+
+TenantSet getConfigDbTenants(OperationContext* opCtx) {
+ TenantSet tenantIds;
+
+ auto dbNames = CollectionCatalog::get(opCtx)->getAllDbNames();
+ for (auto&& dbName : dbNames) {
+ if (dbName.db() == NamespaceString::kConfigDb && dbName.tenantId()) {
+ tenantIds.insert(*dbName.tenantId());
+ }
+ }
+
+ return tenantIds;
+}
+
+} // namespace change_stream_serverless_helpers
+} // namespace mongo
diff --git a/src/mongo/db/change_stream_serverless_helpers.h b/src/mongo/db/change_stream_serverless_helpers.h
new file mode 100644
index 00000000000..bdeb04f3ff7
--- /dev/null
+++ b/src/mongo/db/change_stream_serverless_helpers.h
@@ -0,0 +1,71 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional/optional.hpp>
+
+#include "mongo/db/operation_context.h"
+#include "mongo/db/tenant_id.h"
+
+namespace mongo {
+namespace change_stream_serverless_helpers {
+
+using TenantSet = stdx::unordered_set<TenantId, TenantId::Hasher>;
+
+/**
+ * Returns true if the server is configured such that change collections can be used to record
+ * oplog entries; ie, we are running in a Serverless context. Returns false otherwise.
+ */
+bool isChangeCollectionsModeActive();
+
+/**
+ * Returns true if the change stream is enabled for the provided tenant, false otherwise.
+ */
+bool isChangeStreamEnabled(OperationContext* opCtx, const TenantId& tenantId);
+
+/**
+ * Returns an internal tenant id that will be used for testing purposes. This tenant id will not
+ * conflict with any other tenant id.
+ */
+const TenantId& getTenantIdForTesting();
+
+/**
+ * If the provided 'tenantId' is missing and 'internalChangeStreamUseTenantIdForTesting' is true,
+ * returns a special 'TenantId' for testing purposes. Otherwise, returns the provided 'tenantId'.
+ */
+boost::optional<TenantId> resolveTenantId(boost::optional<TenantId> tenantId);
+
+/**
+ * Returns the list of the tenants associated with a 'config' database.
+ */
+TenantSet getConfigDbTenants(OperationContext* opCtx);
+
+} // namespace change_stream_serverless_helpers
+} // namespace mongo
diff --git a/src/mongo/db/change_streams_cluster_parameter.idl b/src/mongo/db/change_streams_cluster_parameter.idl
index 466e1c0345a..899018d04df 100644
--- a/src/mongo/db/change_streams_cluster_parameter.idl
+++ b/src/mongo/db/change_streams_cluster_parameter.idl
@@ -58,11 +58,11 @@ server_parameters:
cpp_varname: gChangeStreamsClusterParameter
validator:
callback: validateChangeStreamsClusterParameter
- changeCollectionRemoverJobSleepSeconds:
+ changeCollectionExpiredDocumentsRemoverJobSleepSeconds:
description: "Specifies the number of seconds for which the periodic change collection remover job will sleep between each cycle."
set_at: [ startup ]
cpp_vartype: AtomicWord<int>
- cpp_varname: "gChangeCollectionRemoverJobSleepSeconds"
+ cpp_varname: "gChangeCollectionExpiredDocumentsRemoverJobSleepSeconds"
validator:
gte: 1
default: 10
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index c26e7dfc7c0..77573fa709c 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -354,6 +354,7 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/multi_index_block',
'$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
'$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_serverless_helpers',
'$BUILD_DIR/mongo/db/command_can_run_here',
'$BUILD_DIR/mongo/db/commands',
'$BUILD_DIR/mongo/db/concurrency/exception_util',
diff --git a/src/mongo/db/commands/change_stream_state_command.cpp b/src/mongo/db/commands/change_stream_state_command.cpp
index de8d98083ea..44d186c03a1 100644
--- a/src/mongo/db/commands/change_stream_state_command.cpp
+++ b/src/mongo/db/commands/change_stream_state_command.cpp
@@ -30,8 +30,7 @@
#include "mongo/platform/basic.h"
#include "mongo/db/auth/authorization_session.h"
-#include "mongo/db/change_stream_change_collection_manager.h"
-#include "mongo/db/change_stream_pre_images_collection_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/change_stream_state_gen.h"
#include "mongo/db/commands.h"
#include "mongo/db/set_change_stream_state_coordinator.h"
@@ -40,7 +39,6 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
namespace mongo {
-
namespace {
/**
@@ -67,6 +65,10 @@ public:
" enabled: enable or disable the change stream";
}
+ bool allowedWithSecurityToken() const final {
+ return true;
+ }
+
class Invocation final : public InvocationBase {
public:
using InvocationBase::InvocationBase;
@@ -74,18 +76,19 @@ public:
void typedRun(OperationContext* opCtx) {
uassert(ErrorCodes::CommandNotSupported,
str::stream() << SetChangeStreamStateCommandRequest::kCommandName
- << " is only supported in the serverless",
- ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive());
+ << " command is only supported in serverless",
+ change_stream_serverless_helpers::isChangeCollectionsModeActive());
- // TODO SERVER-65950 use provided '$tenant' only and add 'uassert that tenant must be
- // present. Remove 'getDollarTenant()' and fetch tenant from dbName().
- const std::string tenantId = request().getDollarTenant()
- ? request().getDollarTenant()->toString()
- : TenantId::kSystemTenantId.toString();
+ const auto tenantId =
+ change_stream_serverless_helpers::resolveTenantId(request().getDbName().tenantId());
+ uassert(ErrorCodes::BadValue,
+ str::stream() << SetChangeStreamStateCommandRequest::kCommandName
+ << " command must be provided with a tenant id",
+ tenantId);
// Prepare the payload for the 'SetChangeStreamStateCoordinator'.
SetChangeStreamStateCoordinatorId coordinatorId;
- coordinatorId.setTenantId({TenantId{OID(tenantId)}});
+ coordinatorId.setTenantId(tenantId);
SetChangeStreamStateCoordinatorDocument coordinatorDoc{
coordinatorId, request().getChangeStreamStateParameters().toBSON()};
@@ -134,6 +137,10 @@ public:
" {getChangeStreamState: 1}";
}
+ bool allowedWithSecurityToken() const final {
+ return true;
+ }
+
class Invocation final : public InvocationBase {
public:
using InvocationBase::InvocationBase;
@@ -141,17 +148,20 @@ public:
auto typedRun(OperationContext* opCtx) {
uassert(ErrorCodes::CommandNotSupported,
str::stream() << GetChangeStreamStateCommandRequest::kCommandName
- << " is only supported in the serverless",
- ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive());
+ << " command is only supported in serverless",
+ change_stream_serverless_helpers::isChangeCollectionsModeActive());
+
+ const auto tenantId =
+ change_stream_serverless_helpers::resolveTenantId(request().getDbName().tenantId());
+ uassert(ErrorCodes::BadValue,
+ str::stream() << GetChangeStreamStateCommandRequest::kCommandName
+ << " command must be provided with a tenant id",
+ tenantId);
- // TODO SERVER-65950 use provided '$tenant' only and add 'uassert that tenant must be
- // present.
- boost::optional<TenantId> tenantId = boost::none;
// Set the change stream enablement state in the 'reply' object.
- auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
GetChangeStreamStateCommandRequest::Reply reply{
- changeCollectionManager.isChangeStreamEnabled(opCtx, tenantId)};
+ change_stream_serverless_helpers::isChangeStreamEnabled(opCtx, *tenantId)};
return reply;
}
diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp
index eb3c3567077..29c85819230 100644
--- a/src/mongo/db/commands/dbcommands.cpp
+++ b/src/mongo/db/commands/dbcommands.cpp
@@ -188,6 +188,10 @@ public:
bool collectsResourceConsumptionMetrics() const final {
return true;
}
+ bool allowedWithSecurityToken() const final {
+ return true;
+ }
+
class Invocation final : public InvocationBaseGen {
public:
using InvocationBaseGen::InvocationBaseGen;
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 4fe8a736ad7..b7bd221ec61 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/change_stream_pre_images_collection_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/curop.h"
#include "mongo/db/cursor_manager.h"
#include "mongo/db/db_raii.h"
@@ -757,13 +758,20 @@ Status runAggregate(OperationContext* opCtx,
nss = NamespaceString::kRsOplogNamespace;
// In case of serverless the change stream will be opened on the change collection.
- if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
- auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
+ if (change_stream_serverless_helpers::isChangeCollectionsModeActive()) {
+ const auto tenantId =
+ change_stream_serverless_helpers::resolveTenantId(origNss.tenantId());
+
+ uassert(ErrorCodes::BadValue,
+ "Change streams cannot be used without tenant id",
+ tenantId);
+
uassert(ErrorCodes::ChangeStreamNotEnabled,
"Change streams must be enabled before being used.",
- changeCollectionManager.isChangeStreamEnabled(opCtx, origNss.tenantId()));
+ change_stream_serverless_helpers::isChangeStreamEnabled(opCtx, *tenantId));
+
- nss = NamespaceString::makeChangeCollectionNSS(origNss.tenantId());
+ nss = NamespaceString::makeChangeCollectionNSS(tenantId);
}
// Assert that a change stream on the config server is always opened on the oplog.
@@ -785,7 +793,7 @@ Status runAggregate(OperationContext* opCtx,
LOGV2_INFO(6689200,
"Opening change stream on the namespace: {nss}",
"Opening change stream",
- "nss"_attr = nss.toString());
+ "nss"_attr = nss.toStringWithTenantId());
}
// Upgrade and wait for read concern if necessary.
diff --git a/src/mongo/db/commands/set_cluster_parameter_command.cpp b/src/mongo/db/commands/set_cluster_parameter_command.cpp
index 08ae1b2835e..8c49717baad 100644
--- a/src/mongo/db/commands/set_cluster_parameter_command.cpp
+++ b/src/mongo/db/commands/set_cluster_parameter_command.cpp
@@ -66,6 +66,10 @@ public:
return "Set cluster parameter on replica set or node";
}
+ bool allowedWithSecurityToken() const final {
+ return true;
+ }
+
class Invocation final : public InvocationBase {
public:
using InvocationBase::InvocationBase;
diff --git a/src/mongo/db/commands/txn_cmds.cpp b/src/mongo/db/commands/txn_cmds.cpp
index ce879e0ee2a..9240c572a54 100644
--- a/src/mongo/db/commands/txn_cmds.cpp
+++ b/src/mongo/db/commands/txn_cmds.cpp
@@ -87,6 +87,10 @@ public:
return true;
}
+ bool allowedWithSecurityToken() const final {
+ return true;
+ }
+
class Invocation final : public InvocationBaseGen {
public:
using InvocationBaseGen::InvocationBaseGen;
@@ -200,6 +204,10 @@ public:
return true;
}
+ bool allowedWithSecurityToken() const final {
+ return true;
+ }
+
class Invocation final : public InvocationBaseGen {
public:
using InvocationBaseGen::InvocationBaseGen;
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index 8cbe5909154..f1a5d2a4871 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -349,7 +349,7 @@ void registerPrimaryOnlyServices(ServiceContext* serviceContext) {
}
}
- // TODO SERVER-65950 create 'SetChangeStreamStateCoordinatorService' only in the serverless.
+ // TODO SERVER-66717 create 'SetChangeStreamStateCoordinatorService' only in the serverless.
services.push_back(std::make_unique<SetChangeStreamStateCoordinatorService>(serviceContext));
for (auto& service : services) {
@@ -790,6 +790,8 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) {
repl::ReplicationCoordinator::modeNone;
if (!isStandalone) {
startChangeStreamExpiredPreImagesRemover(serviceContext);
+ // TODO SERVER-66717 Start 'startChangeCollectionExpiredDocumentsRemover' only in the
+ // serverless.
startChangeCollectionExpiredDocumentsRemover(serviceContext);
}
@@ -1561,7 +1563,7 @@ int mongod_main(int argc, char* argv[]) {
ReadWriteConcernDefaults::create(service, readWriteConcernDefaultsCacheLookupMongoD);
ChangeStreamOptionsManager::create(service);
- // TODO SERVER-65950 create 'ChangeStreamChangeCollectionManager' only in the serverless.
+ // TODO SERVER-66717 Create 'ChangeStreamChangeCollectionManager' only in the serverless.
ChangeStreamChangeCollectionManager::create(service);
#if defined(_WIN32)
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 5cfd942a8f3..a3a28901993 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -111,8 +111,6 @@ const NamespaceString NamespaceString::kSystemReplSetNamespace(NamespaceString::
"system.replset");
const NamespaceString NamespaceString::kLastVoteNamespace(NamespaceString::kLocalDb,
"replset.election");
-const NamespaceString NamespaceString::kChangeStreamPreImagesNamespace(NamespaceString::kConfigDb,
- "system.preimages");
const NamespaceString NamespaceString::kIndexBuildEntryNamespace(NamespaceString::kConfigDb,
"system.indexBuilds");
const NamespaceString NamespaceString::kRangeDeletionNamespace(NamespaceString::kConfigDb,
@@ -338,8 +336,7 @@ NamespaceString NamespaceString::makeCollectionlessAggregateNSS(const DatabaseNa
NamespaceString NamespaceString::makeChangeCollectionNSS(
const boost::optional<TenantId>& tenantId) {
- // TODO: SERVER-65950 create namespace for a particular tenant.
- return NamespaceString{NamespaceString::kConfigDb, NamespaceString::kChangeCollectionName};
+ return NamespaceString{tenantId, kConfigDb, kChangeCollectionName};
}
NamespaceString NamespaceString::makeGlobalIndexNSS(const UUID& id) {
@@ -350,8 +347,7 @@ NamespaceString NamespaceString::makeGlobalIndexNSS(const UUID& id) {
NamespaceString NamespaceString::makePreImageCollectionNSS(
const boost::optional<TenantId>& tenantId) {
- return tenantId ? NamespaceString(tenantId, kConfigDb, "system.preimages")
- : kChangeStreamPreImagesNamespace;
+ return NamespaceString{tenantId, kConfigDb, kPreImagesCollectionName};
}
std::string NamespaceString::getSisterNS(StringData local) const {
@@ -469,11 +465,11 @@ bool NamespaceString::isTimeseriesBucketsCollection() const {
}
bool NamespaceString::isChangeStreamPreImagesCollection() const {
- return ns() == kChangeStreamPreImagesNamespace.ns();
+ return _dbName.db() == kConfigDb && coll() == kPreImagesCollectionName;
}
bool NamespaceString::isChangeCollection() const {
- return db() == kConfigDb && coll() == kChangeCollectionName;
+ return _dbName.db() == kConfigDb && coll() == kChangeCollectionName;
}
bool NamespaceString::isConfigImagesCollection() const {
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 79da7a0f8ae..da20bca25d2 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -82,7 +82,10 @@ public:
// Name for the system.js collection
static constexpr StringData kSystemDotJavascriptCollectionName = "system.js"_sd;
- // Name for the change stream change collection.
+ // Name of the pre-images collection.
+ static constexpr StringData kPreImagesCollectionName = "system.preimages"_sd;
+
+ // Name of the change stream change collection.
static constexpr StringData kChangeCollectionName = "system.change_collection"_sd;
// Names of privilege document collections
@@ -171,9 +174,6 @@ public:
// Namespace for storing the last replica set election vote.
static const NamespaceString kLastVoteNamespace;
- // Namespace for change stream pre-images collection.
- static const NamespaceString kChangeStreamPreImagesNamespace;
-
// Namespace for index build entries.
static const NamespaceString kIndexBuildEntryNamespace;
diff --git a/src/mongo/db/op_observer/op_observer_impl_test.cpp b/src/mongo/db/op_observer/op_observer_impl_test.cpp
index 6e120317ca8..bb54d4bc07a 100644
--- a/src/mongo/db/op_observer/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer/op_observer_impl_test.cpp
@@ -225,7 +225,7 @@ protected:
reset(opCtx, NamespaceString::kRsOplogNamespace);
reset(opCtx, NamespaceString::kSessionTransactionsTableNamespace);
reset(opCtx, NamespaceString::kConfigImagesNamespace);
- reset(opCtx, NamespaceString::kChangeStreamPreImagesNamespace);
+ reset(opCtx, NamespaceString::makePreImageCollectionNSS(boost::none));
}
// Assert that the oplog has the expected number of entries, and return them
@@ -288,7 +288,7 @@ protected:
bool didWriteDeletedDocToPreImagesCollection(OperationContext* opCtx,
const ChangeStreamPreImageId preImageId) {
AutoGetCollection preImagesCollection(
- opCtx, NamespaceString::kChangeStreamPreImagesNamespace, MODE_IS);
+ opCtx, NamespaceString::makePreImageCollectionNSS(boost::none), LockMode::MODE_IS);
const auto preImage = Helpers::findOneForTesting(
opCtx, preImagesCollection.getCollection(), BSON("_id" << preImageId.toBSON()), false);
return !preImage.isEmpty();
@@ -323,7 +323,7 @@ protected:
const ChangeStreamPreImageId& preImageId,
BSONObj* container) {
AutoGetCollection preImagesCollection(
- opCtx, NamespaceString::kChangeStreamPreImagesNamespace, MODE_IS);
+ opCtx, NamespaceString::makePreImageCollectionNSS(boost::none), LockMode::MODE_IS);
*container = Helpers::findOneForTesting(opCtx,
preImagesCollection.getCollection(),
BSON("_id" << preImageId.toBSON()))
diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp
index 583e3198a42..78d7e9f19cf 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp
@@ -120,9 +120,11 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamAddPreImage::doGetNext()
boost::optional<Document> DocumentSourceChangeStreamAddPreImage::lookupPreImage(
boost::intrusive_ptr<ExpressionContext> pExpCtx, const Document& preImageId) {
// Look up the pre-image document on the local node by id.
+ // TODO SERVER-66642 Consider using internal test-tenant id if applicable.
+ const auto tenantId = pExpCtx->ns.tenantId();
auto lookedUpDoc = pExpCtx->mongoProcessInterface->lookupSingleDocumentLocally(
pExpCtx,
- NamespaceString::kChangeStreamPreImagesNamespace,
+ NamespaceString::makePreImageCollectionNSS(tenantId),
Document{{ChangeStreamPreImage::kIdFieldName, preImageId}});
// Return boost::none to signify that we failed to find the pre-image.
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 9444a694c81..68d03d0c96a 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -1544,7 +1544,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(
return (ns.isLocal() || ns.isConfigDotCacheDotChunks() ||
ns.isReshardingLocalOplogBufferCollection() ||
ns == NamespaceString::kConfigImagesNamespace ||
- ns == NamespaceString::kChangeStreamPreImagesNamespace);
+ ns.isChangeStreamPreImagesCollection());
};
if (shardTargetingPolicy == ShardTargetingPolicy::kNotAllowed ||
diff --git a/src/mongo/db/query/query_knobs.idl b/src/mongo/db/query/query_knobs.idl
index efbe81f8501..e36a60c4df3 100644
--- a/src/mongo/db/query/query_knobs.idl
+++ b/src/mongo/db/query/query_knobs.idl
@@ -906,6 +906,15 @@ server_parameters:
default:
expr: false
+ # TODO SERVER-68341 Remove this query knob after tenancy is supported in the sharded cluster.
+ internalChangeStreamUseTenantIdForTesting:
+ description: "If true, then change streams will operate upon an internal tenant id for testing
+ purposes if the actual tenant is not provided."
+ set_at: [ startup ]
+ cpp_varname: "internalChangeStreamUseTenantIdForTesting"
+ cpp_vartype: AtomicWord<bool>
+ default: false
+
# Note for adding additional query knobs:
#
# When adding a new query knob, you should consider whether or not you need to add an 'on_update'
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 7185e7e77d6..d91977bac6c 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -75,6 +75,7 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/multi_index_block',
'$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
'$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_serverless_helpers',
'$BUILD_DIR/mongo/db/commands/feature_compatibility_parsers',
'$BUILD_DIR/mongo/db/commands/txn_cmd_request',
'$BUILD_DIR/mongo/db/concurrency/exception_util',
@@ -634,6 +635,7 @@ env.Library(
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/catalog/collection_crud',
'$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_serverless_helpers',
'$BUILD_DIR/mongo/db/commands/mongod_fsync',
'$BUILD_DIR/mongo/db/concurrency/exception_util',
'$BUILD_DIR/mongo/db/concurrency/lock_manager',
@@ -1502,6 +1504,7 @@ env.Library(
'$BUILD_DIR/mongo/client/clientdriver_network',
'$BUILD_DIR/mongo/db/auth/auth',
'$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_serverless_helpers',
'$BUILD_DIR/mongo/db/cloner',
'$BUILD_DIR/mongo/db/concurrency/lock_manager',
'$BUILD_DIR/mongo/db/curop',
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index b17e5f108ec..5c4ec5d5dab 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -57,6 +57,7 @@
#include "mongo/db/catalog/rename_collection.h"
#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/change_stream_pre_images_collection_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/client.h"
#include "mongo/db/coll_mod_gen.h"
#include "mongo/db/commands.h"
@@ -390,7 +391,7 @@ void _logOpsInner(OperationContext* opCtx,
}
// Insert the oplog records to the respective tenants change collections.
- if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ if (change_stream_serverless_helpers::isChangeCollectionsModeActive()) {
ChangeStreamChangeCollectionManager::get(opCtx).insertDocumentsToChangeCollection(
opCtx, *records, timestamps);
}
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index 032e9109f14..51eebc457bc 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/change_stream_change_collection_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/client.h"
#include "mongo/db/commands/fsync.h"
#include "mongo/db/db_raii.h"
@@ -150,7 +151,7 @@ Status _insertDocumentsToOplogAndChangeCollections(
// Write the corresponding oplog entries to tenants respective change
// collections in the serverless.
- if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ if (change_stream_serverless_helpers::isChangeCollectionsModeActive()) {
auto status =
ChangeStreamChangeCollectionManager::get(opCtx).insertDocumentsToChangeCollection(
opCtx,
@@ -416,8 +417,7 @@ void scheduleWritesToOplogAndChangeCollection(OperationContext* opCtx,
bool skipWritesToOplog) {
// Skip performing any writes during the startup recovery when running in the non-serverless
// environment.
- if (skipWritesToOplog &&
- !ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ if (skipWritesToOplog && !change_stream_serverless_helpers::isChangeCollectionsModeActive()) {
return;
}
diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp
index d644d5ad734..842cca1bc65 100644
--- a/src/mongo/db/repl/oplog_applier_impl_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp
@@ -481,10 +481,10 @@ TEST_F(OplogApplierImplTest, applyOplogEntryToRecordChangeStreamPreImages) {
WriteUnitOfWork wuow{_opCtx.get()};
ChangeStreamPreImageId preImageId{*(options.uuid), op.getOpTime().getTimestamp(), 0};
BSONObj preImageDocumentKey = BSON("_id" << preImageId.toBSON());
- auto preImageLoadResult =
- getStorageInterface()->deleteById(_opCtx.get(),
- NamespaceString::kChangeStreamPreImagesNamespace,
- preImageDocumentKey.firstElement());
+ auto preImageLoadResult = getStorageInterface()->deleteById(
+ _opCtx.get(),
+ NamespaceString::makePreImageCollectionNSS(boost::none),
+ preImageDocumentKey.firstElement());
repl::getNextOpTime(_opCtx.get());
wuow.commit();
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 8da1bcc1f5c..71bcbe60296 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/catalog/local_oplog_info.h"
#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/change_stream_pre_images_collection_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/client.h"
#include "mongo/db/commands/feature_compatibility_version.h"
#include "mongo/db/commands/rwc_defaults_commands_gen.h"
@@ -541,7 +542,7 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC
});
// Create the pre-images collection if it doesn't exist yet in the non-serverless environment.
- if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ if (!change_stream_serverless_helpers::isChangeCollectionsModeActive()) {
ChangeStreamPreImagesCollectionManager::createPreImagesCollection(
opCtx, boost::none /* tenantId */);
}
diff --git a/src/mongo/db/set_change_stream_state_coordinator.cpp b/src/mongo/db/set_change_stream_state_coordinator.cpp
index a0ee72fac51..9191769214c 100644
--- a/src/mongo/db/set_change_stream_state_coordinator.cpp
+++ b/src/mongo/db/set_change_stream_state_coordinator.cpp
@@ -34,6 +34,8 @@
#include "mongo/db/change_stream_change_collection_manager.h"
#include "mongo/db/change_stream_pre_images_collection_manager.h"
#include "mongo/db/change_stream_state_gen.h"
+#include "mongo/db/concurrency/exception_util.h"
+#include "mongo/db/op_observer/op_observer.h"
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/logv2/log.h"
@@ -77,15 +79,15 @@ public:
const auto setChangeStreamParameter = ChangeStreamStateParameters::parse(
IDLParserContext("ChangeStreamStateParameters"), _stateDoc.getCommand());
- invariant(_stateDoc.getId().getTenantId());
-
- // TODO SERVER-65950 replace 'tenantId' with the provided tenant id.
- auto tenantId = boost::none;
+ // A tenant's change collection and the pre-images collection are always associated with a
+ // tenant id.
+ const auto tenantId = _stateDoc.getId().getTenantId();
+ tassert(6664100, "Tenant id is missing", tenantId);
if (setChangeStreamParameter.getEnabled()) {
- _enableChangeStream(opCtx, tenantId);
+ _enableChangeStream(opCtx, *tenantId);
} else {
- _disableChangeStream(opCtx, tenantId);
+ _disableChangeStream(opCtx, *tenantId);
}
}
@@ -94,11 +96,38 @@ private:
* Enables the change stream in the serverless by creating the change collection and the
* pre-image collection.
*/
- void _enableChangeStream(OperationContext* opCtx, boost::optional<TenantId> tenantId) {
+ void _enableChangeStream(OperationContext* opCtx, const TenantId& tenantId) {
auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
changeCollectionManager.createChangeCollection(opCtx, tenantId);
- ChangeStreamPreImagesCollectionManager::createPreImagesCollection(opCtx, tenantId);
+ // TODO SERVER-66643 Remove this code. A change collection must have atleast one entry for
+ // the change stream to advance. As such artifically create any oplog entry such that it
+ // will be captured by the change collection. With SERVER-66643, the pre-images collection
+ // 'create' oplog entry will be auto captured by the change collection and hence writing
+ // this entry will not be required. Also remove the necessary header and linked library
+ // after removing this code.
+ [&]() {
+ writeConflictRetry(
+ opCtx, "writeNoop", NamespaceString::kRsOplogNamespace.ns(), [&] {
+ Lock::GlobalLock lock(opCtx, MODE_IX);
+ WriteUnitOfWork wuow(opCtx);
+ opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage(
+ opCtx,
+ NamespaceString::makeChangeCollectionNSS(tenantId),
+ boost::none,
+ BSON("msg"
+ << "enable change stream"),
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none,
+ boost::none);
+ wuow.commit();
+ });
+ }();
+
+ // TODO SERVER-66643 Pass 'tenantId' to the pre-images collection.
+ ChangeStreamPreImagesCollectionManager::createPreImagesCollection(opCtx, boost::none);
// Wait until the create requests are majority committed.
waitForMajority(opCtx);
@@ -108,11 +137,12 @@ private:
* Disables the change stream in the serverless by dropping the change collection and the
* pre-image collection.
*/
- void _disableChangeStream(OperationContext* opCtx, boost::optional<TenantId> tenantId) {
+ void _disableChangeStream(OperationContext* opCtx, const TenantId& tenantId) {
auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx);
changeCollectionManager.dropChangeCollection(opCtx, tenantId);
- ChangeStreamPreImagesCollectionManager::dropPreImagesCollection(opCtx, tenantId);
+ // TODO SERVER-66643 Pass 'tenantId' to the pre-images collection.
+ ChangeStreamPreImagesCollectionManager::dropPreImagesCollection(opCtx, boost::none);
// Wait until the drop requests are majority committed.
waitForMajority(opCtx);
diff --git a/src/mongo/db/stats/SConscript b/src/mongo/db/stats/SConscript
index 01242257d02..a4378a89caf 100644
--- a/src/mongo/db/stats/SConscript
+++ b/src/mongo/db/stats/SConscript
@@ -90,6 +90,7 @@ env.Library(
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
+ '$BUILD_DIR/mongo/db/change_stream_serverless_helpers',
'$BUILD_DIR/mongo/db/commands/server_status_core',
'$BUILD_DIR/mongo/db/server_base',
],
diff --git a/src/mongo/db/stats/change_collection_server_status.cpp b/src/mongo/db/stats/change_collection_server_status.cpp
index ee424a4ae43..f7d7f75a75c 100644
--- a/src/mongo/db/stats/change_collection_server_status.cpp
+++ b/src/mongo/db/stats/change_collection_server_status.cpp
@@ -30,6 +30,7 @@
#include "mongo/platform/basic.h"
#include "mongo/db/change_stream_change_collection_manager.h"
+#include "mongo/db/change_stream_serverless_helpers.h"
#include "mongo/db/commands/server_status.h"
namespace mongo {
@@ -49,7 +50,7 @@ public:
const BSONElement& configElement,
BSONObjBuilder* result) const override {
// Append the section only when running in serverless.
- if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) {
+ if (!change_stream_serverless_helpers::isChangeCollectionsModeActive()) {
return;
}