From e6b184b48b2f4ceaff580c98c24e14eac26e2c03 Mon Sep 17 00:00:00 2001 From: Rishab Joshi Date: Thu, 15 Sep 2022 10:27:24 +0000 Subject: SERVER-66641 Introduce multi-tenancy for change collections. --- .../change_streams_multitenant_passthrough.yml | 9 +- ...multitenant_sharded_collections_passthrough.yml | 4 +- .../testing/hooks/enable_change_stream.py | 2 +- etc/evergreen.yml | 2 +- .../change_stream_change_collection_role_auth.js | 10 +- .../override_fixtures_changestream_multitenancy.js | 17 +- .../serverless/basic_write_to_change_collection.js | 194 +++++++++---- .../change_collection_expired_document_remover.js | 305 ++++++++++++++------- .../serverless/change_collection_server_stats.js | 41 ++- jstests/serverless/change_stream_state_commands.js | 19 +- .../basic_read_from_change_collection.js | 125 +++++---- .../multitenant_read_from_change_collection.js | 158 +++++++++++ .../serverless/initial_sync_change_collection.js | 23 +- jstests/serverless/libs/change_collection_util.js | 127 +++++++-- .../sharded_change_stream_state_commands.js | 6 +- ...ite_to_change_collection_in_startup_recovery.js | 26 +- src/mongo/db/SConscript | 22 +- src/mongo/db/catalog/drop_collection.cpp | 4 +- ...ange_collection_expired_change_remover_test.cpp | 18 +- ...change_collection_expired_documents_remover.cpp | 29 +- .../change_collection_expired_documents_remover.h | 3 +- .../db/change_stream_change_collection_manager.cpp | 113 +++----- .../db/change_stream_change_collection_manager.h | 21 +- ...change_stream_pre_images_collection_manager.cpp | 26 +- .../change_stream_pre_images_collection_manager.h | 7 - src/mongo/db/change_stream_serverless_helpers.cpp | 101 +++++++ src/mongo/db/change_stream_serverless_helpers.h | 71 +++++ src/mongo/db/change_streams_cluster_parameter.idl | 4 +- src/mongo/db/commands/SConscript | 1 + .../db/commands/change_stream_state_command.cpp | 46 ++-- src/mongo/db/commands/dbcommands.cpp | 4 + src/mongo/db/commands/run_aggregate.cpp | 18 +- .../db/commands/set_cluster_parameter_command.cpp | 4 + src/mongo/db/commands/txn_cmds.cpp | 8 + src/mongo/db/mongod_main.cpp | 6 +- src/mongo/db/namespace_string.cpp | 12 +- src/mongo/db/namespace_string.h | 8 +- src/mongo/db/op_observer/op_observer_impl_test.cpp | 6 +- ...document_source_change_stream_add_pre_image.cpp | 4 +- src/mongo/db/pipeline/sharded_agg_helpers.cpp | 2 +- src/mongo/db/query/query_knobs.idl | 9 + src/mongo/db/repl/SConscript | 3 + src/mongo/db/repl/oplog.cpp | 3 +- src/mongo/db/repl/oplog_applier_impl.cpp | 6 +- src/mongo/db/repl/oplog_applier_impl_test.cpp | 8 +- ...replication_coordinator_external_state_impl.cpp | 3 +- .../db/set_change_stream_state_coordinator.cpp | 50 +++- src/mongo/db/stats/SConscript | 1 + .../db/stats/change_collection_server_status.cpp | 3 +- 49 files changed, 1201 insertions(+), 491 deletions(-) create mode 100644 jstests/serverless/change_streams/multitenant_read_from_change_collection.js create mode 100644 src/mongo/db/change_stream_serverless_helpers.cpp create mode 100644 src/mongo/db/change_stream_serverless_helpers.h diff --git a/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml index 867a6acc7f1..16a0397b526 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_multitenant_passthrough.yml @@ -30,7 +30,10 @@ executor: enableMajorityReadConcern: '' # Enable causal consistency for change streams suites using 1 node replica sets. See # change_streams.yml for detailed explanation. - #TODO SERVER-67267 load 'inject_tenant_prefix.js'. + # TODO SERVER-67267 Load 'inject_tenant_prefix.js', add 'multitenancySupport', + # 'featureFlagMongoStore' and 'featureFlagRequireTenantID' flags. The + # 'inject_tenant_prefix' should also be called for the + # 'enable_change_stream.py' file. eval: >- var testingReplication = true; load('jstests/libs/override_methods/set_read_and_write_concerns.js'); @@ -52,6 +55,8 @@ executor: set_parameters: enableTestCommands: 1 featureFlagServerlessChangeStreams: true + internalChangeStreamUseTenantIdForTesting: true failpoint.assertChangeStreamNssCollection: "{'mode': 'alwaysOn', 'data': {'collectionName': 'system.change_collection'}}" - num_nodes: 2 + # TODO SERVER-69115 Set number of nodes to 2 after oplog application is working properly. + num_nodes: 1 diff --git a/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml index 804045fde38..9a9ef01d75d 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml @@ -63,7 +63,7 @@ executor: shard_options: mongod_options: set_parameters: - # TODO SERVER-67267 check if the feature flag can be removed. - failpoint.forceEnableChangeCollectionsMode: "{mode: 'alwaysOn'}" + featureFlagServerlessChangeStreams: true + internalChangeStreamUseTenantIdForTesting: true failpoint.assertChangeStreamNssCollection: "{'mode': 'alwaysOn', 'data': {'collectionName': 'system.change_collection'}}" diff --git a/buildscripts/resmokelib/testing/hooks/enable_change_stream.py b/buildscripts/resmokelib/testing/hooks/enable_change_stream.py index 501648e73b8..cafefff9370 100644 --- a/buildscripts/resmokelib/testing/hooks/enable_change_stream.py +++ b/buildscripts/resmokelib/testing/hooks/enable_change_stream.py @@ -48,7 +48,7 @@ class EnableChangeStream(interface.Hook): @staticmethod def _set_change_stream_state(connection, enabled): - # TODO SERVER-65950 create change collection for the tenant. + # TODO SERVER-67267 Enable command overrides to use security token. client = connection.get_primary().mongo_client() client.get_database("admin").command({"setChangeStreamState": 1, "enabled": enabled}) diff --git a/etc/evergreen.yml b/etc/evergreen.yml index d1f7f1cc5ff..740fbc61c8e 100644 --- a/etc/evergreen.yml +++ b/etc/evergreen.yml @@ -1417,7 +1417,7 @@ buildvariants: - name: change_streams_multitenant_passthrough - name: change_streams_multitenant_sharded_collections_passthrough distros: - - rhel80-medium # TODO SERVER-65950 remove the distro. + - rhel80-medium # TODO SERVER-67543 remove the distro. - name: .misc_js - name: .clustered_collections - name: .concurrency !.large !.ubsan !.no_txns !.debug_only diff --git a/jstests/auth/change_stream_change_collection_role_auth.js b/jstests/auth/change_stream_change_collection_role_auth.js index 1eecbfebd02..60b86a8d823 100644 --- a/jstests/auth/change_stream_change_collection_role_auth.js +++ b/jstests/auth/change_stream_change_collection_role_auth.js @@ -160,11 +160,15 @@ function removeChangeCollectionDoc(authDb) { const replSetTest = new ReplSetTest({name: "shard", nodes: 1, useHostName: true, waitForKeys: false}); -// TODO SERVER-67267: Add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and -// 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'. +// TODO SERVER-67267: Add 'serverless' flags. replSetTest.startSet({ keyFile: keyFile, - setParameter: {"failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"})} + setParameter: { + featureFlagServerlessChangeStreams: true, + multitenancySupport: true, + featureFlagMongoStore: true, + internalChangeStreamUseTenantIdForTesting: true, + } }); replSetTest.initiate(); const primary = replSetTest.getPrimary(); diff --git a/jstests/libs/override_methods/override_fixtures_changestream_multitenancy.js b/jstests/libs/override_methods/override_fixtures_changestream_multitenancy.js index a086fbc3005..c9b713bda0c 100644 --- a/jstests/libs/override_methods/override_fixtures_changestream_multitenancy.js +++ b/jstests/libs/override_methods/override_fixtures_changestream_multitenancy.js @@ -22,20 +22,13 @@ ReplSetTest = function(opts) { let setParameter = {}; - // The 'setParameter' that should be merged with 'newOpts' for the sharded-cluster and the - // replica-set. - if (newOptions.hasOwnProperty("shardsvr")) { - setParameter = { - // TODO SERVER-67267 check if 'forceEnableChangeCollectionsMode' can be removed. - "failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"}), - "failpoint.assertChangeStreamNssCollection": fpAssertChangeStreamNssColl - }; - } else if (!newOptions.hasOwnProperty("configsvr")) { - // This is the case for the replica-set. A change collection does not exist in the - // config server. + // A change collection does not exist in the config server, do not set any change collection + // related parameter. + if (!newOptions.hasOwnProperty("configsvr")) { setParameter = { featureFlagServerlessChangeStreams: true, - "failpoint.assertChangeStreamNssCollection": fpAssertChangeStreamNssColl + internalChangeStreamUseTenantIdForTesting: true, + "failpoint.assertChangeStreamNssCollection": fpAssertChangeStreamNssColl, }; } diff --git a/jstests/serverless/basic_write_to_change_collection.js b/jstests/serverless/basic_write_to_change_collection.js index cbed0377517..e67b299fdac 100644 --- a/jstests/serverless/basic_write_to_change_collection.js +++ b/jstests/serverless/basic_write_to_change_collection.js @@ -2,82 +2,182 @@ // modification operations. // @tags: [ // featureFlagMongoStore, -// requires_fcv_61, +// requires_fcv_62, // ] (function() { "use strict"; -load("jstests/serverless/libs/change_collection_util.js"); // For verifyChangeCollectionEntries. +// For verifyChangeCollectionEntries and ChangeStreamMultitenantReplicaSetTest. +load("jstests/serverless/libs/change_collection_util.js"); +// For funWithArgs. +load('jstests/libs/parallel_shell_helpers.js'); -const replSetTest = new ReplSetTest({nodes: 2}); - -// TODO SERVER-67267 add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and -// 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'. -replSetTest.startSet( - {setParameter: "failpoint.forceEnableChangeCollectionsMode=" + tojson({mode: "alwaysOn"})}); - -replSetTest.initiate(); +// TODO SERVER-69115 Change to a 2-node replica set. +const replSetTest = new ChangeStreamMultitenantReplicaSetTest({nodes: 1}); const primary = replSetTest.getPrimary(); const secondary = replSetTest.getSecondary(); + const testDb = primary.getDB("test"); -// Enable the change stream to create the change collection. -assert.commandWorked(primary.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true})); +// Hard code tenants ids such that a particular tenant can be identified deterministically. +const firstTenantId = ObjectId("6303b6bb84305d2266d0b779"); +const secondTenantId = ObjectId("7303b6bb84305d2266d0b779"); -// Performs writes on the specified collection. -function performWrites(coll) { - const docIds = [1, 2, 3, 4, 5]; +// Connections to the replica set primary that are stamped with their respective tenant ids. +const firstTenantConn = + ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, firstTenantId); +const secondTenantConn = + ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, secondTenantId); + +// Enable the change stream state such that change collections are created for both tenants. +replSetTest.setChangeStreamState(firstTenantConn, true); +replSetTest.setChangeStreamState(secondTenantConn, true); + +// Performs writes on the specified collection 'coll' such that the corresponding oplog entries are +// captured by the tenant's change collection. +function performWrites(coll, docIds) { docIds.forEach(docId => assert.commandWorked(coll.insert({_id: docId}))); docIds.forEach( docId => assert.commandWorked(coll.update({_id: docId}, {$set: {annotate: "updated"}}))); } -// Test the change collection entries with the oplog by performing some basic writes. -(function testBasicWritesInChangeCollection() { +// Retrieve the last timestamp from the oplog. +function getLatestTimestamp() { const oplogColl = primary.getDB("local").oplog.rs; - const startOplogTimestamp = oplogColl.find().toArray().at(-1).ts; - assert(startOplogTimestamp != undefined); - - performWrites(testDb.stock); - assert(testDb.stock.drop()); + const oplogTimestamp = oplogColl.find().sort({ts: -1}).limit(1).next().ts; + assert(oplogTimestamp !== undefined); + return oplogTimestamp; +} - const endOplogTimestamp = oplogColl.find().toArray().at(-1).ts; - assert(endOplogTimestamp !== undefined); +// Test that writes to two different change collections are isolated and that each change collection +// captures only the relevant oplog entries associated with the corresponding tenant. +(function testWritesWithMultipleTenants() { + jsTestLog("Testing writes on change collections with multiple tenants."); + + // A helper shell function to perform write for the specified 'tenantId'. + function shellFn(hostAddr, collName, tenantId, performWrites) { + load("jstests/serverless/libs/change_collection_util.js"); + + const tenantConn = + ChangeStreamMultitenantReplicaSetTest.getTenantConnection(hostAddr, tenantId); + + const docIds = Array.from({length: 300}, (_, index) => index); + performWrites(tenantConn.getDB("test").getCollection(collName), docIds); + + assert(tenantConn.getDB("test").getCollection(collName).drop()); + } + + const startOplogTimestamp = getLatestTimestamp(); + + // Perform writes for the first tenant in a different shell. + const firstTenantShellReturn = + startParallelShell(funWithArgs(shellFn, + primary.host, + "testWritesWithMultipleTenants_firstTenant", + firstTenantId, + performWrites), + primary.port); + + // Perform writes to the second tenant parallely with the first tenant. + const secondTenantShellReturn = + startParallelShell(funWithArgs(shellFn, + primary.host, + "testWritesWithMultipleTenants_secondTenant", + secondTenantId, + performWrites), + primary.port); + + // Wait for both shells to return. + firstTenantShellReturn(); + secondTenantShellReturn(); + + const endOplogTimestamp = getLatestTimestamp(); assert(timestampCmp(endOplogTimestamp, startOplogTimestamp) > 0); - // Wait for the replication to finish. - replSetTest.awaitReplication(); + // Verify that both change collections captured their respective tenant's oplog entries in + // the primary. + verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp, firstTenantId); + verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp, secondTenantId); - // Verify that the change collection entries are the same as the oplog in the primary and the - // secondary node. - verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp); - verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp); + // TODO SERVER-69115 Uncomment this. + /** + //Wait for the replication to finish. + replSetTest.awaitReplication(); + // Verify that both change collections captured their respective tenant's oplog entries in + // the secondary. + verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp, firstTenantId); + verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp, + secondTenantId); + */ })(); -// Test the change collection entries with the oplog by performing writes in a transaction. -(function testWritesinChangeCollectionWithTrasactions() { - const oplogColl = primary.getDB("local").oplog.rs; - const startOplogTimestamp = oplogColl.find().toArray().at(-1).ts; - assert(startOplogTimestamp != undefined); +// Test that transactional writes to two different change collections are isolated and that each +// change collection captures only the relevant 'applyOps' oplog entries associated with the +// corresponding tenant. +(function testTransactionalWritesWithMultipleTenants() { + jsTestLog("Testing transactional writes on change collections with multiple tenants."); - const session = testDb.getMongo().startSession(); - const sessionDb = session.getDatabase(testDb.getName()); - session.startTransaction(); - performWrites(sessionDb.test); - session.commitTransaction_forTesting(); + // A helper shell function to perform transactional write for the specified 'tenantId'. + function shellFn(hostAddr, collName, tenantId, performWrites) { + load("jstests/serverless/libs/change_collection_util.js"); - const endOplogTimestamp = oplogColl.find().toArray().at(-1).ts; - assert(endOplogTimestamp != undefined); + const tenantConn = + ChangeStreamMultitenantReplicaSetTest.getTenantConnection(hostAddr, tenantId); + + const session = tenantConn.getDB("test").getMongo().startSession(); + const sessionDb = session.getDatabase("test"); + + session.startTransaction(); + + const docIds = Array.from({length: 300}, (_, index) => index); + performWrites(sessionDb.getCollection(collName), docIds); + + session.commitTransaction_forTesting(); + } + + const startOplogTimestamp = getLatestTimestamp(); + + // Perform writes within a transaction for the first tenant. + const firstTenantShellReturn = + startParallelShell(funWithArgs(shellFn, + primary.host, + "testTransactionalWritesWithMultipleTenants_firstTenant", + firstTenantId, + performWrites), + primary.port); + + // Perform parallel writes within a transaction for the second tenant. + const secondTenantShellReturn = + startParallelShell(funWithArgs(shellFn, + primary.host, + "testTransactionalWritesWithMultipleTenants_secondTenant", + secondTenantId, + performWrites), + primary.port); + + // Wait for shells to return. + firstTenantShellReturn(); + secondTenantShellReturn(); + + const endOplogTimestamp = getLatestTimestamp(); assert(timestampCmp(endOplogTimestamp, startOplogTimestamp) > 0); + // Verify that both change collections captured their respective tenant's 'applyOps' oplog + // entries in the primary. + verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp, firstTenantId); + verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp, secondTenantId); + + // TODO SERVER-69115 Uncomment this. + /** // Wait for the replication to finish. replSetTest.awaitReplication(); - - // Verify that the change collection entries are the same as the oplog in the primary and the - // secondary node for the applyOps. - verifyChangeCollectionEntries(primary, startOplogTimestamp, endOplogTimestamp); - verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp); + // Verify that both change collections captured their respective tenant's 'applyOps' oplog + // entries in the secondary. + verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp, firstTenantId); + verifyChangeCollectionEntries(secondary, startOplogTimestamp, endOplogTimestamp, + secondTenantId); + */ })(); replSetTest.stopSet(); diff --git a/jstests/serverless/change_collection_expired_document_remover.js b/jstests/serverless/change_collection_expired_document_remover.js index 9c76d84c7bc..09e6b0a912d 100644 --- a/jstests/serverless/change_collection_expired_document_remover.js +++ b/jstests/serverless/change_collection_expired_document_remover.js @@ -1,52 +1,47 @@ /** * Tests the change collection periodic remover job. * - * @tags: [requires_fcv_61] + * @tags: [requires_fcv_62] */ (function() { "use strict"; -load("jstests/libs/fail_point_util.js"); // For configureFailPoint. -load("jstests/libs/collection_drop_recreate.js"); // For assertDropAndRecreateCollection. +// For configureFailPoint. +load("jstests/libs/fail_point_util.js"); +// For assertDropAndRecreateCollection. +load("jstests/libs/collection_drop_recreate.js"); +// For ChangeStreamMultitenantReplicaSetTest. +load("jstests/serverless/libs/change_collection_util.js"); -const kExpiredChangeRemovalJobSleepSeconds = 5; +const getTenantConnection = ChangeStreamMultitenantReplicaSetTest.getTenantConnection; + +// Sleep interval in seconds for the change collection remover job. +const kExpiredRemovalJobSleepSeconds = 5; +// Number of seconds after which the documents in change collections will be expired. const kExpireAfterSeconds = 1; +// Number of seconds to sleep before inserting the next batch of documents in collections. const kSleepBetweenWritesSeconds = 5; +// Millisecond(s) that can be added to the wall time to advance it marginally. const kSafetyMarginMillis = 1; -const rst = new ReplSetTest({nodes: 2}); - -// TODO SERVER-67267: Add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and -// 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'. -rst.startSet({ - setParameter: { - "failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"}), - changeCollectionRemoverJobSleepSeconds: kExpiredChangeRemovalJobSleepSeconds - } +// TODO SERVER-69115 Change to a 2-node replica set. +const replSet = new ChangeStreamMultitenantReplicaSetTest({ + nodes: 1, + setParameter: + {changeCollectionExpiredDocumentsRemoverJobSleepSeconds: kExpiredRemovalJobSleepSeconds} }); -rst.initiate(); - -const primary = rst.getPrimary(); -const secondary = rst.getSecondary(); -const testDb = primary.getDB(jsTestName()); - -// Enable change streams to ensure the creation of change collections. -assert.commandWorked(primary.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true})); -// Set the 'expireAfterSeconds' to 'kExpireAfterSeconds'. -assert.commandWorked(primary.getDB("admin").runCommand( - {setClusterParameter: {changeStreams: {expireAfterSeconds: kExpireAfterSeconds}}})); +const primary = replSet.getPrimary(); -// TODO SERVER-65950 Extend the test case to account for multi-tenancy. -const primaryChangeCollection = primary.getDB("config").system.change_collection; -const secondaryChangeCollection = secondary.getDB("config").system.change_collection; +// TODO SERVER-69115 Uncomment this code. +// const secondary = replSet.getSecondary(); // Assert that the change collection contains all documents in 'expectedRetainedDocs' and no -// document in 'expectedDeletedDocs' for the collection 'testColl'. +// document in 'expectedDeletedDocs' for the collection 'stocksColl'. function assertChangeCollectionDocuments( - changeColl, testColl, expectedDeletedDocs, expectedRetainedDocs) { - const collNss = `${testDb.getName()}.${testColl.getName()}`; + changeColl, stocksColl, expectedDeletedDocs, expectedRetainedDocs) { + const collNss = `${stocksTestDb.getName()}.${stocksColl.getName()}`; const pipeline = (collectionEntries) => [{$match: {op: "i", ns: collNss}}, {$replaceRoot: {"newRoot": "$o"}}, {$match: {$or: collectionEntries}}]; @@ -72,80 +67,180 @@ function getDocumentOperationTime(doc) { return oplogEntry.wall.getTime(); } -(function testOnlyExpiredDocumentsDeleted() { - assertDropAndRecreateCollection(testDb, "stocks"); - const testColl = testDb.stocks; - - // Wait until the remover job hangs. - let fpHangBeforeRemovingDocs = configureFailPoint(primary, "hangBeforeRemovingExpiredChanges"); - fpHangBeforeRemovingDocs.wait(); - - // Insert 5 documents. - const expiredDocuments = [ - {_id: "aapl", price: 140}, - {_id: "dis", price: 100}, - {_id: "nflx", price: 185}, - {_id: "baba", price: 66}, - {_id: "amc", price: 185} - ]; - - assert.commandWorked(testColl.insertMany(expiredDocuments)); - assertChangeCollectionDocuments(primaryChangeCollection, - testColl, - /* expectedDeletedDocs */[], - /* expectedRetainedDocs */ expiredDocuments); - const lastExpiredDocumentTime = getDocumentOperationTime(expiredDocuments.at(-1)); - - // Sleep for 'kSleepBetweenWritesSeconds' duration such that the next batch of inserts - // has a sufficient delay in their wall time relative to the previous batch. - sleep(kSleepBetweenWritesSeconds * 1000); - - // Insert 5 more documents. - const nonExpiredDocuments = [ - {_id: "wmt", price: 11}, - {_id: "coin", price: 23}, - {_id: "ddog", price: 15}, - {_id: "goog", price: 199}, - {_id: "tsla", price: 12} - ]; - - assert.commandWorked(testColl.insertMany(nonExpiredDocuments)); - assertChangeCollectionDocuments(primaryChangeCollection, - testColl, - /* expectedDeletedDocs */[], - /* expectedRetainedDocs */ nonExpiredDocuments); - - // Calculate the 'currentWallTime' such that only the first batch of inserted documents - // should be expired, ie.: 'lastExpiredDocumentTime' + 'kExpireAfterSeconds' < - // 'currentWallTime' < 'firstNonExpiredDocument' - const currentWallTime = - new Date(lastExpiredDocumentTime + kExpireAfterSeconds * 1000 + kSafetyMarginMillis); - const fpInjectWallTime = configureFailPoint( - primary, "injectCurrentWallTimeForRemovingExpiredDocuments", {currentWallTime}); - - // Unblock the change collection remover job such that it picks up on the injected - // 'currentWallTime'. - fpHangBeforeRemovingDocs.off(); - - // Wait until the remover job has retrieved the injected 'currentWallTime' and reset the first - // failpoint. - fpInjectWallTime.wait(); - - // Wait for a complete cycle of the TTL job. - fpHangBeforeRemovingDocs = configureFailPoint(primary, "hangBeforeRemovingExpiredChanges"); - fpHangBeforeRemovingDocs.wait(); - - // Assert that the first 5 documents got deleted, but the later 5 documents did not. - assertChangeCollectionDocuments( - primaryChangeCollection, testColl, expiredDocuments, nonExpiredDocuments); - - // Wait for the replication to complete and assert that the expired documents also have been - // deleted from the secondary. - rst.awaitReplication(); - assertChangeCollectionDocuments( - secondaryChangeCollection, testColl, expiredDocuments, nonExpiredDocuments); - fpHangBeforeRemovingDocs.off(); -})(); +// Hard code a tenant ids such that tenants can be identified deterministically. +const stocksTenantId = ObjectId("6303b6bb84305d2266d0b779"); +const citiesTenantId = ObjectId("7303b6bb84305d2266d0b779"); +const notUsedTenantId = ObjectId("8303b6bb84305d2266d0b779"); + +// Create connections to the primary such that they have respective tenant ids stamped. +const stocksTenantConnPrimary = getTenantConnection(primary.host, stocksTenantId); +const citiesTenantConnPrimary = getTenantConnection(primary.host, citiesTenantId); -rst.stopSet(); +// Create a tenant connection associated with 'notUsedTenantId' such that only the tenant id exists +// in the replica set but no corresponding change collection exists. The purging job should safely +// ignore this tenant without any side-effects. +const notUsedTenantConnPrimary = getTenantConnection(primary.host, notUsedTenantId); + +// TODO SERVER-69115 Uncomment this code and use tenants connections to the secondary. +/** +const stocksTenantConnSecondary = getTenantConnection(secondary.host, stocksTenantId); +const citiesTenantConnSecondary = getTenantConnection(secondary.host, citiesTenantId); +*/ + +// TODO SERVER-69115 Uncomment this code and fetch tenants change collection on the secondary. +/** +const stocksChangeCollectionSecondary = +stocksTenantConnSecondary.getDB("config").system.change_collection; const +citiesChangeCollectionSecondary = +citiesTenantConnSecondary.getDB("config").system.change_collection; +*/ + +// Enable change streams for both tenants. +replSet.setChangeStreamState(stocksTenantConnPrimary, true); +replSet.setChangeStreamState(citiesTenantConnPrimary, true); + +// Verify change streams state for all tenants. +assert.eq(replSet.getChangeStreamState(stocksTenantConnPrimary), true); +assert.eq(replSet.getChangeStreamState(citiesTenantConnPrimary), true); +assert.eq(replSet.getChangeStreamState(notUsedTenantConnPrimary), false); + +// Get tenants respective change collections. +const stocksChangeCollectionPrimary = + stocksTenantConnPrimary.getDB("config").system.change_collection; +const citiesChangeCollectionPrimary = + citiesTenantConnPrimary.getDB("config").system.change_collection; + +// Set the 'expireAfterSeconds' to 'kExpireAfterSeconds'. +// TODO SERVER-69511 Use tenants connections instead of 'primary' to set 'expireAfterSeconds'. +assert.commandWorked(primary.getDB("admin").runCommand( + {setClusterParameter: {changeStreams: {expireAfterSeconds: kExpireAfterSeconds}}})); + +// Get tenants respective collections for testing. +const stocksTestDb = stocksTenantConnPrimary.getDB(jsTestName()); +const citiesTestDb = citiesTenantConnPrimary.getDB(jsTestName()); +const notUsedTestDb = notUsedTenantConnPrimary.getDB(jsTestName()); + +const stocksColl = assertDropAndRecreateCollection(stocksTestDb, "stocks"); +const citiesColl = assertDropAndRecreateCollection(citiesTestDb, "cities"); +const notUsedColl = assertDropAndRecreateCollection(notUsedTestDb, "notUsed"); + +// Wait until the remover job hangs. +let fpHangBeforeRemovingDocs = configureFailPoint(primary, "hangBeforeRemovingExpiredChanges"); +fpHangBeforeRemovingDocs.wait(); + +// Insert 5 documents to the 'stocks' collection owned by the 'stocksTenantId' that should be +// deleted. +const stocksExpiredDocuments = [ + {_id: "aapl", price: 140}, + {_id: "dis", price: 100}, + {_id: "nflx", price: 185}, + {_id: "baba", price: 66}, + {_id: "amc", price: 185} +]; + +// Insert 4 documents to the 'cities' collection owned by the 'citiesTenantId' that should be +// deleted. +const citiesExpiredDocuments = [ + {_id: "toronto", area_km2: 630}, + {_id: "singapore ", area_km2: 728}, + {_id: "london", area_km2: 1572}, + {_id: "tokyo", area_km2: 2194} +]; + +assert.commandWorked(stocksColl.insertMany(stocksExpiredDocuments)); +assertChangeCollectionDocuments(stocksChangeCollectionPrimary, + stocksColl, + /* expectedDeletedDocs */[], + /* expectedRetainedDocs */ stocksExpiredDocuments); + +assert.commandWorked(citiesColl.insertMany(citiesExpiredDocuments)); +assertChangeCollectionDocuments(citiesChangeCollectionPrimary, + citiesColl, + /* expectedDeletedDocs */[], + /* expectedRetainedDocs */ citiesExpiredDocuments); + +// Insert 2 documents to the 'notUsed' collection such that the associated tenant becomes visible to +// the mongoD. The documents in these collections will not be consumed by the change stream. +const notUsedDocuments = + [{_id: "cricket_bat", since_years: 2}, {_id: "tennis_racket", since_years: 2}]; +assert.commandWorked(notUsedColl.insertMany(notUsedDocuments)); + +// All document before and inclusive this wall time will be deleted by the purging job. +const lastExpiredDocumentTime = getDocumentOperationTime(citiesExpiredDocuments.at(-1)); + +// Sleep for 'kSleepBetweenWritesSeconds' duration such that the next batch of inserts +// has a sufficient delay in their wall time relative to the previous batch. +sleep(kSleepBetweenWritesSeconds * 1000); + +// Insert 5 documents to the 'stocks' collection owned by the 'stocksTenantId' that should not be +// deleted. +const stocksNonExpiredDocuments = [ + {_id: "wmt", price: 11}, + {_id: "coin", price: 23}, + {_id: "ddog", price: 15}, + {_id: "goog", price: 199}, + {_id: "tsla", price: 12} +]; + +// Insert 4 documents to the 'cities' collection owned by the 'citiesTenantId' that should not be +// deleted. +const citiesNonExpiredDocuments = [ + {_id: "dublin", area_km2: 117}, + {_id: "new york", area_km2: 783}, + {_id: "hong kong", area_km2: 1114}, + {_id: "sydney", area_km2: 12386} +]; + +assert.commandWorked(stocksColl.insertMany(stocksNonExpiredDocuments)); +assertChangeCollectionDocuments(stocksChangeCollectionPrimary, + stocksColl, + /* expectedDeletedDocs */[], + /* expectedRetainedDocs */ stocksNonExpiredDocuments); + +assert.commandWorked(citiesColl.insertMany(citiesNonExpiredDocuments)); +assertChangeCollectionDocuments(citiesChangeCollectionPrimary, + citiesColl, + /* expectedDeletedDocs */[], + /* expectedRetainedDocs */ citiesNonExpiredDocuments); + +// Calculate the 'currentWallTime' such that only the first batch of inserted documents +// should be expired, ie.: 'lastExpiredDocumentTime' + 'kExpireAfterSeconds' < +// 'currentWallTime' < first-non-expired-document. +const currentWallTime = + new Date(lastExpiredDocumentTime + kExpireAfterSeconds * 1000 + kSafetyMarginMillis); +const fpInjectWallTime = configureFailPoint( + primary, "injectCurrentWallTimeForRemovingExpiredDocuments", {currentWallTime}); + +// Unblock the change collection remover job such that it picks up on the injected +// 'currentWallTime'. +fpHangBeforeRemovingDocs.off(); + +// Wait until the remover job has retrieved the injected 'currentWallTime' and reset the first +// failpoint. +fpInjectWallTime.wait(); + +// Wait for a complete cycle of the TTL job. +fpHangBeforeRemovingDocs = configureFailPoint(primary, "hangBeforeRemovingExpiredChanges"); +fpHangBeforeRemovingDocs.wait(); + +// Assert that only required documents are retained in change collections. +assertChangeCollectionDocuments( + stocksChangeCollectionPrimary, stocksColl, stocksExpiredDocuments, stocksNonExpiredDocuments); +assertChangeCollectionDocuments( + citiesChangeCollectionPrimary, citiesColl, citiesExpiredDocuments, citiesNonExpiredDocuments); + +// TODO SERVER-69115 Uncomment this code block. +/** +// Wait for the replication to complete and assert that the expired documents also have been +// deleted from the secondary. +replSet.awaitReplication(); +assertChangeCollectionDocuments(stocksChangeCollectionSecondary, +stocksColl, stocksExpiredDocuments,stocksNonExpiredDocuments); +assertChangeCollectionDocuments(citiesChangeCollectionSecondary, +citiesColl, citiesExpiredDocuments, citiesNonExpiredDocuments); +*/ + +fpHangBeforeRemovingDocs.off(); + +replSet.stopSet(); })(); diff --git a/jstests/serverless/change_collection_server_stats.js b/jstests/serverless/change_collection_server_stats.js index a5992259d78..12f8803847c 100644 --- a/jstests/serverless/change_collection_server_stats.js +++ b/jstests/serverless/change_collection_server_stats.js @@ -5,34 +5,44 @@ (function() { 'use strict'; -load("jstests/libs/fail_point_util.js"); +// For verifyGetDiagnosticData. load('jstests/libs/ftdc.js'); +// For ChangeStreamMultitenantReplicaSetTest. +load("jstests/serverless/libs/change_collection_util.js"); -const kExpiredChangeRemovalJobSleepSeconds = 5; +const kExpiredChangeRemovalJobSleepSeconds = 1; const kExpireAfterSeconds = 1; -const rst = new ReplSetTest({nodes: 1}); -rst.startSet({ - setParameter: { - featureFlagServerlessChangeStreams: 1, - changeCollectionRemoverJobSleepSeconds: kExpiredChangeRemovalJobSleepSeconds, - } +const replicaSet = new ChangeStreamMultitenantReplicaSetTest({ + nodes: 1, + changeCollectionExpiredDocumentsRemoverJobSleepSeconds: kExpiredChangeRemovalJobSleepSeconds }); -rst.initiate(); -const primary = rst.getPrimary(); +const primary = replicaSet.getPrimary(); const adminDb = primary.getDB('admin'); -const testDb = primary.getDB(jsTestName()); -const changeCollection = primary.getDB("config").system.change_collection; + +// Hard code the tenant id such that the tenant can be identified deterministically. +const tenantId = ObjectId("6303b6bb84305d2266d0b779"); + +// Connection to the replica set primary that are stamped with their respective tenant ids. +const tenantConn = + ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, tenantId); + +const testDb = tenantConn.getDB(jsTestName()); // Enable change streams to ensure the creation of change collections if run in serverless mode. -assert.commandWorked(adminDb.runCommand({setChangeStreamState: 1, enabled: true})); +assert.commandWorked( + tenantConn.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true})); + +const changeCollection = tenantConn.getDB("config").system.change_collection; + assert.soon(() => { // Ensure that server status diagnostics is collecting change collection statistics. const serverStatusDiagnostics = verifyGetDiagnosticData(adminDb).serverStatus; return serverStatusDiagnostics.hasOwnProperty('changeCollections') && serverStatusDiagnostics.changeCollections.hasOwnProperty('purgingJob'); }); + const diagnosticsBeforeTestCollInsertions = verifyGetDiagnosticData(adminDb).serverStatus.changeCollections.purgingJob; @@ -53,6 +63,7 @@ const estimatedToBeRemovedDocsSize = changeCollection.find() assert.gt(estimatedToBeRemovedDocsSize, 0); // Set the 'expireAfterSeconds' to 'kExpireAfterSeconds'. +// TODO SERVER-69511 Use 'tenantConn' instead of 'primary' to set the 'expireAfterSeconds'. assert.commandWorked(adminDb.runCommand( {setClusterParameter: {changeStreams: {expireAfterSeconds: kExpireAfterSeconds}}})); @@ -71,7 +82,7 @@ assert.soon(() => { diagnosticsBeforeTestCollInsertions.totalPass && diagnosticsAfterTestCollInsertions.scannedCollections > diagnosticsBeforeTestCollInsertions.scannedCollections && - diagnosticsAfterTestCollInsertions.bytesDeleted > + diagnosticsAfterTestCollInsertions.bytesDeleted >= diagnosticsBeforeTestCollInsertions.bytesDeleted + estimatedToBeRemovedDocsSize && diagnosticsAfterTestCollInsertions.docsDeleted > diagnosticsBeforeTestCollInsertions.docsDeleted + numberOfDocuments - 1 && @@ -81,5 +92,5 @@ assert.soon(() => { diagnosticsBeforeTestCollInsertions.timeElapsedMillis; }); -rst.stopSet(); +replicaSet.stopSet(); }()); diff --git a/jstests/serverless/change_stream_state_commands.js b/jstests/serverless/change_stream_state_commands.js index fe5d38a2a67..373bad342b5 100644 --- a/jstests/serverless/change_stream_state_commands.js +++ b/jstests/serverless/change_stream_state_commands.js @@ -1,24 +1,25 @@ // Test that the 'setChangeStreamState' and 'getChangeStreamState' commands work as expected in the // multi-tenant replica sets environment for various cases. // @tags: [ -// featureFlagMongoStore, -// requires_fcv_61, +// requires_fcv_62, +// __TEMPORARILY_DISABLED__ // ] (function() { "use strict"; -load("jstests/libs/fail_point_util.js"); // For configureFailPoint. -load("jstests/serverless/libs/change_collection_util.js"); // For verifyChangeCollectionEntries. -load('jstests/libs/parallel_shell_helpers.js'); // For funWithArgs. +load("jstests/libs/fail_point_util.js"); // For configureFailPoint. +load('jstests/libs/parallel_shell_helpers.js'); // For funWithArgs. const replSetTest = new ReplSetTest({nodes: 2}); -// TODO SERVER-67267 Add 'featureFlagServerlessChangeStreams' and 'serverless' flags and remove -// 'failpoint.forceEnableChangeCollectionsMode'. +// TODO SERVER-67267 Add 'serverless' flag. +// TODO SERVER-68947 Add 'featureFlagRequireTenantID' flag. +// TODO SERVER-69115 Remove '__TEMPORARILY_DISABLED__' replSetTest.startSet({ setParameter: { - "failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"}), - multitenancySupport: true + featureFlagServerlessChangeStreams: true, + multitenancySupport: true, + featureFlagMongoStore: true } }); diff --git a/jstests/serverless/change_streams/basic_read_from_change_collection.js b/jstests/serverless/change_streams/basic_read_from_change_collection.js index cfe3ab53b88..98679d18c31 100644 --- a/jstests/serverless/change_streams/basic_read_from_change_collection.js +++ b/jstests/serverless/change_streams/basic_read_from_change_collection.js @@ -1,63 +1,82 @@ // Tests that a change stream can be opened on a change collection when one exists, and that an // exception is thrown if we attempt to open a stream while change streams are disabled. // @tags: [ -// featureFlagMongoStore, -// requires_fcv_61, +// requires_fcv_62, // assumes_against_mongod_not_mongos, // ] (function() { "use strict"; -(function runInReplicaSet() { - const replSetTest = new ReplSetTest({nodes: 1}); - - // TODO SERVER-67267 add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and - // 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'. - replSetTest.startSet( - {setParameter: {"failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"})}}); - - replSetTest.initiate(); - - const connection = replSetTest.getPrimary(); - - // Enable change stream such that it creates the change collection. - // TODO SERVER-65950 pass tenant id to the command. - assert.commandWorked( - connection.getDB("admin").runCommand({setChangeStreamState: 1, enabled: true})); - assert.eq(assert.commandWorked(connection.getDB("admin").runCommand({getChangeStreamState: 1})) - .enabled, - true); - - // Insert a document to the 'stockPrice' collection. - const testDb = connection.getDB("test"); - const csCursor = connection.getDB("test").stockPrice.watch([]); - testDb.stockPrice.insert({_id: "mdb", price: 250}); - testDb.stockPrice.insert({_id: "tsla", price: 650}); - - // Verify that the change stream observes the required event. - assert.soon(() => csCursor.hasNext()); - const event1 = csCursor.next(); - assert.eq(event1.documentKey._id, "mdb"); - assert.soon(() => csCursor.hasNext()); - const event2 = csCursor.next(); - assert.eq(event2.documentKey._id, "tsla"); - - // Disable the change stream while the change stream cursor is still opened. - // TODO SERVER-65950 pass tenant id to the command. - assert.commandWorked( - connection.getDB("admin").runCommand({setChangeStreamState: 1, enabled: false})); - assert.eq(assert.commandWorked(connection.getDB("admin").runCommand({getChangeStreamState: 1})) - .enabled, - false); - - // Verify that the cursor throws 'QueryPlanKilled' exception on doing get next. - assert.throwsWithCode(() => assert.soon(() => csCursor.hasNext()), ErrorCodes.QueryPlanKilled); - - // Open a new change stream cursor with change stream disabled state and verify that - // 'ChangeStreamNotEnabled' exception is thrown. - assert.throwsWithCode(() => testDb.stock.watch([]), ErrorCodes.ChangeStreamNotEnabled); - - replSetTest.stopSet(); -})(); +// For ChangeStreamMultitenantReplicaSetTest. +load("jstests/serverless/libs/change_collection_util.js"); +// For assertDropAndRecreateCollection. +load("jstests/libs/collection_drop_recreate.js"); + +// TODO SERVER-69115 Change to a 2-node replica set. +const replSetTest = new ChangeStreamMultitenantReplicaSetTest({nodes: 1}); +const primary = replSetTest.getPrimary(); + +// Hard code tenants id such that the tenant can be identified deterministically. +const tenantId = ObjectId("6303b6bb84305d2266d0b779"); + +// Connection to the replica set primary that is stamped with the tenant id. +const tenantConn = + ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, tenantId); + +// Verify that the change stream observes expected events. +function verifyChangeEvents(csCursor, expectedEvents) { + for (const [expectedOpType, expectedDoc] of expectedEvents) { + assert.soon(() => csCursor.hasNext()); + const event = csCursor.next(); + + assert.eq(event.operationType, expectedOpType, event); + if (event.operationType == "insert") { + assert.eq(event.fullDocument, expectedDoc); + } else if (event.operationType == "drop") { + assert.soon(() => csCursor.hasNext()); + assert.eq(csCursor.isClosed(), true); + } + } +} + +// Enable change stream for the first tenant. +replSetTest.setChangeStreamState(tenantConn, true); + +// Open the change stream cursor. +const testDb = tenantConn.getDB("test"); +const csCursor = testDb.stockPrice.watch([]); + +// Insert documents to the 'stockPrice' collection. +const docs = [{_id: "mdb", price: 250}, {_id: "tsla", price: 650}]; +docs.forEach(doc => assert.commandWorked(testDb.stockPrice.insert(doc))); + +// Drop the stock price collection to invalidate the change stream cursor. +assert(testDb.stockPrice.drop()); + +// Verify that the change stream observes the required event. +verifyChangeEvents(csCursor, [["insert", docs[0]], ["insert", docs[1]], ["drop", []]]); + +// Disable and then enable the change stream. +replSetTest.setChangeStreamState(tenantConn, false); +replSetTest.setChangeStreamState(tenantConn, true); + +// Add a new document to the 'stockPrice' collection and verify that re-enabling the change +// stream works correctly. +const newCsCursor = testDb.stockPrice.watch([]); +const newDocs = [{_id: "goog", price: 2000}]; +newDocs.forEach(doc => assert.commandWorked(testDb.stockPrice.insert(doc))); +verifyChangeEvents(newCsCursor, [["insert", newDocs[0]]]); + +// Disable the change stream while the change stream cursor is still opened. +replSetTest.setChangeStreamState(tenantConn, false); + +// Verify that the cursor throws 'QueryPlanKilled' exception on doing get next. +assert.throwsWithCode(() => assert.soon(() => newCsCursor.hasNext()), ErrorCodes.QueryPlanKilled); + +// Open a new change stream cursor with change stream disabled state and verify that +// 'ChangeStreamNotEnabled' exception is thrown. +assert.throwsWithCode(() => testDb.stock.watch([]), ErrorCodes.ChangeStreamNotEnabled); + +replSetTest.stopSet(); }()); diff --git a/jstests/serverless/change_streams/multitenant_read_from_change_collection.js b/jstests/serverless/change_streams/multitenant_read_from_change_collection.js new file mode 100644 index 00000000000..b10941b7999 --- /dev/null +++ b/jstests/serverless/change_streams/multitenant_read_from_change_collection.js @@ -0,0 +1,158 @@ +// Tests the behaviour of change streams on change collections in an environment with more than one +// active tenant. +// @tags: [ +// requires_fcv_62, +// assumes_against_mongod_not_mongos, +// ] + +(function() { +"use strict"; + +// For ChangeStreamMultitenantReplicaSetTest. +load("jstests/serverless/libs/change_collection_util.js"); +// For assertDropAndRecreateCollection. +load("jstests/libs/collection_drop_recreate.js"); + +// TODO SERVER-69115 Change to a 2-node replica set. +const replSetTest = new ChangeStreamMultitenantReplicaSetTest({nodes: 1}); +const primary = replSetTest.getPrimary(); + +// Hard code tenants ids such that a particular tenant can be identified deterministically. +const firstTenantId = ObjectId("6303b6bb84305d2266d0b779"); +const secondTenantId = ObjectId("7303b6bb84305d2266d0b779"); + +// Connections to the replica set primary that are stamped with their respective tenant ids. +const firstTenantConn = + ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, firstTenantId); +const secondTenantConn = + ChangeStreamMultitenantReplicaSetTest.getTenantConnection(primary.host, secondTenantId); + +// Verify that the change stream observes expected events. The method also collects resume tokens +// for each expected change collection and returns those on successful assertion. +function verifyEventsAndGetResumeTokens(csCursor, expectedEvents) { + let resumeTokens = []; + + for (const [expectedOpType, expectedDoc] of expectedEvents) { + assert.soon(() => csCursor.hasNext()); + const event = csCursor.next(); + + assert.eq(event.operationType, expectedOpType, event); + if (event.operationType == "insert") { + assert.eq(event.fullDocument, expectedDoc); + } else if (event.operationType == "drop") { + assert.soon(() => csCursor.hasNext()); + assert.eq(csCursor.isClosed(), true); + } + + resumeTokens.push(csCursor.getResumeToken()); + } + + return resumeTokens; +} + +// Get the 'test' db for both tenants. +const firstTenantTestDb = firstTenantConn.getDB("test"); +const secondTenantTestDb = secondTenantConn.getDB("test"); + +// Recreate the 'stockPrice' collection to delete any old documents. +assertDropAndRecreateCollection(firstTenantTestDb, "stockPrice"); +assertDropAndRecreateCollection(secondTenantTestDb, "stockPrice"); + +// Create a new incarnation of the change collection for the first tenant. +replSetTest.setChangeStreamState(firstTenantConn, false); +replSetTest.setChangeStreamState(firstTenantConn, true); + +// These documents will be inserted in tenants 'stockPrice' collections. +const firstTenantDocs = + [{_id: "mdb", price: 350}, {_id: "goog", price: 2000}, {_id: "nflx", price: 220}]; +const secondTenantDocs = + [{_id: "amzn", price: 3000}, {_id: "tsla", price: 750}, {_id: "aapl", price: 160}]; + +// Open the change stream cursor for the first tenant. +const firstTenantCsCursor = firstTenantTestDb.stockPrice.watch([]); + +// Fetch the latest timestamp before enabling the change stream for the second tenant. +const startAtOperationTime = + primary.getDB("local").oplog.rs.find().sort({ts: -1}).limit(1).next().ts; +assert(startAtOperationTime !== undefined); + +// Now create the change collection for the second tenant. The oplog timestamp associated with the +// second tenant's create change collection will be greater than the 'startAtOperationTime'. +replSetTest.setChangeStreamState(secondTenantConn, false); +replSetTest.setChangeStreamState(secondTenantConn, true); + +// Open the change stream cursor for the second tenant. +const secondTenantCsCursor = secondTenantTestDb.stockPrice.watch([]); + +// Insert documents to both change collections in jumbled fashion. +assert.commandWorked(secondTenantTestDb.stockPrice.insert(secondTenantDocs[0])); +assert.commandWorked(firstTenantTestDb.stockPrice.insert(firstTenantDocs[0])); +assert.commandWorked(firstTenantTestDb.stockPrice.insert(firstTenantDocs[1])); +assert.commandWorked(secondTenantTestDb.stockPrice.insert(secondTenantDocs[1])); +assert.commandWorked(secondTenantTestDb.stockPrice.insert(secondTenantDocs[2])); +assert.commandWorked(firstTenantTestDb.stockPrice.insert(firstTenantDocs[2])); + +// Verify that each change stream emits only the required tenant's change events and that there +// is no leak of events amongst the change streams. Do not consume all events for the first +// tenant as it will be consumed later. +const firstTenantResumeTokens = verifyEventsAndGetResumeTokens( + firstTenantCsCursor, [["insert", firstTenantDocs[0]], ["insert", firstTenantDocs[1]]]); +const secondTenantResumeTokens = verifyEventsAndGetResumeTokens(secondTenantCsCursor, [ + ["insert", secondTenantDocs[0]], + ["insert", secondTenantDocs[1]], + ["insert", secondTenantDocs[2]] +]); + +// Verify that change streams from both tenants can be resumed using their respective resume token. +verifyEventsAndGetResumeTokens( + firstTenantTestDb.stockPrice.watch([], {resumeAfter: firstTenantResumeTokens[0]}), + [["insert", firstTenantDocs[1]], ["insert", firstTenantDocs[2]]]); +verifyEventsAndGetResumeTokens( + secondTenantTestDb.stockPrice.watch([], {resumeAfter: secondTenantResumeTokens[0]}), + [["insert", secondTenantDocs[1]], ["insert", secondTenantDocs[2]]]); + +// Verify that resume tokens cannot be exchanged between tenants change streams. +assert.throwsWithCode( + () => secondTenantTestDb.stockPrice.watch([], {resumeAfter: firstTenantResumeTokens[0]}), + ErrorCodes.ChangeStreamFatalError); +assert.throwsWithCode( + () => firstTenantTestDb.stockPrice.watch([], {resumeAfter: secondTenantResumeTokens[0]}), + ErrorCodes.ChangeStreamFatalError); + +// Verify that the first tenant's change stream can be resumed using the timestamp +// 'startAtOperationTime'. +verifyEventsAndGetResumeTokens( + firstTenantTestDb.stockPrice.watch([], {startAtOperationTime: startAtOperationTime}), [ + ["insert", firstTenantDocs[0]], + ["insert", firstTenantDocs[1]], + ["insert", firstTenantDocs[2]] + ]); + +// Verify that the second tenant's change stream cannot be resumed with the timestamp +// 'startAtOperationTime' and should throw change stream history lost. +assert.throwsWithCode( + () => secondTenantTestDb.stockPrice.watch([], {startAtOperationTime: startAtOperationTime}), + ErrorCodes.ChangeStreamHistoryLost); + +// Ensure that disabling the change stream for the second tenant does not impact the change +// stream of the first tenant. +replSetTest.setChangeStreamState(secondTenantConn, false); + +// The next on the change stream for the second tenant should now throw exception. +assert.throwsWithCode(() => assert.soon(() => secondTenantCsCursor.hasNext()), + ErrorCodes.QueryPlanKilled); + +// The next of the change stream for the first tenant should continue to work. Since we have +// still not consumed all event from the first tenant, the change stream should emit the +// remaining ones. +verifyEventsAndGetResumeTokens(firstTenantCsCursor, [["insert", firstTenantDocs[2]]]); + +// Re-enable the change stream for the second tenant and verify that the change stream cannot be +// resumed using the resume token of previous incarnation of the change stream. +replSetTest.setChangeStreamState(secondTenantConn, true); +assert.throwsWithCode( + () => secondTenantTestDb.stockPrice.watch([], {resumeAfter: secondTenantResumeTokens[0]}), + ErrorCodes.ChangeStreamHistoryLost); + +replSetTest.stopSet(); +}()); diff --git a/jstests/serverless/initial_sync_change_collection.js b/jstests/serverless/initial_sync_change_collection.js index 6f3b0128ef6..3d111307dd9 100644 --- a/jstests/serverless/initial_sync_change_collection.js +++ b/jstests/serverless/initial_sync_change_collection.js @@ -2,9 +2,8 @@ // and when the initial sync has completed the change collection and oplog entries are exactly same // in the new secondary. // @tags: [ -// featureFlagServerlessChangeStreams, -// featureFlagMongoStore, -// requires_fcv_61, +// requires_fcv_62, +// __TEMPORARILY_DISABLED__ // ] // (function() { @@ -15,10 +14,16 @@ load("jstests/serverless/libs/change_collection_util.js"); // For verifyChangeC const replSetTest = new ReplSetTest({nodes: 1}); -// TODO SERVER-67267 add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and -// 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'. -replSetTest.startSet( - {setParameter: "failpoint.forceEnableChangeCollectionsMode=" + tojson({mode: "alwaysOn"})}); +// TODO SERVER-67267 Add 'serverless' flag. +// TODO SERVER-69115 Add 'featureFlagRequireTenantID' flag and remove '__TEMPORARILY_DISABLED__' +// tag and replace 'ReplSetTest' with 'ChangeStreamMultitenantReplicaSetTest'. +replSetTest.startSet({ + setParameter: { + featureFlagServerlessChangeStreams: true, + multitenancySupport: true, + featureFlagMongoStore: true + } +}); replSetTest.initiate(); @@ -44,7 +49,9 @@ const secondary = replSetTest.add({ setParameter: { // Hang after the data cloning phase is completed. "failpoint.initialSyncHangAfterDataCloning": tojson({mode: "alwaysOn"}), - "failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"}) + featureFlagServerlessChangeStreams: true, + multitenancySupport: true, + featureFlagMongoStore: true } }); diff --git a/jstests/serverless/libs/change_collection_util.js b/jstests/serverless/libs/change_collection_util.js index 4026ea84f81..f9b8a9c6846 100644 --- a/jstests/serverless/libs/change_collection_util.js +++ b/jstests/serverless/libs/change_collection_util.js @@ -1,29 +1,44 @@ // Contains functions for testing the change collections. -// Verifies that the oplog and change collection entries are the same for the specified start and -// end duration of the oplog timestamp. -function verifyChangeCollectionEntries(connection, startOplogTimestamp, endOplogTimestamp) { +// Verifies that the oplog and change collection entries are the same for the provided tenant +// 'tenantId' for the specified timestamp window:- (startOplogTimestamp, endOplogTimestamp]. +function verifyChangeCollectionEntries( + connection, startOplogTimestamp, endOplogTimestamp, tenantId) { + // Fetch the oplog documents for the provided tenant for the specified timestamp window. Note + // that the startOplogTimestamp is expected to be just before the first write, while the + // endOplogTimestamp is expected to be the timestamp of the final write in the test. const oplogColl = connection.getDB("local").oplog.rs; - const changeColl = connection.getDB("config").system.change_collection; + const oplogEntries = oplogColl + .find({ + $and: [ + {ts: {$gt: startOplogTimestamp}}, + {ts: {$lte: endOplogTimestamp}}, + {tid: tenantId} + ] + }) + .toArray(); - // Fetch all oplog and change collection entries for the duration: [startOplogTimestamp, - // endOplogTimestamp]. - const oplogEntries = - oplogColl.find({$and: [{ts: {$gte: startOplogTimestamp}}, {ts: {$lte: endOplogTimestamp}}]}) - .toArray(); + // Fetch all documents from the tenant's change collection for the specified timestamp window. + const changeColl = + ChangeStreamMultitenantReplicaSetTest.getTenantConnection(connection.host, tenantId) + .getDB("config") + .system.change_collection; const changeCollectionEntries = changeColl - .find({$and: [{_id: {$gte: startOplogTimestamp}}, {_id: {$lte: endOplogTimestamp}}]}) + .find({$and: [{_id: {$gt: startOplogTimestamp}}, {_id: {$lte: endOplogTimestamp}}]}) .toArray(); - assert.eq( - oplogEntries.length, - changeCollectionEntries.length, - "Number of entries in the oplog and the change collection is not the same. Oplog has total " + - oplogEntries.length + " entries , change collection has total " + - changeCollectionEntries.length + " entries" + - "change collection entries " + tojson(changeCollectionEntries)); + // Verify that the number of documents returned by the oplog and the tenant's change collection + // are exactly the same. + assert.eq(oplogEntries.length, + changeCollectionEntries.length, + "Number of entries in the oplog and the change collection with tenantId: " + + tenantId + " is not the same. Oplog has total " + oplogEntries.length + + " entries , change collection has total " + changeCollectionEntries.length + + " entries, change collection entries " + tojson(changeCollectionEntries)); + // Verify that the documents in the change collection are exactly the same as the oplog for a + // particular tenant. for (let idx = 0; idx < oplogEntries.length; idx++) { const oplogEntry = oplogEntries[idx]; const changeCollectionEntry = changeCollectionEntries[idx]; @@ -32,16 +47,82 @@ function verifyChangeCollectionEntries(connection, startOplogTimestamp, endOplog assert(changeCollectionEntry.hasOwnProperty("_id")); assert.eq(timestampCmp(changeCollectionEntry._id, oplogEntry.ts), 0, - "Change collection '_id' field: " + tojson(changeCollectionEntry._id) + + "Change collection with tenantId: " + tenantId + + " '_id' field: " + tojson(changeCollectionEntry._id) + " is not same as the oplog 'ts' field: " + tojson(oplogEntry.ts)); delete changeCollectionEntry["_id"]; // Verify that the oplog and change collecton entry (after removing the '_id') field are // the same. - assert.eq( - oplogEntry, - changeCollectionEntry, - "Oplog and change collection entries are not same. Oplog entry: " + tojson(oplogEntry) + - ", change collection entry: " + tojson(changeCollectionEntry)); + assert.eq(oplogEntry, + changeCollectionEntry, + "Oplog and change collection with tenantId: " + tenantId + + " entries are not same. Oplog entry: " + tojson(oplogEntry) + + ", change collection entry: " + tojson(changeCollectionEntry)); + } +} + +// A class that sets up the multitenant environment to enable change collections on the replica set. +// This class also provides helpers that are commonly used when working with change collections. +class ChangeStreamMultitenantReplicaSetTest extends ReplSetTest { + constructor(config) { + // Instantiate the 'ReplSetTest'. + super(config); + + // Start and initialize the replica set. + // TODO SERVER-67267 Add 'serverless' flag. + const setParameter = Object.assign({}, config.setParameter || {}, { + featureFlagServerlessChangeStreams: true, + multitenancySupport: true, + featureFlagMongoStore: true, + featureFlagRequireTenantID: true + }); + this.startSet({setParameter: setParameter}); + this.initiate(); + + // Create a root user within the multitenant environment to enable passing '$tenant' to + // commands. + assert.commandWorked(this.getPrimary().getDB("admin").runCommand( + {createUser: "root", pwd: "pwd", roles: ["root"]})); + } + + // Returns a connection to the 'hostAddr' with 'tenantId' stamped to it for the created user. + static getTenantConnection(hostAddr, tenantId, createUser = { + user: ObjectId().str, + roles: [{role: 'readWriteAnyDatabase', db: 'admin'}] + }) { + const tokenConn = new Mongo(hostAddr); + + // Login to the root user with 'ActionType::useTenant' such that the '$tenant' can be + // used. + assert(tokenConn.getDB("admin").auth("root", "pwd")); + + // Create the user with the provided attributes. + assert.commandWorked(tokenConn.getDB("$external").runCommand({ + createUser: createUser.user, + '$tenant': tenantId, + roles: createUser.roles + })); + + // Set the provided tenant id into the security token for the user. + tokenConn._setSecurityToken( + _createSecurityToken({user: createUser.user, db: '$external', tenant: tenantId})); + + // Logout the root user to avoid multiple authentication. + tokenConn.getDB("admin").logout(); + + return tokenConn; + } + + // Sets the change stream state for the provided tenant connection. + setChangeStreamState(tenantConn, enabled) { + assert.commandWorked( + tenantConn.getDB("admin").runCommand({setChangeStreamState: 1, enabled: enabled})); + } + + // Returns the change stream state for the provided tenant connection. + getChangeStreamState(tenantConn) { + return assert.commandWorked(tenantConn.getDB("admin").runCommand({getChangeStreamState: 1})) + .enabled; } } diff --git a/jstests/serverless/sharded_change_stream_state_commands.js b/jstests/serverless/sharded_change_stream_state_commands.js index 541ce64bf5a..83102fe0e8e 100644 --- a/jstests/serverless/sharded_change_stream_state_commands.js +++ b/jstests/serverless/sharded_change_stream_state_commands.js @@ -9,11 +9,7 @@ const shardingTest = new ShardingTest({ shards: 2, - other: { - configOptions: { - setParameter: {"failpoint.forceEnableChangeCollectionsMode": tojson({mode: "alwaysOn"})} - } - } + other: {configOptions: {setParameter: {featureFlagServerlessChangeStreams: true}}} }); // TODO SERVER-68341 Implement tests for mongoQ and ensure that the change collection is not enabled diff --git a/jstests/serverless/write_to_change_collection_in_startup_recovery.js b/jstests/serverless/write_to_change_collection_in_startup_recovery.js index b0950abd057..bbf7cd0a259 100644 --- a/jstests/serverless/write_to_change_collection_in_startup_recovery.js +++ b/jstests/serverless/write_to_change_collection_in_startup_recovery.js @@ -1,9 +1,8 @@ // Tests that replaying the oplog entries during the startup recovery also writes to the change // collection. // @tags: [ -// featureFlagServerlessChangeStreams, -// multiversion_incompatible, -// featureFlagMongoStore, +// requires_fcv_62, +// __TEMPORARILY_DISABLED__ // ] (function() { @@ -14,10 +13,16 @@ load("jstests/serverless/libs/change_collection_util.js"); // For verifyChangeC const replSetTest = new ReplSetTest({nodes: 1}); -// TODO SERVER-67267 add 'featureFlagServerlessChangeStreams', 'multitenancySupport' and -// 'serverless' flags and remove 'failpoint.forceEnableChangeCollectionsMode'. -replSetTest.startSet( - {setParameter: "failpoint.forceEnableChangeCollectionsMode=" + tojson({mode: "alwaysOn"})}); +// TODO SERVER-67267 Add 'serverless' flag. +// TODO SERVER-69115 Add 'featureFlagRequireTenantID' flag and remove '__TEMPORARILY_DISABLED__' +// tag and replace 'ReplSetTest' with 'ChangeStreamMultitenantReplicaSetTest'. +replSetTest.startSet({ + setParameter: { + featureFlagServerlessChangeStreams: true, + multitenancySupport: true, + featureFlagMongoStore: true + } +}); replSetTest.initiate(); @@ -84,7 +89,11 @@ MongoRunner.stopMongod(standalone, null, {noCleanData: true, skipValidation: tru // Start the replica set primary with the same db path. replSetTest.start(primary, { noCleanData: true, - setParameter: "failpoint.forceEnableChangeCollectionsMode=" + tojson({mode: "alwaysOn"}) + setParameter: { + featureFlagServerlessChangeStreams: true, + multitenancySupport: true, + featureFlagMongoStore: true + } }); primary = replSetTest.getPrimary(); @@ -105,6 +114,7 @@ assert(endTimestamp !== undefined); // Verify that the oplog and the change collection entries between the ['startTimestamp', // 'endTimestamp'] window are exactly same and in the same order. +// TODO SERVER-69115 Pass the tenant id to the 'verifyChangeCollectionEntries'. verifyChangeCollectionEntries(primary, startTimestamp, endTimestamp); replSetTest.stopSet(); diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 57c4feedb00..c0514e4dd44 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -511,7 +511,9 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_serverless_helpers', '$BUILD_DIR/mongo/db/change_stream_state', + '$BUILD_DIR/mongo/db/concurrency/exception_util', '$BUILD_DIR/mongo/db/dbdirectclient', '$BUILD_DIR/mongo/db/repl/primary_only_service', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', @@ -523,6 +525,21 @@ env.Library( ], ) +env.Library( + target='change_stream_serverless_helpers', + source=[ + 'change_stream_serverless_helpers.cpp', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/catalog/collection', + '$BUILD_DIR/mongo/db/catalog/collection_catalog', + '$BUILD_DIR/mongo/db/query/query_knobs', + '$BUILD_DIR/mongo/db/server_base', + '$BUILD_DIR/mongo/db/server_options', + '$BUILD_DIR/mongo/idl/feature_flag', + ], +) + env.Library( target='change_stream_change_collection_manager', source=[ @@ -532,9 +549,10 @@ env.Library( '$BUILD_DIR/mongo/db/catalog/catalog_helpers', '$BUILD_DIR/mongo/db/catalog/clustered_collection_options', '$BUILD_DIR/mongo/db/catalog/collection_crud', - '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_serverless_helpers', '$BUILD_DIR/mongo/db/concurrency/exception_util', '$BUILD_DIR/mongo/db/dbhelpers', + '$BUILD_DIR/mongo/db/server_feature_flags', '$BUILD_DIR/mongo/db/service_context', ], ) @@ -546,6 +564,7 @@ env.Library( ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_serverless_helpers', '$BUILD_DIR/mongo/db/change_streams_cluster_parameter', '$BUILD_DIR/mongo/db/query_exec', '$BUILD_DIR/mongo/util/periodic_runner', @@ -2515,6 +2534,7 @@ if wiredtiger: '$BUILD_DIR/mongo/db/catalog/local_oplog_info', '$BUILD_DIR/mongo/db/change_collection_expired_change_remover', '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_serverless_helpers', '$BUILD_DIR/mongo/db/change_streams_cluster_parameter', '$BUILD_DIR/mongo/db/commands/create_command', '$BUILD_DIR/mongo/db/mongohasher', diff --git a/src/mongo/db/catalog/drop_collection.cpp b/src/mongo/db/catalog/drop_collection.cpp index 80ff680df43..5e9089fce0c 100644 --- a/src/mongo/db/catalog/drop_collection.cpp +++ b/src/mongo/db/catalog/drop_collection.cpp @@ -233,7 +233,7 @@ Status _abortIndexBuildsAndDrop(OperationContext* opCtx, if (dropIfUUIDNotMatching && collectionUUID == *dropIfUUIDNotMatching) { return Status::OK(); } - const NamespaceStringOrUUID dbAndUUID{coll->ns().db().toString(), coll->uuid()}; + const NamespaceStringOrUUID dbAndUUID{coll->ns().dbName(), coll->uuid()}; const int numIndexes = coll->getIndexCatalog()->numIndexesTotal(opCtx); while (true) { @@ -254,7 +254,7 @@ Status _abortIndexBuildsAndDrop(OperationContext* opCtx, << collectionUUID << ") is being dropped"); // Take an exclusive lock to finish the collection drop. - optionalAutoDb.emplace(opCtx, startingNss.db(), MODE_IX); + optionalAutoDb.emplace(opCtx, startingNss.dbName(), MODE_IX); collLock.emplace(opCtx, dbAndUUID, MODE_X); // Abandon the snapshot as the index catalog will compare the in-memory state to the diff --git a/src/mongo/db/change_collection_expired_change_remover_test.cpp b/src/mongo/db/change_collection_expired_change_remover_test.cpp index e3ee3c411f2..8b8777e0d23 100644 --- a/src/mongo/db/change_collection_expired_change_remover_test.cpp +++ b/src/mongo/db/change_collection_expired_change_remover_test.cpp @@ -35,6 +35,7 @@ #include "mongo/db/catalog/catalog_test_fixture.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/change_stream_change_collection_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/exec/document_value/document_value_test_util.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/plan_executor.h" @@ -54,7 +55,8 @@ namespace mongo { class ChangeCollectionExpiredChangeRemoverTest : public CatalogTestFixture { protected: ChangeCollectionExpiredChangeRemoverTest() - : CatalogTestFixture(Options{}.useMockClock(true)), _tenantId(OID::gen()) { + : CatalogTestFixture(Options{}.useMockClock(true)), + _tenantId(change_stream_serverless_helpers::getTenantIdForTesting()) { ChangeStreamChangeCollectionManager::create(getServiceContext()); } @@ -67,7 +69,7 @@ protected: } void insertDocumentToChangeCollection(OperationContext* opCtx, - boost::optional tenantId, + const TenantId& tenantId, const BSONObj& obj) { const auto wallTime = now(); Timestamp timestamp{wallTime}; @@ -78,6 +80,7 @@ protected: oplogEntry.setNss(NamespaceString::makeChangeCollectionNSS(tenantId)); oplogEntry.setObject(obj); oplogEntry.setWallClockTime(wallTime); + auto oplogEntryBson = oplogEntry.toBSON(); RecordData recordData{oplogEntryBson.objdata(), oplogEntryBson.objsize()}; @@ -112,8 +115,7 @@ protected: return entries; } - void dropAndRecreateChangeCollection(OperationContext* opCtx, - boost::optional tenantId) { + void dropAndRecreateChangeCollection(OperationContext* opCtx, const TenantId& tenantId) { auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); changeCollectionManager.dropChangeCollection(opCtx, tenantId); changeCollectionManager.createChangeCollection(opCtx, tenantId); @@ -136,11 +138,13 @@ protected: opCtx, &*changeCollection, maxRecordIdBound); } - const boost::optional _tenantId; + const TenantId _tenantId; + boost::optional _changeCollectionManager; + RAIIServerParameterControllerForTest featureFlagController{"featureFlagServerlessChangeStreams", true}; - - boost::optional _changeCollectionManager; + RAIIServerParameterControllerForTest queryKnobController{ + "internalChangeStreamUseTenantIdForTesting", true}; }; // Tests that the last expired focument retrieved is the expected one. diff --git a/src/mongo/db/change_collection_expired_documents_remover.cpp b/src/mongo/db/change_collection_expired_documents_remover.cpp index 80a816945be..d84fb21b672 100644 --- a/src/mongo/db/change_collection_expired_documents_remover.cpp +++ b/src/mongo/db/change_collection_expired_documents_remover.cpp @@ -31,6 +31,7 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/change_stream_change_collection_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/change_streams_cluster_parameter_gen.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/replication_coordinator.h" @@ -56,25 +57,31 @@ MONGO_FAIL_POINT_DEFINE(injectCurrentWallTimeForRemovingExpiredDocuments); namespace { -// TODO SERVER-61822 Provide the real implementation after 'listDatabasesForAllTenants' is -// available. -std::vector> getAllTenants() { - return {boost::none}; +change_stream_serverless_helpers::TenantSet getConfigDbTenants(OperationContext* opCtx) { + auto tenantIds = change_stream_serverless_helpers::getConfigDbTenants(opCtx); + if (auto testTenantId = change_stream_serverless_helpers::resolveTenantId(boost::none)) { + tenantIds.insert(*testTenantId); + } + + return tenantIds; } -boost::optional getExpireAfterSeconds(boost::optional tid) { +boost::optional getExpireAfterSeconds(const TenantId& tenantId) { auto* clusterParameters = ServerParameterSet::getClusterParameterSet(); auto* changeStreamsParam = clusterParameters->get>( "changeStreams"); - return changeStreamsParam->getValue(tid).getExpireAfterSeconds(); + + // TODO SERVER-69511 Pass 'tenantId' instead of 'boost::none'. Move this function to + // 'change_stream_serverless_helpers'. + return changeStreamsParam->getValue(boost::none).getExpireAfterSeconds(); } void removeExpiredDocuments(Client* client) { // TODO SERVER-66717 Remove this logic from this method. Due to the delay in the feature flag // activation it was placed here. The remover job should ultimately be initialized at the mongod // startup when launched in serverless mode. - if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { + if (!change_stream_serverless_helpers::isChangeCollectionsModeActive()) { return; } @@ -98,7 +105,7 @@ void removeExpiredDocuments(Client* client) { long long maxStartWallTime = 0; auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx.get()); - for (const auto& tenantId : getAllTenants()) { + for (const auto& tenantId : getConfigDbTenants(opCtx.get())) { auto expiredAfterSeconds = getExpireAfterSeconds(tenantId); invariant(expiredAfterSeconds); @@ -169,13 +176,13 @@ void removeExpiredDocuments(Client* client) { /** * Defines a periodic background job to remove expired documents from change collections. - * The job will run every 'changeCollectionRemoverJobSleepSeconds', as defined in the cluster - * parameter. + * The job will run every 'changeCollectionExpiredDocumentsRemoverJobSleepSeconds', as defined in + * the cluster parameter. */ class ChangeCollectionExpiredDocumentsRemover { public: ChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext) { - const auto period = Seconds{gChangeCollectionRemoverJobSleepSeconds.load()}; + const auto period = Seconds{gChangeCollectionExpiredDocumentsRemoverJobSleepSeconds.load()}; _jobAnchor = serviceContext->getPeriodicRunner()->makeJob( {"ChangeCollectionExpiredDocumentsRemover", removeExpiredDocuments, period}); _jobAnchor.start(); diff --git a/src/mongo/db/change_collection_expired_documents_remover.h b/src/mongo/db/change_collection_expired_documents_remover.h index bf9e36ae1f4..3ce5fc1ef94 100644 --- a/src/mongo/db/change_collection_expired_documents_remover.h +++ b/src/mongo/db/change_collection_expired_documents_remover.h @@ -35,7 +35,8 @@ namespace mongo { /** * Starts a periodic background job to remove expired documents from change collections. The job - * will run every 'changeCollectionRemoverJobSleepSeconds' as defined in the cluster parameter. + * will run every 'changeCollectionExpiredDocumentsRemoverJobSleepSeconds' as defined in the cluster + * parameter. */ void startChangeCollectionExpiredDocumentsRemover(ServiceContext* serviceContext); diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp index ca2735db03c..eee89bf9e07 100644 --- a/src/mongo/db/change_stream_change_collection_manager.cpp +++ b/src/mongo/db/change_stream_change_collection_manager.cpp @@ -38,7 +38,7 @@ #include "mongo/db/catalog/create_collection.h" #include "mongo/db/catalog/drop_collection.h" #include "mongo/db/catalog_raii.h" -#include "mongo/db/change_stream_pre_images_collection_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/concurrency/exception_util.h" #include "mongo/db/multitenancy_gen.h" #include "mongo/db/namespace_string.h" @@ -46,28 +46,15 @@ #include "mongo/db/repl/apply_ops_command_info.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_entry_gen.h" +#include "mongo/db/server_feature_flags_gen.h" #include "mongo/db/server_options.h" #include "mongo/logv2/log.h" namespace mongo { - -// Sharded clusters do not support serverless mode at present, but this failpoint allows us to -// nonetheless test the behaviour of change collections in a sharded environment. -MONGO_FAIL_POINT_DEFINE(forceEnableChangeCollectionsMode); - namespace { const auto getChangeCollectionManager = ServiceContext::declareDecoration>(); -/** - * Returns the list of all tenant ids in the replica set. - * TODO SERVER-61822 Provide the real implementation after 'listDatabasesForAllTenants' is - * available. - */ -std::vector> getAllTenants() { - return {boost::none}; -} - /** * Creates a Document object from the supplied oplog entry, performs necessary modifications to it * and then returns it as a BSON object. @@ -88,13 +75,15 @@ class ChangeCollectionsWriter { public: explicit ChangeCollectionsWriter(const AutoGetChangeCollection::AccessMode& accessMode) : _accessMode{accessMode} {} + /** * Adds the insert statement for the provided tenant that will be written to the change * collection when the 'write()' method is called. */ - void add(const TenantId& tenantId, InsertStatement insertStatement) { - if (_shouldAddEntry(insertStatement)) { - _tenantStatementsMap[tenantId].push_back(std::move(insertStatement)); + void add(InsertStatement insertStatement) { + if (auto tenantId = _extractTenantId(insertStatement); + tenantId && _shouldAddEntry(insertStatement)) { + _tenantStatementsMap[*tenantId].push_back(std::move(insertStatement)); } } @@ -104,14 +93,12 @@ public: */ Status write(OperationContext* opCtx, OpDebug* opDebug) { for (auto&& [tenantId, insertStatements] : _tenantStatementsMap) { - AutoGetChangeCollection tenantChangeCollection( - opCtx, _accessMode, boost::none /* tenantId */); + AutoGetChangeCollection tenantChangeCollection(opCtx, _accessMode, tenantId); // The change collection does not exist for a particular tenant because either the // change collection is not enabled or is in the process of enablement. Ignore this // insert for now. - // TODO: SERVER-65950 move this check before inserting to the map - // 'tenantToInsertStatements'. + // TODO SERVER-67170 Move this check before inserting to the map. if (!tenantChangeCollection) { continue; } @@ -127,9 +114,9 @@ public: false /* fromMigrate */); if (!status.isOK()) { return Status(status.code(), - str::stream() - << "Write to change collection: " << tenantChangeCollection->ns() - << "failed, reason: " << status.reason()); + str::stream() << "Write to change collection: " + << tenantChangeCollection->ns().toStringWithTenantId() + << "failed, reason: " << status.reason()); } } @@ -137,12 +124,31 @@ public: } private: + boost::optional _extractTenantId(const InsertStatement& insertStatement) { + // Parse the oplog entry to fetch the tenant id from 'tid' field. The oplog entry will not + // written to the change collection if 'tid' field is missing. + auto& oplogDoc = insertStatement.doc; + if (auto tidFieldElem = oplogDoc.getField(repl::OplogEntry::kTidFieldName)) { + return TenantId{Value(tidFieldElem).getOid()}; + } + + if (MONGO_unlikely(internalChangeStreamUseTenantIdForTesting.load())) { + return change_stream_serverless_helpers::getTenantIdForTesting(); + } + + return boost::none; + } + bool _shouldAddEntry(const InsertStatement& insertStatement) { auto& oplogDoc = insertStatement.doc; - // TODO SERVER-65950 retreive tenant from the oplog. // TODO SERVER-67170 avoid inspecting the oplog BSON object. if (auto nssFieldElem = oplogDoc[repl::OplogEntry::kNssFieldName]) { + // Avoid writing entry with empty 'ns' field, for eg. 'periodic noop' entry. + if (nssFieldElem.String().empty()) { + return false; + } + if (nssFieldElem.String() == "config.$cmd"_sd) { if (auto objectFieldElem = oplogDoc[repl::OplogEntry::kObjectFieldName]) { // The oplog entry might be a drop command on the change collection. Check if @@ -225,40 +231,8 @@ void ChangeStreamChangeCollectionManager::create(ServiceContext* service) { getChangeCollectionManager(service).emplace(service); } -bool ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive() { - // A change collection must not be enabled on the config server. - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - return false; - } - - // If the force fail point is enabled then declare the change collection mode as active. - if (MONGO_unlikely(forceEnableChangeCollectionsMode.shouldFail())) { - return true; - } - - // TODO SERVER-67267 guard with 'multitenancySupport' and 'isServerless' flag. - return serverGlobalParams.featureCompatibility.isVersionInitialized() && - feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled( - serverGlobalParams.featureCompatibility); -} - -bool ChangeStreamChangeCollectionManager::hasChangeCollection( - OperationContext* opCtx, boost::optional tenantId) const { - auto catalog = CollectionCatalog::get(opCtx); - return static_cast(catalog->lookupCollectionByNamespace( - opCtx, NamespaceString::makeChangeCollectionNSS(tenantId))); -} - -bool ChangeStreamChangeCollectionManager::isChangeStreamEnabled( - OperationContext* opCtx, boost::optional tenantId) const { - // A change stream in the serverless is declared as enabled if both the change collection and - // the pre-images collection exist for the provided tenant. - return isChangeCollectionsModeActive() && hasChangeCollection(opCtx, tenantId) && - ChangeStreamPreImagesCollectionManager::hasPreImagesCollection(opCtx, tenantId); -} - -void ChangeStreamChangeCollectionManager::createChangeCollection( - OperationContext* opCtx, boost::optional tenantId) { +void ChangeStreamChangeCollectionManager::createChangeCollection(OperationContext* opCtx, + const TenantId& tenantId) { // Make the change collection clustered by '_id'. The '_id' field will have the same value as // the 'ts' field of the oplog. CollectionOptions changeCollectionOptions; @@ -268,13 +242,13 @@ void ChangeStreamChangeCollectionManager::createChangeCollection( const auto status = createCollection(opCtx, changeCollNss, changeCollectionOptions, BSONObj()); uassert(status.code(), - str::stream() << "Failed to create change collection: " << changeCollNss - << causedBy(status.reason()), + str::stream() << "Failed to create change collection: " + << changeCollNss.toStringWithTenantId() << causedBy(status.reason()), status.isOK() || status.code() == ErrorCodes::NamespaceExists); } void ChangeStreamChangeCollectionManager::dropChangeCollection(OperationContext* opCtx, - boost::optional tenantId) { + const TenantId& tenantId) { DropReply dropReply; const auto changeCollNss = NamespaceString::makeChangeCollectionNSS(tenantId); @@ -284,8 +258,8 @@ void ChangeStreamChangeCollectionManager::dropChangeCollection(OperationContext* &dropReply, DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops); uassert(status.code(), - str::stream() << "Failed to drop change collection: " << changeCollNss - << causedBy(status.reason()), + str::stream() << "Failed to drop change collection: " + << changeCollNss.toStringWithTenantId() << causedBy(status.reason()), status.isOK() || status.code() == ErrorCodes::NamespaceNotFound); } @@ -310,9 +284,7 @@ void ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection( // tenant. auto changeCollDoc = createChangeCollectionEntryFromOplog(record.data.toBson()); - // TODO SERVER-65950 replace 'TenantId::kSystemTenantId' with the tenant id. changeCollectionsWriter.add( - TenantId::kSystemTenantId, InsertStatement{std::move(changeCollDoc), ts, repl::OpTime::kUninitializedTerm}); } @@ -352,11 +324,8 @@ Status ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection( auto changeCollDoc = createChangeCollectionEntryFromOplog(oplogDoc); - // TODO SERVER-65950 replace 'TenantId::kSystemTenantId' with the tenant id. - changeCollectionsWriter.add(TenantId::kSystemTenantId, - InsertStatement{std::move(changeCollDoc), - oplogSlot.getTimestamp(), - oplogSlot.getTerm()}); + changeCollectionsWriter.add(InsertStatement{ + std::move(changeCollDoc), oplogSlot.getTimestamp(), oplogSlot.getTerm()}); } // Write documents to change collections. diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h index 49ff64d635b..82d9fc01590 100644 --- a/src/mongo/db/change_stream_change_collection_manager.h +++ b/src/mongo/db/change_stream_change_collection_manager.h @@ -112,17 +112,6 @@ public: */ static ChangeStreamChangeCollectionManager& get(OperationContext* opCtx); - /** - * Returns true if the server is configured such that change collections can be used to record - * oplog entries; ie, we are running in a Serverless context. Returns false otherwise. - */ - static bool isChangeCollectionsModeActive(); - - /** - * Returns true if the change collection is present for the specified tenant, false otherwise. - */ - bool hasChangeCollection(OperationContext* opCtx, boost::optional tenantId) const; - /** * Returns true if the change stream is enabled for the provided tenant, false otherwise. */ @@ -130,17 +119,13 @@ public: /** * Creates a change collection for the specified tenant, if it doesn't exist. - * - * TODO: SERVER-65950 make tenantId field mandatory. */ - void createChangeCollection(OperationContext* opCtx, boost::optional tenantId); + void createChangeCollection(OperationContext* opCtx, const TenantId& tenantId); /** * Deletes the change collection for the specified tenant, if it already exist. - * - * TODO: SERVER-65950 make tenantId field mandatory. */ - void dropChangeCollection(OperationContext* opCtx, boost::optional tenantId); + void dropChangeCollection(OperationContext* opCtx, const TenantId& tenantId); /** * Inserts documents to change collections. The parameter 'oplogRecords' is a vector of oplog @@ -152,8 +137,6 @@ public: * * Failure in insertion to any change collection will result in a fatal exception and will bring * down the node. - * - * TODO: SERVER-65950 make tenantId field mandatory. */ void insertDocumentsToChangeCollection(OperationContext* opCtx, const std::vector& oplogRecords, diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.cpp b/src/mongo/db/change_stream_pre_images_collection_manager.cpp index 3b82e7a7c46..25a12d7636a 100644 --- a/src/mongo/db/change_stream_pre_images_collection_manager.cpp +++ b/src/mongo/db/change_stream_pre_images_collection_manager.cpp @@ -35,7 +35,9 @@ #include "mongo/db/catalog/create_collection.h" #include "mongo/db/catalog/drop_collection.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/change_stream_options_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/concurrency/exception_util.h" #include "mongo/db/concurrency/lock_manager_defs.h" #include "mongo/db/concurrency/locker.h" @@ -47,7 +49,6 @@ #include "mongo/logv2/log.h" #include "mongo/util/assert_util.h" #include "mongo/util/concurrency/idle_thread_block.h" -#include "mongo/util/fail_point.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery @@ -118,7 +119,8 @@ void ChangeStreamPreImagesCollectionManager::createPreImagesCollection( opCtx, preImagesCollectionNamespace, preImagesCollectionOptions, BSONObj()); uassert(status.code(), str::stream() << "Failed to create the pre-images collection: " - << preImagesCollectionNamespace.coll() << causedBy(status.reason()), + << preImagesCollectionNamespace.toStringWithTenantId() + << causedBy(status.reason()), status.isOK() || status.code() == ErrorCodes::NamespaceExists); } @@ -133,7 +135,8 @@ void ChangeStreamPreImagesCollectionManager::dropPreImagesCollection( DropCollectionSystemCollectionMode::kAllowSystemCollectionDrops); uassert(status.code(), str::stream() << "Failed to drop the pre-images collection: " - << preImagesCollectionNamespace.coll() << causedBy(status.reason()), + << preImagesCollectionNamespace.toStringWithTenantId() + << causedBy(status.reason()), status.isOK() || status.code() == ErrorCodes::NamespaceNotFound); } @@ -148,11 +151,13 @@ void ChangeStreamPreImagesCollectionManager::insertPreImage(OperationContext* op << preImage.getId().getApplyOpsIndex(), preImage.getId().getApplyOpsIndex() >= 0); + // TODO SERVER-66642 Consider using internal test-tenant id if applicable. + const auto preImagesCollectionNamespace = NamespaceString::makePreImageCollectionNSS(tenantId); + // This lock acquisition can block on a stronger lock held by another operation modifying // the pre-images collection. There are no known cases where an operation holding an // exclusive lock on the pre-images collection also waits for oplog visibility. AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState()); - const auto preImagesCollectionNamespace = NamespaceString::makePreImageCollectionNSS(tenantId); AutoGetCollection preImagesCollectionRaii( opCtx, preImagesCollectionNamespace, LockMode::MODE_IX); auto& changeStreamPreImagesCollection = preImagesCollectionRaii.getCollection(); @@ -173,13 +178,6 @@ void ChangeStreamPreImagesCollectionManager::insertPreImage(OperationContext* op uassertStatusOK(insertionStatus); } -bool ChangeStreamPreImagesCollectionManager::hasPreImagesCollection( - OperationContext* opCtx, boost::optional tenantId) { - auto catalog = CollectionCatalog::get(opCtx); - return static_cast(catalog->lookupCollectionByNamespace( - opCtx, NamespaceString::makePreImageCollectionNSS(tenantId))); -} - namespace { RecordId toRecordId(ChangeStreamPreImageId id) { return record_id_helpers::keyForElem( @@ -408,8 +406,9 @@ void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTim // Acquire intent-exclusive lock on the pre-images collection. Early exit if the collection // doesn't exist. + // TODO SERVER-66642 Account for multitenancy. AutoGetCollection autoColl( - opCtx.get(), NamespaceString::kChangeStreamPreImagesNamespace, MODE_IX); + opCtx.get(), NamespaceString::makePreImageCollectionNSS(boost::none), MODE_IX); const auto& preImagesColl = autoColl.getCollection(); if (!preImagesColl) { return; @@ -436,11 +435,12 @@ void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTim change_stream_pre_image_helpers::getPreImageExpirationTime( opCtx.get(), currentTimeForTimeBasedExpiration)); + // TODO SERVER-66642 Account for multitenancy. for (const auto& collectionRange : expiredPreImages) { writeConflictRetry( opCtx.get(), "ChangeStreamExpiredPreImagesRemover", - NamespaceString::kChangeStreamPreImagesNamespace.ns(), + NamespaceString::makePreImageCollectionNSS(boost::none).ns(), [&] { auto params = std::make_unique(); params->isMulti = true; diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.h b/src/mongo/db/change_stream_pre_images_collection_manager.h index dede0e38c96..75efb28c22d 100644 --- a/src/mongo/db/change_stream_pre_images_collection_manager.h +++ b/src/mongo/db/change_stream_pre_images_collection_manager.h @@ -88,13 +88,6 @@ public: boost::optional tenantId, const ChangeStreamPreImage& preImage); - /** - * Returns true if the pre-images collection exists, false otherwise. If 'tenantId' is provided - * then the pre-images collection associated with that tenant will be checked for existence, - * otherwise the default pre-images collection will be checked. - */ - static bool hasPreImagesCollection(OperationContext* opCtx, boost::optional tenantId); - /** * Scans the system pre-images collection and deletes the expired pre-images from it. */ diff --git a/src/mongo/db/change_stream_serverless_helpers.cpp b/src/mongo/db/change_stream_serverless_helpers.cpp new file mode 100644 index 00000000000..0577894e397 --- /dev/null +++ b/src/mongo/db/change_stream_serverless_helpers.cpp @@ -0,0 +1,101 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery + +#include "mongo/db/change_stream_serverless_helpers.h" + +#include "mongo/db/catalog_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/server_feature_flags_gen.h" +#include "mongo/db/server_options.h" + +namespace mongo { +namespace change_stream_serverless_helpers { + +bool isChangeCollectionsModeActive() { + // A change collection must not be enabled on the config server. + if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { + return false; + } + + // TODO SERVER-67267 guard with 'multitenancySupport' and 'isServerless' flag. + return serverGlobalParams.featureCompatibility.isVersionInitialized() && + feature_flags::gFeatureFlagServerlessChangeStreams.isEnabled( + serverGlobalParams.featureCompatibility); +} + +bool isChangeStreamEnabled(OperationContext* opCtx, const TenantId& tenantId) { + auto catalog = CollectionCatalog::get(opCtx); + + // A change stream in the serverless is declared as enabled if both the change collection and + // the pre-images collection exist for the provided tenant. + // TODO SERVER-66643 Pass 'tenantId' to the pre-images collection. + return isChangeCollectionsModeActive() && + static_cast(catalog->lookupCollectionByNamespace( + opCtx, NamespaceString::makeChangeCollectionNSS(tenantId))) && + static_cast(catalog->lookupCollectionByNamespace( + opCtx, NamespaceString::makePreImageCollectionNSS(boost::none))); +} + +const TenantId& getTenantIdForTesting() { + static const TenantId kTestTenantId( + OID("00000000" /* timestamp */ + "0000000000" /* process id */ + "000000" /* counter */)); + + return kTestTenantId; +} + +boost::optional resolveTenantId(boost::optional tenantId) { + if (tenantId) { + return tenantId; + } else if (MONGO_unlikely(internalChangeStreamUseTenantIdForTesting.load())) { + return getTenantIdForTesting(); + } + + return tenantId; +} + +TenantSet getConfigDbTenants(OperationContext* opCtx) { + TenantSet tenantIds; + + auto dbNames = CollectionCatalog::get(opCtx)->getAllDbNames(); + for (auto&& dbName : dbNames) { + if (dbName.db() == NamespaceString::kConfigDb && dbName.tenantId()) { + tenantIds.insert(*dbName.tenantId()); + } + } + + return tenantIds; +} + +} // namespace change_stream_serverless_helpers +} // namespace mongo diff --git a/src/mongo/db/change_stream_serverless_helpers.h b/src/mongo/db/change_stream_serverless_helpers.h new file mode 100644 index 00000000000..bdeb04f3ff7 --- /dev/null +++ b/src/mongo/db/change_stream_serverless_helpers.h @@ -0,0 +1,71 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include + +#include "mongo/db/operation_context.h" +#include "mongo/db/tenant_id.h" + +namespace mongo { +namespace change_stream_serverless_helpers { + +using TenantSet = stdx::unordered_set; + +/** + * Returns true if the server is configured such that change collections can be used to record + * oplog entries; ie, we are running in a Serverless context. Returns false otherwise. + */ +bool isChangeCollectionsModeActive(); + +/** + * Returns true if the change stream is enabled for the provided tenant, false otherwise. + */ +bool isChangeStreamEnabled(OperationContext* opCtx, const TenantId& tenantId); + +/** + * Returns an internal tenant id that will be used for testing purposes. This tenant id will not + * conflict with any other tenant id. + */ +const TenantId& getTenantIdForTesting(); + +/** + * If the provided 'tenantId' is missing and 'internalChangeStreamUseTenantIdForTesting' is true, + * returns a special 'TenantId' for testing purposes. Otherwise, returns the provided 'tenantId'. + */ +boost::optional resolveTenantId(boost::optional tenantId); + +/** + * Returns the list of the tenants associated with a 'config' database. + */ +TenantSet getConfigDbTenants(OperationContext* opCtx); + +} // namespace change_stream_serverless_helpers +} // namespace mongo diff --git a/src/mongo/db/change_streams_cluster_parameter.idl b/src/mongo/db/change_streams_cluster_parameter.idl index 466e1c0345a..899018d04df 100644 --- a/src/mongo/db/change_streams_cluster_parameter.idl +++ b/src/mongo/db/change_streams_cluster_parameter.idl @@ -58,11 +58,11 @@ server_parameters: cpp_varname: gChangeStreamsClusterParameter validator: callback: validateChangeStreamsClusterParameter - changeCollectionRemoverJobSleepSeconds: + changeCollectionExpiredDocumentsRemoverJobSleepSeconds: description: "Specifies the number of seconds for which the periodic change collection remover job will sleep between each cycle." set_at: [ startup ] cpp_vartype: AtomicWord - cpp_varname: "gChangeCollectionRemoverJobSleepSeconds" + cpp_varname: "gChangeCollectionExpiredDocumentsRemoverJobSleepSeconds" validator: gte: 1 default: 10 diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index c26e7dfc7c0..77573fa709c 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -354,6 +354,7 @@ env.Library( '$BUILD_DIR/mongo/db/catalog/multi_index_block', '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_serverless_helpers', '$BUILD_DIR/mongo/db/command_can_run_here', '$BUILD_DIR/mongo/db/commands', '$BUILD_DIR/mongo/db/concurrency/exception_util', diff --git a/src/mongo/db/commands/change_stream_state_command.cpp b/src/mongo/db/commands/change_stream_state_command.cpp index de8d98083ea..44d186c03a1 100644 --- a/src/mongo/db/commands/change_stream_state_command.cpp +++ b/src/mongo/db/commands/change_stream_state_command.cpp @@ -30,8 +30,7 @@ #include "mongo/platform/basic.h" #include "mongo/db/auth/authorization_session.h" -#include "mongo/db/change_stream_change_collection_manager.h" -#include "mongo/db/change_stream_pre_images_collection_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/change_stream_state_gen.h" #include "mongo/db/commands.h" #include "mongo/db/set_change_stream_state_coordinator.h" @@ -40,7 +39,6 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand namespace mongo { - namespace { /** @@ -67,6 +65,10 @@ public: " enabled: enable or disable the change stream"; } + bool allowedWithSecurityToken() const final { + return true; + } + class Invocation final : public InvocationBase { public: using InvocationBase::InvocationBase; @@ -74,18 +76,19 @@ public: void typedRun(OperationContext* opCtx) { uassert(ErrorCodes::CommandNotSupported, str::stream() << SetChangeStreamStateCommandRequest::kCommandName - << " is only supported in the serverless", - ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()); + << " command is only supported in serverless", + change_stream_serverless_helpers::isChangeCollectionsModeActive()); - // TODO SERVER-65950 use provided '$tenant' only and add 'uassert that tenant must be - // present. Remove 'getDollarTenant()' and fetch tenant from dbName(). - const std::string tenantId = request().getDollarTenant() - ? request().getDollarTenant()->toString() - : TenantId::kSystemTenantId.toString(); + const auto tenantId = + change_stream_serverless_helpers::resolveTenantId(request().getDbName().tenantId()); + uassert(ErrorCodes::BadValue, + str::stream() << SetChangeStreamStateCommandRequest::kCommandName + << " command must be provided with a tenant id", + tenantId); // Prepare the payload for the 'SetChangeStreamStateCoordinator'. SetChangeStreamStateCoordinatorId coordinatorId; - coordinatorId.setTenantId({TenantId{OID(tenantId)}}); + coordinatorId.setTenantId(tenantId); SetChangeStreamStateCoordinatorDocument coordinatorDoc{ coordinatorId, request().getChangeStreamStateParameters().toBSON()}; @@ -134,6 +137,10 @@ public: " {getChangeStreamState: 1}"; } + bool allowedWithSecurityToken() const final { + return true; + } + class Invocation final : public InvocationBase { public: using InvocationBase::InvocationBase; @@ -141,17 +148,20 @@ public: auto typedRun(OperationContext* opCtx) { uassert(ErrorCodes::CommandNotSupported, str::stream() << GetChangeStreamStateCommandRequest::kCommandName - << " is only supported in the serverless", - ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()); + << " command is only supported in serverless", + change_stream_serverless_helpers::isChangeCollectionsModeActive()); + + const auto tenantId = + change_stream_serverless_helpers::resolveTenantId(request().getDbName().tenantId()); + uassert(ErrorCodes::BadValue, + str::stream() << GetChangeStreamStateCommandRequest::kCommandName + << " command must be provided with a tenant id", + tenantId); - // TODO SERVER-65950 use provided '$tenant' only and add 'uassert that tenant must be - // present. - boost::optional tenantId = boost::none; // Set the change stream enablement state in the 'reply' object. - auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); GetChangeStreamStateCommandRequest::Reply reply{ - changeCollectionManager.isChangeStreamEnabled(opCtx, tenantId)}; + change_stream_serverless_helpers::isChangeStreamEnabled(opCtx, *tenantId)}; return reply; } diff --git a/src/mongo/db/commands/dbcommands.cpp b/src/mongo/db/commands/dbcommands.cpp index eb3c3567077..29c85819230 100644 --- a/src/mongo/db/commands/dbcommands.cpp +++ b/src/mongo/db/commands/dbcommands.cpp @@ -188,6 +188,10 @@ public: bool collectsResourceConsumptionMetrics() const final { return true; } + bool allowedWithSecurityToken() const final { + return true; + } + class Invocation final : public InvocationBaseGen { public: using InvocationBaseGen::InvocationBaseGen; diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 4fe8a736ad7..b7bd221ec61 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -41,6 +41,7 @@ #include "mongo/db/catalog/database_holder.h" #include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/change_stream_pre_images_collection_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/curop.h" #include "mongo/db/cursor_manager.h" #include "mongo/db/db_raii.h" @@ -757,13 +758,20 @@ Status runAggregate(OperationContext* opCtx, nss = NamespaceString::kRsOplogNamespace; // In case of serverless the change stream will be opened on the change collection. - if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { - auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); + if (change_stream_serverless_helpers::isChangeCollectionsModeActive()) { + const auto tenantId = + change_stream_serverless_helpers::resolveTenantId(origNss.tenantId()); + + uassert(ErrorCodes::BadValue, + "Change streams cannot be used without tenant id", + tenantId); + uassert(ErrorCodes::ChangeStreamNotEnabled, "Change streams must be enabled before being used.", - changeCollectionManager.isChangeStreamEnabled(opCtx, origNss.tenantId())); + change_stream_serverless_helpers::isChangeStreamEnabled(opCtx, *tenantId)); + - nss = NamespaceString::makeChangeCollectionNSS(origNss.tenantId()); + nss = NamespaceString::makeChangeCollectionNSS(tenantId); } // Assert that a change stream on the config server is always opened on the oplog. @@ -785,7 +793,7 @@ Status runAggregate(OperationContext* opCtx, LOGV2_INFO(6689200, "Opening change stream on the namespace: {nss}", "Opening change stream", - "nss"_attr = nss.toString()); + "nss"_attr = nss.toStringWithTenantId()); } // Upgrade and wait for read concern if necessary. diff --git a/src/mongo/db/commands/set_cluster_parameter_command.cpp b/src/mongo/db/commands/set_cluster_parameter_command.cpp index 08ae1b2835e..8c49717baad 100644 --- a/src/mongo/db/commands/set_cluster_parameter_command.cpp +++ b/src/mongo/db/commands/set_cluster_parameter_command.cpp @@ -66,6 +66,10 @@ public: return "Set cluster parameter on replica set or node"; } + bool allowedWithSecurityToken() const final { + return true; + } + class Invocation final : public InvocationBase { public: using InvocationBase::InvocationBase; diff --git a/src/mongo/db/commands/txn_cmds.cpp b/src/mongo/db/commands/txn_cmds.cpp index ce879e0ee2a..9240c572a54 100644 --- a/src/mongo/db/commands/txn_cmds.cpp +++ b/src/mongo/db/commands/txn_cmds.cpp @@ -87,6 +87,10 @@ public: return true; } + bool allowedWithSecurityToken() const final { + return true; + } + class Invocation final : public InvocationBaseGen { public: using InvocationBaseGen::InvocationBaseGen; @@ -200,6 +204,10 @@ public: return true; } + bool allowedWithSecurityToken() const final { + return true; + } + class Invocation final : public InvocationBaseGen { public: using InvocationBaseGen::InvocationBaseGen; diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index 8cbe5909154..f1a5d2a4871 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -349,7 +349,7 @@ void registerPrimaryOnlyServices(ServiceContext* serviceContext) { } } - // TODO SERVER-65950 create 'SetChangeStreamStateCoordinatorService' only in the serverless. + // TODO SERVER-66717 create 'SetChangeStreamStateCoordinatorService' only in the serverless. services.push_back(std::make_unique(serviceContext)); for (auto& service : services) { @@ -790,6 +790,8 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) { repl::ReplicationCoordinator::modeNone; if (!isStandalone) { startChangeStreamExpiredPreImagesRemover(serviceContext); + // TODO SERVER-66717 Start 'startChangeCollectionExpiredDocumentsRemover' only in the + // serverless. startChangeCollectionExpiredDocumentsRemover(serviceContext); } @@ -1561,7 +1563,7 @@ int mongod_main(int argc, char* argv[]) { ReadWriteConcernDefaults::create(service, readWriteConcernDefaultsCacheLookupMongoD); ChangeStreamOptionsManager::create(service); - // TODO SERVER-65950 create 'ChangeStreamChangeCollectionManager' only in the serverless. + // TODO SERVER-66717 Create 'ChangeStreamChangeCollectionManager' only in the serverless. ChangeStreamChangeCollectionManager::create(service); #if defined(_WIN32) diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index 5cfd942a8f3..a3a28901993 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -111,8 +111,6 @@ const NamespaceString NamespaceString::kSystemReplSetNamespace(NamespaceString:: "system.replset"); const NamespaceString NamespaceString::kLastVoteNamespace(NamespaceString::kLocalDb, "replset.election"); -const NamespaceString NamespaceString::kChangeStreamPreImagesNamespace(NamespaceString::kConfigDb, - "system.preimages"); const NamespaceString NamespaceString::kIndexBuildEntryNamespace(NamespaceString::kConfigDb, "system.indexBuilds"); const NamespaceString NamespaceString::kRangeDeletionNamespace(NamespaceString::kConfigDb, @@ -338,8 +336,7 @@ NamespaceString NamespaceString::makeCollectionlessAggregateNSS(const DatabaseNa NamespaceString NamespaceString::makeChangeCollectionNSS( const boost::optional& tenantId) { - // TODO: SERVER-65950 create namespace for a particular tenant. - return NamespaceString{NamespaceString::kConfigDb, NamespaceString::kChangeCollectionName}; + return NamespaceString{tenantId, kConfigDb, kChangeCollectionName}; } NamespaceString NamespaceString::makeGlobalIndexNSS(const UUID& id) { @@ -350,8 +347,7 @@ NamespaceString NamespaceString::makeGlobalIndexNSS(const UUID& id) { NamespaceString NamespaceString::makePreImageCollectionNSS( const boost::optional& tenantId) { - return tenantId ? NamespaceString(tenantId, kConfigDb, "system.preimages") - : kChangeStreamPreImagesNamespace; + return NamespaceString{tenantId, kConfigDb, kPreImagesCollectionName}; } std::string NamespaceString::getSisterNS(StringData local) const { @@ -469,11 +465,11 @@ bool NamespaceString::isTimeseriesBucketsCollection() const { } bool NamespaceString::isChangeStreamPreImagesCollection() const { - return ns() == kChangeStreamPreImagesNamespace.ns(); + return _dbName.db() == kConfigDb && coll() == kPreImagesCollectionName; } bool NamespaceString::isChangeCollection() const { - return db() == kConfigDb && coll() == kChangeCollectionName; + return _dbName.db() == kConfigDb && coll() == kChangeCollectionName; } bool NamespaceString::isConfigImagesCollection() const { diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index 79da7a0f8ae..da20bca25d2 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -82,7 +82,10 @@ public: // Name for the system.js collection static constexpr StringData kSystemDotJavascriptCollectionName = "system.js"_sd; - // Name for the change stream change collection. + // Name of the pre-images collection. + static constexpr StringData kPreImagesCollectionName = "system.preimages"_sd; + + // Name of the change stream change collection. static constexpr StringData kChangeCollectionName = "system.change_collection"_sd; // Names of privilege document collections @@ -171,9 +174,6 @@ public: // Namespace for storing the last replica set election vote. static const NamespaceString kLastVoteNamespace; - // Namespace for change stream pre-images collection. - static const NamespaceString kChangeStreamPreImagesNamespace; - // Namespace for index build entries. static const NamespaceString kIndexBuildEntryNamespace; diff --git a/src/mongo/db/op_observer/op_observer_impl_test.cpp b/src/mongo/db/op_observer/op_observer_impl_test.cpp index 6e120317ca8..bb54d4bc07a 100644 --- a/src/mongo/db/op_observer/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer/op_observer_impl_test.cpp @@ -225,7 +225,7 @@ protected: reset(opCtx, NamespaceString::kRsOplogNamespace); reset(opCtx, NamespaceString::kSessionTransactionsTableNamespace); reset(opCtx, NamespaceString::kConfigImagesNamespace); - reset(opCtx, NamespaceString::kChangeStreamPreImagesNamespace); + reset(opCtx, NamespaceString::makePreImageCollectionNSS(boost::none)); } // Assert that the oplog has the expected number of entries, and return them @@ -288,7 +288,7 @@ protected: bool didWriteDeletedDocToPreImagesCollection(OperationContext* opCtx, const ChangeStreamPreImageId preImageId) { AutoGetCollection preImagesCollection( - opCtx, NamespaceString::kChangeStreamPreImagesNamespace, MODE_IS); + opCtx, NamespaceString::makePreImageCollectionNSS(boost::none), LockMode::MODE_IS); const auto preImage = Helpers::findOneForTesting( opCtx, preImagesCollection.getCollection(), BSON("_id" << preImageId.toBSON()), false); return !preImage.isEmpty(); @@ -323,7 +323,7 @@ protected: const ChangeStreamPreImageId& preImageId, BSONObj* container) { AutoGetCollection preImagesCollection( - opCtx, NamespaceString::kChangeStreamPreImagesNamespace, MODE_IS); + opCtx, NamespaceString::makePreImageCollectionNSS(boost::none), LockMode::MODE_IS); *container = Helpers::findOneForTesting(opCtx, preImagesCollection.getCollection(), BSON("_id" << preImageId.toBSON())) diff --git a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp index 583e3198a42..78d7e9f19cf 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_add_pre_image.cpp @@ -120,9 +120,11 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamAddPreImage::doGetNext() boost::optional DocumentSourceChangeStreamAddPreImage::lookupPreImage( boost::intrusive_ptr pExpCtx, const Document& preImageId) { // Look up the pre-image document on the local node by id. + // TODO SERVER-66642 Consider using internal test-tenant id if applicable. + const auto tenantId = pExpCtx->ns.tenantId(); auto lookedUpDoc = pExpCtx->mongoProcessInterface->lookupSingleDocumentLocally( pExpCtx, - NamespaceString::kChangeStreamPreImagesNamespace, + NamespaceString::makePreImageCollectionNSS(tenantId), Document{{ChangeStreamPreImage::kIdFieldName, preImageId}}); // Return boost::none to signify that we failed to find the pre-image. diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 9444a694c81..68d03d0c96a 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -1544,7 +1544,7 @@ std::unique_ptr attachCursorToPipeline( return (ns.isLocal() || ns.isConfigDotCacheDotChunks() || ns.isReshardingLocalOplogBufferCollection() || ns == NamespaceString::kConfigImagesNamespace || - ns == NamespaceString::kChangeStreamPreImagesNamespace); + ns.isChangeStreamPreImagesCollection()); }; if (shardTargetingPolicy == ShardTargetingPolicy::kNotAllowed || diff --git a/src/mongo/db/query/query_knobs.idl b/src/mongo/db/query/query_knobs.idl index efbe81f8501..e36a60c4df3 100644 --- a/src/mongo/db/query/query_knobs.idl +++ b/src/mongo/db/query/query_knobs.idl @@ -906,6 +906,15 @@ server_parameters: default: expr: false + # TODO SERVER-68341 Remove this query knob after tenancy is supported in the sharded cluster. + internalChangeStreamUseTenantIdForTesting: + description: "If true, then change streams will operate upon an internal tenant id for testing + purposes if the actual tenant is not provided." + set_at: [ startup ] + cpp_varname: "internalChangeStreamUseTenantIdForTesting" + cpp_vartype: AtomicWord + default: false + # Note for adding additional query knobs: # # When adding a new query knob, you should consider whether or not you need to add an 'on_update' diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 7185e7e77d6..d91977bac6c 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -75,6 +75,7 @@ env.Library( '$BUILD_DIR/mongo/db/catalog/multi_index_block', '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_serverless_helpers', '$BUILD_DIR/mongo/db/commands/feature_compatibility_parsers', '$BUILD_DIR/mongo/db/commands/txn_cmd_request', '$BUILD_DIR/mongo/db/concurrency/exception_util', @@ -634,6 +635,7 @@ env.Library( LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/catalog/collection_crud', '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_serverless_helpers', '$BUILD_DIR/mongo/db/commands/mongod_fsync', '$BUILD_DIR/mongo/db/concurrency/exception_util', '$BUILD_DIR/mongo/db/concurrency/lock_manager', @@ -1502,6 +1504,7 @@ env.Library( '$BUILD_DIR/mongo/client/clientdriver_network', '$BUILD_DIR/mongo/db/auth/auth', '$BUILD_DIR/mongo/db/change_stream_pre_images_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_serverless_helpers', '$BUILD_DIR/mongo/db/cloner', '$BUILD_DIR/mongo/db/concurrency/lock_manager', '$BUILD_DIR/mongo/db/curop', diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index b17e5f108ec..5c4ec5d5dab 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -57,6 +57,7 @@ #include "mongo/db/catalog/rename_collection.h" #include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/change_stream_pre_images_collection_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/client.h" #include "mongo/db/coll_mod_gen.h" #include "mongo/db/commands.h" @@ -390,7 +391,7 @@ void _logOpsInner(OperationContext* opCtx, } // Insert the oplog records to the respective tenants change collections. - if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { + if (change_stream_serverless_helpers::isChangeCollectionsModeActive()) { ChangeStreamChangeCollectionManager::get(opCtx).insertDocumentsToChangeCollection( opCtx, *records, timestamps); } diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index 032e9109f14..51eebc457bc 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -36,6 +36,7 @@ #include "mongo/db/catalog/document_validation.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/change_stream_change_collection_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/client.h" #include "mongo/db/commands/fsync.h" #include "mongo/db/db_raii.h" @@ -150,7 +151,7 @@ Status _insertDocumentsToOplogAndChangeCollections( // Write the corresponding oplog entries to tenants respective change // collections in the serverless. - if (ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { + if (change_stream_serverless_helpers::isChangeCollectionsModeActive()) { auto status = ChangeStreamChangeCollectionManager::get(opCtx).insertDocumentsToChangeCollection( opCtx, @@ -416,8 +417,7 @@ void scheduleWritesToOplogAndChangeCollection(OperationContext* opCtx, bool skipWritesToOplog) { // Skip performing any writes during the startup recovery when running in the non-serverless // environment. - if (skipWritesToOplog && - !ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { + if (skipWritesToOplog && !change_stream_serverless_helpers::isChangeCollectionsModeActive()) { return; } diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp index d644d5ad734..842cca1bc65 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test.cpp +++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp @@ -481,10 +481,10 @@ TEST_F(OplogApplierImplTest, applyOplogEntryToRecordChangeStreamPreImages) { WriteUnitOfWork wuow{_opCtx.get()}; ChangeStreamPreImageId preImageId{*(options.uuid), op.getOpTime().getTimestamp(), 0}; BSONObj preImageDocumentKey = BSON("_id" << preImageId.toBSON()); - auto preImageLoadResult = - getStorageInterface()->deleteById(_opCtx.get(), - NamespaceString::kChangeStreamPreImagesNamespace, - preImageDocumentKey.firstElement()); + auto preImageLoadResult = getStorageInterface()->deleteById( + _opCtx.get(), + NamespaceString::makePreImageCollectionNSS(boost::none), + preImageDocumentKey.firstElement()); repl::getNextOpTime(_opCtx.get()); wuow.commit(); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 8da1bcc1f5c..71bcbe60296 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -45,6 +45,7 @@ #include "mongo/db/catalog/local_oplog_info.h" #include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/change_stream_pre_images_collection_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/client.h" #include "mongo/db/commands/feature_compatibility_version.h" #include "mongo/db/commands/rwc_defaults_commands_gen.h" @@ -541,7 +542,7 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC }); // Create the pre-images collection if it doesn't exist yet in the non-serverless environment. - if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { + if (!change_stream_serverless_helpers::isChangeCollectionsModeActive()) { ChangeStreamPreImagesCollectionManager::createPreImagesCollection( opCtx, boost::none /* tenantId */); } diff --git a/src/mongo/db/set_change_stream_state_coordinator.cpp b/src/mongo/db/set_change_stream_state_coordinator.cpp index a0ee72fac51..9191769214c 100644 --- a/src/mongo/db/set_change_stream_state_coordinator.cpp +++ b/src/mongo/db/set_change_stream_state_coordinator.cpp @@ -34,6 +34,8 @@ #include "mongo/db/change_stream_change_collection_manager.h" #include "mongo/db/change_stream_pre_images_collection_manager.h" #include "mongo/db/change_stream_state_gen.h" +#include "mongo/db/concurrency/exception_util.h" +#include "mongo/db/op_observer/op_observer.h" #include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/logv2/log.h" @@ -77,15 +79,15 @@ public: const auto setChangeStreamParameter = ChangeStreamStateParameters::parse( IDLParserContext("ChangeStreamStateParameters"), _stateDoc.getCommand()); - invariant(_stateDoc.getId().getTenantId()); - - // TODO SERVER-65950 replace 'tenantId' with the provided tenant id. - auto tenantId = boost::none; + // A tenant's change collection and the pre-images collection are always associated with a + // tenant id. + const auto tenantId = _stateDoc.getId().getTenantId(); + tassert(6664100, "Tenant id is missing", tenantId); if (setChangeStreamParameter.getEnabled()) { - _enableChangeStream(opCtx, tenantId); + _enableChangeStream(opCtx, *tenantId); } else { - _disableChangeStream(opCtx, tenantId); + _disableChangeStream(opCtx, *tenantId); } } @@ -94,11 +96,38 @@ private: * Enables the change stream in the serverless by creating the change collection and the * pre-image collection. */ - void _enableChangeStream(OperationContext* opCtx, boost::optional tenantId) { + void _enableChangeStream(OperationContext* opCtx, const TenantId& tenantId) { auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); changeCollectionManager.createChangeCollection(opCtx, tenantId); - ChangeStreamPreImagesCollectionManager::createPreImagesCollection(opCtx, tenantId); + // TODO SERVER-66643 Remove this code. A change collection must have atleast one entry for + // the change stream to advance. As such artifically create any oplog entry such that it + // will be captured by the change collection. With SERVER-66643, the pre-images collection + // 'create' oplog entry will be auto captured by the change collection and hence writing + // this entry will not be required. Also remove the necessary header and linked library + // after removing this code. + [&]() { + writeConflictRetry( + opCtx, "writeNoop", NamespaceString::kRsOplogNamespace.ns(), [&] { + Lock::GlobalLock lock(opCtx, MODE_IX); + WriteUnitOfWork wuow(opCtx); + opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage( + opCtx, + NamespaceString::makeChangeCollectionNSS(tenantId), + boost::none, + BSON("msg" + << "enable change stream"), + boost::none, + boost::none, + boost::none, + boost::none, + boost::none); + wuow.commit(); + }); + }(); + + // TODO SERVER-66643 Pass 'tenantId' to the pre-images collection. + ChangeStreamPreImagesCollectionManager::createPreImagesCollection(opCtx, boost::none); // Wait until the create requests are majority committed. waitForMajority(opCtx); @@ -108,11 +137,12 @@ private: * Disables the change stream in the serverless by dropping the change collection and the * pre-image collection. */ - void _disableChangeStream(OperationContext* opCtx, boost::optional tenantId) { + void _disableChangeStream(OperationContext* opCtx, const TenantId& tenantId) { auto& changeCollectionManager = ChangeStreamChangeCollectionManager::get(opCtx); changeCollectionManager.dropChangeCollection(opCtx, tenantId); - ChangeStreamPreImagesCollectionManager::dropPreImagesCollection(opCtx, tenantId); + // TODO SERVER-66643 Pass 'tenantId' to the pre-images collection. + ChangeStreamPreImagesCollectionManager::dropPreImagesCollection(opCtx, boost::none); // Wait until the drop requests are majority committed. waitForMajority(opCtx); diff --git a/src/mongo/db/stats/SConscript b/src/mongo/db/stats/SConscript index 01242257d02..a4378a89caf 100644 --- a/src/mongo/db/stats/SConscript +++ b/src/mongo/db/stats/SConscript @@ -90,6 +90,7 @@ env.Library( LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/change_stream_change_collection_manager', + '$BUILD_DIR/mongo/db/change_stream_serverless_helpers', '$BUILD_DIR/mongo/db/commands/server_status_core', '$BUILD_DIR/mongo/db/server_base', ], diff --git a/src/mongo/db/stats/change_collection_server_status.cpp b/src/mongo/db/stats/change_collection_server_status.cpp index ee424a4ae43..f7d7f75a75c 100644 --- a/src/mongo/db/stats/change_collection_server_status.cpp +++ b/src/mongo/db/stats/change_collection_server_status.cpp @@ -30,6 +30,7 @@ #include "mongo/platform/basic.h" #include "mongo/db/change_stream_change_collection_manager.h" +#include "mongo/db/change_stream_serverless_helpers.h" #include "mongo/db/commands/server_status.h" namespace mongo { @@ -49,7 +50,7 @@ public: const BSONElement& configElement, BSONObjBuilder* result) const override { // Append the section only when running in serverless. - if (!ChangeStreamChangeCollectionManager::isChangeCollectionsModeActive()) { + if (!change_stream_serverless_helpers::isChangeCollectionsModeActive()) { return; } -- cgit v1.2.1