summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml4
-rw-r--r--jstests/auth/change_stream_change_collection_role_auth.js3
-rw-r--r--jstests/serverless/change_stream_state_commands.js11
-rw-r--r--jstests/serverless/change_streams/multitenant_read_from_change_collection.js9
-rw-r--r--jstests/serverless/change_streams_cluster_parameter.js2
-rw-r--r--src/mongo/db/bulk_write_shard_test.cpp2
-rw-r--r--src/mongo/db/catalog/README.md8
-rw-r--r--src/mongo/db/catalog/create_collection.cpp7
-rw-r--r--src/mongo/db/catalog/drop_collection.cpp9
-rw-r--r--src/mongo/db/catalog/drop_database.cpp30
-rw-r--r--src/mongo/db/catalog_raii.cpp61
-rw-r--r--src/mongo/db/catalog_raii.h41
-rw-r--r--src/mongo/db/change_collection_expired_change_remover_test.cpp6
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.cpp172
-rw-r--r--src/mongo/db/change_stream_change_collection_manager.h50
-rw-r--r--src/mongo/db/change_stream_pre_images_collection_manager.cpp5
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp15
-rw-r--r--src/mongo/db/concurrency/d_concurrency.cpp58
-rw-r--r--src/mongo/db/concurrency/d_concurrency.h46
-rw-r--r--src/mongo/db/concurrency/d_concurrency_test.cpp214
-rw-r--r--src/mongo/db/concurrency/lock_manager_defs.h14
-rw-r--r--src/mongo/db/concurrency/lock_state.cpp49
-rw-r--r--src/mongo/db/concurrency/lock_state.h14
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.cpp33
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier.cpp46
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier.h10
-rw-r--r--src/mongo/db/shard_role_test.cpp8
27 files changed, 751 insertions, 176 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml
index 73adfbc19b4..a2caa7e4eaa 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_multitenant_sharded_collections_passthrough.yml
@@ -4,7 +4,9 @@ selector:
roots:
- jstests/change_streams/**/*.js
exclude_files:
- # TODO SERVER-69959: Implement a majority-committed insert listener.
+ # TODO SERVER-68341: Implement enable/disable command for mongoQ in the serverless.
+ - jstests/change_streams/**/*.js
+ # TODO SERVER-74555: Implement a majority-committed insert listener.
- jstests/change_streams/only_wake_getmore_for_relevant_changes.js
# TODO SERVER-68341: Implement enable/disable command for mongoQ in the serverless.
- jstests/change_streams/projection_fakes_internal_event.js
diff --git a/jstests/auth/change_stream_change_collection_role_auth.js b/jstests/auth/change_stream_change_collection_role_auth.js
index 4d174543d66..4ed7464cdc8 100644
--- a/jstests/auth/change_stream_change_collection_role_auth.js
+++ b/jstests/auth/change_stream_change_collection_role_auth.js
@@ -6,7 +6,8 @@
* assumes_read_preference_unchanged,
* requires_replication,
* requires_fcv_62,
- * __TEMPORARILY_DISABLED__
+ * # TODO SERVER-74811: Re-enable this test.
+ * __TEMPORARILY_DISABLED__,
* ]
*/
(function() {
diff --git a/jstests/serverless/change_stream_state_commands.js b/jstests/serverless/change_stream_state_commands.js
index c7196e7a104..f6fdcb6eb61 100644
--- a/jstests/serverless/change_stream_state_commands.js
+++ b/jstests/serverless/change_stream_state_commands.js
@@ -9,6 +9,10 @@
load("jstests/libs/fail_point_util.js"); // For configureFailPoint.
load('jstests/libs/parallel_shell_helpers.js'); // For funWithArgs.
+// Disable implicit sessions since dropping "config" database for a tenant must be done not in a
+// session.
+TestData.disableImplicitSessions = true;
+
const replSetTest =
new ReplSetTest({nodes: 2, name: "change-stream-state-commands", serverless: true});
@@ -104,6 +108,12 @@ const secondOrgTenantId = ObjectId();
setChangeStreamState(firstOrgTenantId, false);
setChangeStreamState(firstOrgTenantId, false);
assertChangeStreamState(firstOrgTenantId, false);
+
+ // Verify that dropping "config" database works and effectively disables change streams.
+ setChangeStreamState(firstOrgTenantId, true);
+ assert.commandWorked(replSetTest.getPrimary().getDB("config").runCommand(
+ {dropDatabase: 1, $tenant: firstOrgTenantId}));
+ assertChangeStreamState(firstOrgTenantId, false);
})();
// Tests that the 'setChangeStreamState' command tolerates the primary step-down and can
@@ -302,4 +312,5 @@ const secondOrgTenantId = ObjectId();
})();
replSetTest.stopSet();
+TestData.disableImplicitSessions = false;
}());
diff --git a/jstests/serverless/change_streams/multitenant_read_from_change_collection.js b/jstests/serverless/change_streams/multitenant_read_from_change_collection.js
index 47872e89d71..1f8eabbd640 100644
--- a/jstests/serverless/change_streams/multitenant_read_from_change_collection.js
+++ b/jstests/serverless/change_streams/multitenant_read_from_change_collection.js
@@ -67,6 +67,15 @@ assertDropAndRecreateCollection(
assert(secondTenantTestDb.getCollectionInfos({name: "stockPrice"})[0]
.options.changeStreamPreAndPostImages.enabled);
+// Verify that while the change streams are disabled for the tenant, performing update and delete
+// operations on a collection with change stream pre- and post-images enabled succeeds. The
+// pre-images collection shouldn't be affected either.
+replSetTest.setChangeStreamState(firstTenantConn, false);
+assert.commandWorked(firstTenantTestDb.stockPrice.insert({_id: "mdb", price: 350}));
+assert.commandWorked(firstTenantTestDb.stockPrice.updateOne({_id: "mdb"}, {$set: {price: 450}}));
+assert.commandWorked(firstTenantTestDb.stockPrice.deleteOne({_id: "mdb"}));
+assert(!firstTenantConn.getDB("config").getCollectionNames().includes("system.preimages"));
+
// Create a new incarnation of the change collection for the first tenant.
replSetTest.setChangeStreamState(firstTenantConn, false);
replSetTest.setChangeStreamState(firstTenantConn, true);
diff --git a/jstests/serverless/change_streams_cluster_parameter.js b/jstests/serverless/change_streams_cluster_parameter.js
index 05e071d7da5..474f481d172 100644
--- a/jstests/serverless/change_streams_cluster_parameter.js
+++ b/jstests/serverless/change_streams_cluster_parameter.js
@@ -5,6 +5,8 @@
// requires_sharding,
// featureFlagServerlessChangeStreams,
// requires_fcv_63,
+// # TODO SERVER-74811: Re-enable this test.
+// __TEMPORARILY_DISABLED__,
// ]
(function() {
"use strict";
diff --git a/src/mongo/db/bulk_write_shard_test.cpp b/src/mongo/db/bulk_write_shard_test.cpp
index c53a3961d57..3e115b492e6 100644
--- a/src/mongo/db/bulk_write_shard_test.cpp
+++ b/src/mongo/db/bulk_write_shard_test.cpp
@@ -115,7 +115,7 @@ void createTestCollection(OperationContext* opCtx, const NamespaceString& nss) {
void installDatabaseMetadata(OperationContext* opCtx,
const DatabaseName& dbName,
const DatabaseVersion& dbVersion) {
- AutoGetDb autoDb(opCtx, dbName, MODE_X, {});
+ AutoGetDb autoDb(opCtx, dbName, MODE_X);
auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquireExclusive(opCtx, dbName);
scopedDss->setDbInfo(opCtx, {dbName.db(), ShardId("this"), dbVersion});
}
diff --git a/src/mongo/db/catalog/README.md b/src/mongo/db/catalog/README.md
index c16383f56c5..5928207b55e 100644
--- a/src/mongo/db/catalog/README.md
+++ b/src/mongo/db/catalog/README.md
@@ -925,6 +925,14 @@ synchronize shutdown, so that all operations are finished with the storage engin
Certain types of global storage engine operations, such as recoverToStableTimestamp(), also require
this lock to be held in exclusive mode.
+### Tenant Lock
+
+A resource of ResourceType Tenant is used when a database belongs to a tenant. It is used to synchronize
+change streams enablement and disablement for a tenant operation with other operations associated with the tenant.
+Enabling or disabling of change streams (by creating or dropping a change collection) for a tenant takes this lock
+in exclusive (X) mode. Acquiring this resource with an intent lock is an indication that the operation is doing reads (IS)
+or writes (IX) at the database or lower level.
+
### Database Lock
Any resource of ResourceType Database protects certain database-wide operations such as database
diff --git a/src/mongo/db/catalog/create_collection.cpp b/src/mongo/db/catalog/create_collection.cpp
index 2707a8457b2..4ba2d7ad358 100644
--- a/src/mongo/db/catalog/create_collection.cpp
+++ b/src/mongo/db/catalog/create_collection.cpp
@@ -546,7 +546,12 @@ Status _createCollection(
const boost::optional<BSONObj>& idIndex,
const boost::optional<VirtualCollectionOptions>& virtualCollectionOptions = boost::none) {
return writeConflictRetry(opCtx, "create", nss.ns(), [&] {
- AutoGetDb autoDb(opCtx, nss.dbName(), MODE_IX);
+ // If a change collection is to be created, that is, the change streams are being enabled
+ // for a tenant, acquire exclusive tenant lock.
+ AutoGetDb autoDb(opCtx,
+ nss.dbName(),
+ MODE_IX /* database lock mode*/,
+ boost::make_optional(nss.tenantId() && nss.isChangeCollection(), MODE_X));
Lock::CollectionLock collLock(opCtx, nss, MODE_IX);
auto db = autoDb.ensureDbExists(opCtx);
diff --git a/src/mongo/db/catalog/drop_collection.cpp b/src/mongo/db/catalog/drop_collection.cpp
index d91043545a6..a877ea142e0 100644
--- a/src/mongo/db/catalog/drop_collection.cpp
+++ b/src/mongo/db/catalog/drop_collection.cpp
@@ -369,7 +369,14 @@ Status _dropCollection(OperationContext* opCtx,
try {
return writeConflictRetry(opCtx, "drop", collectionName.ns(), [&] {
- AutoGetDb autoDb(opCtx, collectionName.dbName(), MODE_IX);
+ // If a change collection is to be dropped, that is, the change streams are being
+ // disabled for a tenant, acquire exclusive tenant lock.
+ AutoGetDb autoDb(opCtx,
+ collectionName.dbName(),
+ MODE_IX /* database lock mode*/,
+ boost::make_optional(collectionName.tenantId() &&
+ collectionName.isChangeCollection(),
+ MODE_X));
auto db = autoDb.getDb();
if (!db) {
return expectedUUID
diff --git a/src/mongo/db/catalog/drop_database.cpp b/src/mongo/db/catalog/drop_database.cpp
index 3dcd730e9f7..ed431fb0370 100644
--- a/src/mongo/db/catalog/drop_database.cpp
+++ b/src/mongo/db/catalog/drop_database.cpp
@@ -161,9 +161,11 @@ Status _dropDatabase(OperationContext* opCtx, const DatabaseName& dbName, bool a
// collections to drop.
repl::OpTime latestDropPendingOpTime;
+ const auto tenantLockMode{boost::make_optional(
+ dbName.tenantId() && dbName.db() == DatabaseName::kConfig.db(), MODE_X)};
{
boost::optional<AutoGetDb> autoDB;
- autoDB.emplace(opCtx, dbName, MODE_X);
+ autoDB.emplace(opCtx, dbName, MODE_X /* database lock mode*/, tenantLockMode);
Database* db = autoDB->getDb();
Status status = _checkNssAndReplState(opCtx, db, dbName);
@@ -188,7 +190,7 @@ Status _dropDatabase(OperationContext* opCtx, const DatabaseName& dbName, bool a
20337, "dropDatabase {dbName} - starting", "dropDatabase - starting", logAttrs(dbName));
db->setDropPending(opCtx, true);
- // If Database::dropCollectionEventIfSystem() fails, we should reset the drop-pending state
+ // If Database::dropCollectionEvenIfSystem() fails, we should reset the drop-pending state
// on Database.
ScopeGuard dropPendingGuard([&db, opCtx] { db->setDropPending(opCtx, false); });
auto indexBuildsCoord = IndexBuildsCoordinator::get(opCtx);
@@ -200,15 +202,17 @@ Status _dropDatabase(OperationContext* opCtx, const DatabaseName& dbName, bool a
// Create a scope guard to reset the drop-pending state on the database to false if
// there is a replica state change that kills this operation while the locks were
// yielded.
- ScopeGuard dropPendingGuardWhileUnlocked([dbName, opCtx, &dropPendingGuard] {
- // TODO (SERVER-71610): Fix to be interruptible or document exception.
- UninterruptibleLockGuard noInterrupt(opCtx->lockState()); // NOLINT.
- AutoGetDb autoDB(opCtx, dbName, MODE_IX);
- if (auto db = autoDB.getDb()) {
- db->setDropPending(opCtx, false);
- }
- dropPendingGuard.dismiss();
- });
+ ScopeGuard dropPendingGuardWhileUnlocked(
+ [dbName, opCtx, &dropPendingGuard, tenantLockMode] {
+ // TODO (SERVER-71610): Fix to be interruptible or document exception.
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState()); // NOLINT.
+ AutoGetDb autoDB(
+ opCtx, dbName, MODE_X /* database lock mode*/, tenantLockMode);
+ if (auto db = autoDB.getDb()) {
+ db->setDropPending(opCtx, false);
+ }
+ dropPendingGuard.dismiss();
+ });
// Drop locks. The drop helper will acquire locks on our behalf.
autoDB = boost::none;
@@ -224,7 +228,7 @@ Status _dropDatabase(OperationContext* opCtx, const DatabaseName& dbName, bool a
dropDatabaseHangAfterWaitingForIndexBuilds.pauseWhileSet();
}
- autoDB.emplace(opCtx, dbName, MODE_X);
+ autoDB.emplace(opCtx, dbName, MODE_X /* database lock mode*/, tenantLockMode);
db = autoDB->getDb();
dropPendingGuardWhileUnlocked.dismiss();
@@ -428,7 +432,7 @@ Status _dropDatabase(OperationContext* opCtx, const DatabaseName& dbName, bool a
dropDatabaseHangAfterAllCollectionsDrop.pauseWhileSet();
}
- AutoGetDb autoDB(opCtx, dbName, MODE_X);
+ AutoGetDb autoDB(opCtx, dbName, MODE_X /* database lock mode*/, tenantLockMode);
auto db = autoDB.getDb();
if (!db) {
return Status(ErrorCodes::NamespaceNotFound,
diff --git a/src/mongo/db/catalog_raii.cpp b/src/mongo/db/catalog_raii.cpp
index bf8cdc581b8..1773269fd77 100644
--- a/src/mongo/db/catalog_raii.cpp
+++ b/src/mongo/db/catalog_raii.cpp
@@ -132,8 +132,9 @@ void verifyDbAndCollection(OperationContext* opCtx,
AutoGetDb::AutoGetDb(OperationContext* opCtx,
const DatabaseName& dbName,
LockMode mode,
+ boost::optional<LockMode> tenantLockMode,
Date_t deadline)
- : AutoGetDb(opCtx, dbName, mode, deadline, [] {
+ : AutoGetDb(opCtx, dbName, mode, tenantLockMode, deadline, [] {
Lock::GlobalLockSkipOptions options;
return options;
}()) {}
@@ -141,9 +142,12 @@ AutoGetDb::AutoGetDb(OperationContext* opCtx,
AutoGetDb::AutoGetDb(OperationContext* opCtx,
const DatabaseName& dbName,
LockMode mode,
+ boost::optional<LockMode> tenantLockMode,
Date_t deadline,
Lock::DBLockSkipOptions options)
- : _dbName(dbName), _dbLock(opCtx, dbName, mode, deadline, std::move(options)), _db([&] {
+ : _dbName(dbName),
+ _dbLock(opCtx, dbName, mode, deadline, std::move(options), tenantLockMode),
+ _db([&] {
auto databaseHolder = DatabaseHolder::get(opCtx);
return databaseHolder->getDb(opCtx, dbName);
}()) {
@@ -198,10 +202,17 @@ AutoGetDb AutoGetDb::createForAutoGetCollection(
return AutoGetDb(opCtx,
nsOrUUID.nss() ? nsOrUUID.nss()->dbName() : *nsOrUUID.dbName(),
isSharedLockMode(modeColl) ? MODE_IS : MODE_IX,
+ boost::none /* tenantLockMode */,
deadline,
std::move(dbLockOptions));
}
+AutoGetDb::AutoGetDb(OperationContext* opCtx,
+ const DatabaseName& dbName,
+ LockMode mode,
+ Date_t deadline)
+ : AutoGetDb(opCtx, dbName, mode, boost::none, deadline) {}
+
Database* AutoGetDb::ensureDbExists(OperationContext* opCtx) {
if (_db) {
return _db;
@@ -714,39 +725,45 @@ AutoGetOplog::AutoGetOplog(OperationContext* opCtx, OplogAccessMode mode, Date_t
_oplog.makeYieldable(opCtx, LockedCollectionYieldRestore(opCtx, _oplog));
}
-
AutoGetChangeCollection::AutoGetChangeCollection(OperationContext* opCtx,
AutoGetChangeCollection::AccessMode mode,
- boost::optional<TenantId> tenantId,
+ const TenantId& tenantId,
Date_t deadline) {
- if (mode == AccessMode::kWriteInOplogContext) {
- // The global lock must already be held.
- invariant(opCtx->lockState()->isWriteLocked());
- }
-
- if (mode != AccessMode::kRead) {
- // TODO SERVER-66715 avoid taking 'AutoGetCollection' and remove
- // 'AllowLockAcquisitionOnTimestampedUnitOfWork'.
- _allowLockAcquisitionTsWuow.emplace(opCtx->lockState());
+ const auto changeCollectionNamespaceString = NamespaceString::makeChangeCollectionNSS(tenantId);
+ if (AccessMode::kRead == mode || AccessMode::kWrite == mode) {
+ // Treat this as a regular AutoGetCollection.
+ _coll.emplace(opCtx,
+ changeCollectionNamespaceString,
+ mode == AccessMode::kRead ? MODE_IS : MODE_IX,
+ AutoGetCollection::Options{}.deadline(deadline));
+ return;
}
-
- _coll.emplace(opCtx,
- NamespaceString::makeChangeCollectionNSS(tenantId),
- mode == AccessMode::kRead ? MODE_IS : MODE_IX,
- AutoGetCollection::Options{}.deadline(deadline));
+ tassert(6671506, "Invalid lock mode", AccessMode::kWriteInOplogContext == mode);
+
+ // When writing to the change collection as part of normal operation, we avoid taking any new
+ // locks. The caller must already have the tenant lock that protects the tenant specific change
+ // stream collection from being dropped. That's sufficient for acquiring a raw collection
+ // pointer.
+ tassert(6671500,
+ str::stream() << "Lock not held in IX mode for the tenant " << tenantId,
+ opCtx->lockState()->isLockHeldForMode(
+ ResourceId(ResourceType::RESOURCE_TENANT, tenantId), LockMode::MODE_IX));
+ auto changeCollectionPtr = CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(
+ opCtx, changeCollectionNamespaceString);
+ _changeCollection = CollectionPtr(changeCollectionPtr);
+ _changeCollection.makeYieldable(opCtx, LockedCollectionYieldRestore(opCtx, _changeCollection));
}
const Collection* AutoGetChangeCollection::operator->() const {
- return _coll ? _coll->getCollection().get() : nullptr;
+ return (**this).get();
}
const CollectionPtr& AutoGetChangeCollection::operator*() const {
- return _coll->getCollection();
+ return (_coll) ? *(*_coll) : _changeCollection;
}
AutoGetChangeCollection::operator bool() const {
- return _coll && _coll->getCollection().get();
+ return static_cast<bool>(**this);
}
-
} // namespace mongo
diff --git a/src/mongo/db/catalog_raii.h b/src/mongo/db/catalog_raii.h
index b03ca225f54..00bbf29e2e8 100644
--- a/src/mongo/db/catalog_raii.h
+++ b/src/mongo/db/catalog_raii.h
@@ -94,13 +94,37 @@ class AutoGetDb {
AutoGetDb(OperationContext* opCtx,
const DatabaseName& dbName,
LockMode mode,
+ boost::optional<LockMode> tenantLockMode,
Date_t deadline,
Lock::DBLockSkipOptions options);
public:
+ /**
+ * Acquires a lock on the specified database 'dbName' in the requested 'mode'.
+ *
+ * If the database belongs to a tenant, then acquires a tenant lock before the database lock.
+ * For 'mode' MODE_IS or MODE_S acquires tenant lock in intent-shared (IS) mode, otherwise,
+ * acquires a tenant lock in intent-exclusive (IX) mode.
+ */
+ AutoGetDb(OperationContext* opCtx,
+ const DatabaseName& dbName,
+ LockMode mode,
+ Date_t deadline = Date_t::max());
+
+ /**
+ * Acquires a lock on the specified database 'dbName' in the requested 'mode'.
+ *
+ * If the database belongs to a tenant, then acquires a tenant lock before the database lock.
+ * For 'mode' MODE_IS or MODE_S acquires tenant lock in intent-shared (IS) mode, otherwise,
+ * acquires a tenant lock in intent-exclusive (IX) mode. A different, stronger tenant lock mode
+ * to acquire can be specified with 'tenantLockMode' parameter. Passing boost::none for the
+ * tenant lock mode does not skip the tenant lock, but indicates that the tenant lock in default
+ * mode should be acquired.
+ */
AutoGetDb(OperationContext* opCtx,
const DatabaseName& dbName,
LockMode mode,
+ boost::optional<LockMode> tenantLockMode,
Date_t deadline = Date_t::max());
AutoGetDb(AutoGetDb&&) = default;
@@ -585,12 +609,10 @@ private:
* A RAII-style class to acquire lock to a particular tenant's change collection.
*
* A change collection can be accessed in the following modes:
- * kWriteInOplogContext - perform writes to the change collection by taking the IX lock on a
- * tenant's change collection. The change collection is written along with
- * the oplog in the same 'WriteUnitOfWork' and assumes that the global IX
- * lock is already held.
- * kWrite - takes the IX lock on a tenant's change collection to perform any writes.
- * kRead - takes the IS lock on a tenant's change collection to perform any reads.
+ * kWriteInOplogContext - assumes that the tenant IX lock has been pre-acquired. The user can
+ * perform reads and writes to the change collection.
+ * kWrite - behaves the same as 'AutoGetCollection::AutoGetCollection()' with lock mode MODE_IX.
+ * kRead - behaves the same as 'AutoGetCollection::AutoGetCollection()' with lock mode MODE_IS.
*/
class AutoGetChangeCollection {
public:
@@ -598,7 +620,7 @@ public:
AutoGetChangeCollection(OperationContext* opCtx,
AccessMode mode,
- boost::optional<TenantId> tenantId,
+ const TenantId& tenantId,
Date_t deadline = Date_t::max());
AutoGetChangeCollection(const AutoGetChangeCollection&) = delete;
@@ -609,9 +631,10 @@ public:
explicit operator bool() const;
private:
+ // Used when the 'kWrite' or 'kRead' access mode is used.
boost::optional<AutoGetCollection> _coll;
-
- boost::optional<AllowLockAcquisitionOnTimestampedUnitOfWork> _allowLockAcquisitionTsWuow;
+ // Used when the 'kWriteInOplogContext' access mode is used.
+ CollectionPtr _changeCollection;
};
} // namespace mongo
diff --git a/src/mongo/db/change_collection_expired_change_remover_test.cpp b/src/mongo/db/change_collection_expired_change_remover_test.cpp
index 28779c57af7..88a3eb36f23 100644
--- a/src/mongo/db/change_collection_expired_change_remover_test.cpp
+++ b/src/mongo/db/change_collection_expired_change_remover_test.cpp
@@ -101,7 +101,7 @@ protected:
}
std::vector<repl::OplogEntry> readChangeCollection(OperationContext* opCtx,
- boost::optional<TenantId> tenantId) {
+ const TenantId& tenantId) {
auto changeCollection =
AutoGetChangeCollection{opCtx, AutoGetChangeCollection::AccessMode::kRead, tenantId};
@@ -127,7 +127,7 @@ protected:
}
size_t removeExpiredChangeCollectionsDocuments(OperationContext* opCtx,
- boost::optional<TenantId> tenantId,
+ const TenantId& tenantId,
Date_t expirationTime) {
// Acquire intent-exclusive lock on the change collection. Early exit if the collection
// doesn't exist.
@@ -196,7 +196,7 @@ protected:
}
size_t removeExpiredChangeCollectionsDocuments(OperationContext* opCtx,
- boost::optional<TenantId> tenantId,
+ const TenantId& tenantId,
Date_t expirationTime) {
// Acquire intent-exclusive lock on the change collection. Early exit if the collection
// doesn't exist.
diff --git a/src/mongo/db/change_stream_change_collection_manager.cpp b/src/mongo/db/change_stream_change_collection_manager.cpp
index 644c9da46b3..706a596fcdc 100644
--- a/src/mongo/db/change_stream_change_collection_manager.cpp
+++ b/src/mongo/db/change_stream_change_collection_manager.cpp
@@ -164,14 +164,18 @@ boost::optional<BSONObj> createChangeCollectionEntryFromOplog(const BSONObj& opl
auto readyChangeCollDoc = changeCollDoc.freeze();
return readyChangeCollDoc.toBson();
}
+} // namespace
/**
- * Helper to write insert statements to respective change collections based on tenant ids.
+ * Locks respective change collections, writes insert statements to respective change collections
+ * based on tenant ids.
*/
-class ChangeCollectionsWriter {
+class ChangeStreamChangeCollectionManager::ChangeCollectionsWriterInternal {
public:
- explicit ChangeCollectionsWriter(const AutoGetChangeCollection::AccessMode& accessMode)
- : _accessMode{accessMode} {}
+ explicit ChangeCollectionsWriterInternal(OperationContext* opCtx,
+ OpDebug* opDebug,
+ const AutoGetChangeCollection::AccessMode& accessMode)
+ : _accessMode{accessMode}, _opCtx{opCtx}, _opDebug{opDebug} {}
/**
* Adds the insert statement for the provided tenant that will be written to the change
@@ -179,8 +183,22 @@ public:
*/
void add(InsertStatement insertStatement) {
if (auto tenantId = _extractTenantId(insertStatement); tenantId) {
- _tenantStatementsMap[*tenantId].push_back(std::move(insertStatement));
+ _tenantToStatementsAndChangeCollectionMap[*tenantId].insertStatements.push_back(
+ std::move(insertStatement));
+ }
+ }
+
+ /**
+ * Acquires locks to change collections of all tenants referred to by added insert statements.
+ */
+ void acquireLocks() {
+ tassert(6671503, "Locks cannot be acquired twice", !_locksAcquired);
+ for (auto&& [tenantId, insertStatementsAndChangeCollection] :
+ _tenantToStatementsAndChangeCollectionMap) {
+ insertStatementsAndChangeCollection.tenantChangeCollection.emplace(
+ _opCtx, _accessMode, tenantId);
}
+ _locksAcquired = true;
}
/**
@@ -188,10 +206,15 @@ public:
* encountered, the write is skipped and the remaining inserts are attempted individually. Bails
* out further writes if any other type of failure is encountered in writing to any change
* collection.
+ *
+ * Locks should be acquired before calling this method by calling 'acquireLocks()'.
*/
- Status write(OperationContext* opCtx, OpDebug* opDebug) {
- for (auto&& [tenantId, insertStatements] : _tenantStatementsMap) {
- AutoGetChangeCollection tenantChangeCollection(opCtx, _accessMode, tenantId);
+ Status write() {
+ tassert(6671504, "Locks should be acquired first", _locksAcquired);
+ for (auto&& [tenantId, insertStatementsAndChangeCollection] :
+ _tenantToStatementsAndChangeCollectionMap) {
+ AutoGetChangeCollection& tenantChangeCollection =
+ *insertStatementsAndChangeCollection.tenantChangeCollection;
// The change collection does not exist for a particular tenant because either the
// change collection is not enabled or is in the process of enablement. Ignore this
@@ -201,7 +224,7 @@ public:
}
// Writes to the change collection should not be replicated.
- repl::UnreplicatedWritesBlock unReplBlock(opCtx);
+ repl::UnreplicatedWritesBlock unReplBlock(_opCtx);
/**
* For a serverless shard merge, we clone all change collection entries from the donor
@@ -210,9 +233,9 @@ public:
* If we encounter a DuplicateKey error and the entry is identical to the existing one,
* we can safely skip and continue.
*/
- for (auto&& insertStatement : insertStatements) {
+ for (auto&& insertStatement : insertStatementsAndChangeCollection.insertStatements) {
Status status = collection_internal::insertDocument(
- opCtx, *tenantChangeCollection, insertStatement, opDebug, false);
+ _opCtx, *tenantChangeCollection, insertStatement, _opDebug, false);
if (status.code() == ErrorCodes::DuplicateKey) {
const auto dupKeyInfo = status.extraInfo<DuplicateKeyErrorInfo>();
@@ -232,11 +255,21 @@ public:
}
}
}
-
return Status::OK();
}
private:
+ /**
+ * Field 'insertStatements' contains insert statements to be written to the tenant's change
+ * collection associated with 'tenantChangeCollection' field.
+ */
+ struct TenantStatementsAndChangeCollection {
+
+ std::vector<InsertStatement> insertStatements;
+
+ boost::optional<AutoGetChangeCollection> tenantChangeCollection;
+ };
+
boost::optional<TenantId> _extractTenantId(const InsertStatement& insertStatement) {
// Parse the oplog entry to fetch the tenant id from 'tid' field. The oplog entry will not
// written to the change collection if 'tid' field is missing.
@@ -255,12 +288,75 @@ private:
// Mode required to access change collections.
const AutoGetChangeCollection::AccessMode _accessMode;
- // Maps inserts statements for each tenant.
- stdx::unordered_map<TenantId, std::vector<InsertStatement>, TenantId::Hasher>
- _tenantStatementsMap;
+ // A mapping from a tenant id to insert statements and the change collection of the tenant.
+ stdx::unordered_map<TenantId, TenantStatementsAndChangeCollection, TenantId::Hasher>
+ _tenantToStatementsAndChangeCollectionMap;
+
+ // An operation context to use while performing all operations in this class.
+ OperationContext* const _opCtx;
+
+ // An OpDebug to use while performing all operations in this class.
+ OpDebug* const _opDebug;
+
+ // Indicates if locks have been acquired.
+ bool _locksAcquired{false};
};
-} // namespace
+ChangeStreamChangeCollectionManager::ChangeCollectionsWriter::ChangeCollectionsWriter(
+ OperationContext* opCtx,
+ std::vector<InsertStatement>::const_iterator beginOplogEntries,
+ std::vector<InsertStatement>::const_iterator endOplogEntries,
+ OpDebug* opDebug) {
+ // This method must be called within a 'WriteUnitOfWork'. The caller must be responsible for
+ // commiting the unit of work.
+ invariant(opCtx->lockState()->inAWriteUnitOfWork());
+
+ _writer = std::make_unique<ChangeCollectionsWriterInternal>(
+ opCtx, opDebug, AutoGetChangeCollection::AccessMode::kWrite);
+
+ // Transform oplog entries to change collections entries and group them by tenant id.
+ for (auto oplogEntryIter = beginOplogEntries; oplogEntryIter != endOplogEntries;
+ oplogEntryIter++) {
+ auto& oplogDoc = oplogEntryIter->doc;
+
+ // The initial seed oplog insertion is not timestamped as such the 'oplogSlot' is not
+ // initialized. The corresponding change collection insertion will not be timestamped.
+ auto oplogSlot = oplogEntryIter->oplogSlot;
+
+ auto changeCollDoc = createChangeCollectionEntryFromOplog(oplogDoc);
+
+ if (changeCollDoc) {
+ _writer->add(InsertStatement{
+ std::move(*changeCollDoc), oplogSlot.getTimestamp(), oplogSlot.getTerm()});
+ }
+ }
+}
+
+ChangeStreamChangeCollectionManager::ChangeCollectionsWriter::ChangeCollectionsWriter(
+ ChangeStreamChangeCollectionManager::ChangeCollectionsWriter&& other) = default;
+
+ChangeStreamChangeCollectionManager::ChangeCollectionsWriter&
+ChangeStreamChangeCollectionManager::ChangeCollectionsWriter::operator=(
+ ChangeStreamChangeCollectionManager::ChangeCollectionsWriter&& other) = default;
+
+ChangeStreamChangeCollectionManager::ChangeCollectionsWriter::~ChangeCollectionsWriter() = default;
+
+void ChangeStreamChangeCollectionManager::ChangeCollectionsWriter::acquireLocks() {
+ _writer->acquireLocks();
+}
+
+Status ChangeStreamChangeCollectionManager::ChangeCollectionsWriter::write() {
+ return _writer->write();
+}
+
+ChangeStreamChangeCollectionManager::ChangeCollectionsWriter
+ChangeStreamChangeCollectionManager::createChangeCollectionsWriter(
+ OperationContext* opCtx,
+ std::vector<InsertStatement>::const_iterator beginOplogEntries,
+ std::vector<InsertStatement>::const_iterator endOplogEntries,
+ OpDebug* opDebug) {
+ return ChangeCollectionsWriter{opCtx, beginOplogEntries, endOplogEntries, opDebug};
+}
BSONObj ChangeStreamChangeCollectionManager::PurgingJobStats::toBSON() const {
return BSON("totalPass" << totalPass.load() << "docsDeleted" << docsDeleted.load()
@@ -326,8 +422,8 @@ void ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
// commiting the unit of work.
invariant(opCtx->lockState()->inAWriteUnitOfWork());
- ChangeCollectionsWriter changeCollectionsWriter{
- AutoGetChangeCollection::AccessMode::kWriteInOplogContext};
+ ChangeCollectionsWriterInternal changeCollectionsWriter{
+ opCtx, nullptr /*opDebug*/, AutoGetChangeCollection::AccessMode::kWriteInOplogContext};
for (size_t idx = 0; idx < oplogRecords.size(); idx++) {
auto& record = oplogRecords[idx];
@@ -341,50 +437,16 @@ void ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
}
}
+ changeCollectionsWriter.acquireLocks();
+
// Write documents to change collections and throw exception in case of any failure.
- Status status = changeCollectionsWriter.write(opCtx, nullptr /* opDebug */);
+ Status status = changeCollectionsWriter.write();
if (!status.isOK()) {
LOGV2_FATAL(
6612300, "Failed to write to change collection", "reason"_attr = status.reason());
}
}
-Status ChangeStreamChangeCollectionManager::insertDocumentsToChangeCollection(
- OperationContext* opCtx,
- std::vector<InsertStatement>::const_iterator beginOplogEntries,
- std::vector<InsertStatement>::const_iterator endOplogEntries,
- bool isGlobalIXLockAcquired,
- OpDebug* opDebug) {
- // This method must be called within a 'WriteUnitOfWork'. The caller must be responsible for
- // commiting the unit of work.
- invariant(opCtx->lockState()->inAWriteUnitOfWork());
-
- // If the global IX lock is already acquired, then change collections entries will be written
- // within the oplog context as such acquire the correct access mode for change collections.
- const auto changeCollAccessMode = isGlobalIXLockAcquired
- ? AutoGetChangeCollection::AccessMode::kWriteInOplogContext
- : AutoGetChangeCollection::AccessMode::kWrite;
- ChangeCollectionsWriter changeCollectionsWriter{changeCollAccessMode};
-
- // Transform oplog entries to change collections entries and group them by tenant id.
- for (auto oplogEntryIter = beginOplogEntries; oplogEntryIter != endOplogEntries;
- oplogEntryIter++) {
- auto& oplogDoc = oplogEntryIter->doc;
-
- // The initial seed oplog insertion is not timestamped as such the 'oplogSlot' is not
- // initialized. The corresponding change collection insertion will not be timestamped.
- auto oplogSlot = oplogEntryIter->oplogSlot;
-
- if (auto changeCollDoc = createChangeCollectionEntryFromOplog(oplogDoc)) {
- changeCollectionsWriter.add(InsertStatement{
- std::move(changeCollDoc.get()), oplogSlot.getTimestamp(), oplogSlot.getTerm()});
- }
- }
-
- // Write documents to change collections.
- return changeCollectionsWriter.write(opCtx, opDebug);
-}
-
boost::optional<ChangeCollectionPurgingJobMetadata>
ChangeStreamChangeCollectionManager::getChangeCollectionPurgingJobMetadata(
OperationContext* opCtx, const CollectionPtr* changeCollection) {
diff --git a/src/mongo/db/change_stream_change_collection_manager.h b/src/mongo/db/change_stream_change_collection_manager.h
index 4ba82355bcf..f398824069a 100644
--- a/src/mongo/db/change_stream_change_collection_manager.h
+++ b/src/mongo/db/change_stream_change_collection_manager.h
@@ -137,19 +137,55 @@ public:
const std::vector<Record>& oplogRecords,
const std::vector<Timestamp>& oplogTimestamps);
+ class ChangeCollectionsWriterInternal;
/**
- * Performs a range inserts on respective change collections using the oplog entries as
- * specified by 'beginOplogEntries' and 'endOplogEntries'.
- *
- * Bails out if a failure is encountered in inserting documents to a particular change
- * collection.
+ * Change Collection Writer. After acquiring ChangeCollectionsWriter the user should trigger
+ * acquisition of the locks by calling 'acquireLocks()' before the first write in the Write Unit
+ * of Work. Then the write of documents to change collections can be triggered by calling
+ * 'write()'.
+ */
+ class ChangeCollectionsWriter {
+ friend class ChangeStreamChangeCollectionManager;
+
+ /**
+ * Constructs a writer from a range ['beginOplogEntries', 'endOplogEntries') of oplog
+ * entries.
+ */
+ ChangeCollectionsWriter(OperationContext* opCtx,
+ std::vector<InsertStatement>::const_iterator beginOplogEntries,
+ std::vector<InsertStatement>::const_iterator endOplogEntries,
+ OpDebug* opDebug);
+
+ public:
+ ChangeCollectionsWriter(ChangeCollectionsWriter&&);
+ ChangeCollectionsWriter& operator=(ChangeCollectionsWriter&&);
+
+ /**
+ * Acquires locks needed to write documents to change collections.
+ */
+ void acquireLocks();
+
+ /**
+ * Writes documents to change collections.
+ */
+ Status write();
+
+ ~ChangeCollectionsWriter();
+
+ private:
+ std::unique_ptr<ChangeCollectionsWriterInternal> _writer;
+ };
+
+ /**
+ * Returns a change collection writer that can insert change collection entries into respective
+ * change collections. The entries are constructed from a range ['beginOplogEntries',
+ * 'endOplogEntries') of oplog entries.
*/
- Status insertDocumentsToChangeCollection(
+ ChangeCollectionsWriter createChangeCollectionsWriter(
OperationContext* opCtx,
std::vector<InsertStatement>::const_iterator beginOplogEntries,
std::vector<InsertStatement>::const_iterator endOplogEntries,
- bool isGlobalIXLockAcquired,
OpDebug* opDebug);
PurgingJobStats& getPurgingJobStats() {
diff --git a/src/mongo/db/change_stream_pre_images_collection_manager.cpp b/src/mongo/db/change_stream_pre_images_collection_manager.cpp
index 0bb0ed06c96..deb841bb3f7 100644
--- a/src/mongo/db/change_stream_pre_images_collection_manager.cpp
+++ b/src/mongo/db/change_stream_pre_images_collection_manager.cpp
@@ -170,6 +170,11 @@ void ChangeStreamPreImagesCollectionManager::insertPreImage(OperationContext* op
AutoGetCollection preImagesCollectionRaii(
opCtx, preImagesCollectionNamespace, LockMode::MODE_IX);
auto& changeStreamPreImagesCollection = preImagesCollectionRaii.getCollection();
+ if (preImagesCollectionNamespace.tenantId() &&
+ !change_stream_serverless_helpers::isChangeStreamEnabled(
+ opCtx, *preImagesCollectionNamespace.tenantId())) {
+ return;
+ }
tassert(6646201,
"The change stream pre-images collection is not present",
changeStreamPreImagesCollection);
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index c2f2701cd42..e08e6b09e30 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -795,19 +795,15 @@ Status runAggregate(OperationContext* opCtx,
nss = NamespaceString::kRsOplogNamespace;
// In case of serverless the change stream will be opened on the change collection.
- if (change_stream_serverless_helpers::isChangeCollectionsModeActive()) {
+ const bool changeCollectionsMode =
+ change_stream_serverless_helpers::isChangeCollectionsModeActive();
+ if (changeCollectionsMode) {
const auto tenantId =
change_stream_serverless_helpers::resolveTenantId(origNss.tenantId());
uassert(ErrorCodes::BadValue,
"Change streams cannot be used without tenant id",
tenantId);
-
- uassert(ErrorCodes::ChangeStreamNotEnabled,
- "Change streams must be enabled before being used.",
- change_stream_serverless_helpers::isChangeStreamEnabled(opCtx, *tenantId));
-
-
nss = NamespaceString::makeChangeCollectionNSS(tenantId);
}
@@ -846,6 +842,11 @@ Status runAggregate(OperationContext* opCtx,
// Obtain collection locks on the execution namespace; that is, the oplog.
initContext(auto_get_collection::ViewMode::kViewsForbidden);
registerTelemetry();
+ uassert(ErrorCodes::ChangeStreamNotEnabled,
+ "Change streams must be enabled before being used",
+ !changeCollectionsMode ||
+ change_stream_serverless_helpers::isChangeStreamEnabled(opCtx,
+ *nss.tenantId()));
} else if (nss.isCollectionlessAggregateNS() && pipelineInvolvedNamespaces.empty()) {
uassert(4928901,
str::stream() << AggregateCommandRequest::kCollectionUUIDFieldName
diff --git a/src/mongo/db/concurrency/d_concurrency.cpp b/src/mongo/db/concurrency/d_concurrency.cpp
index dbc851ff189..86362019519 100644
--- a/src/mongo/db/concurrency/d_concurrency.cpp
+++ b/src/mongo/db/concurrency/d_concurrency.cpp
@@ -183,17 +183,40 @@ void Lock::GlobalLock::_unlock() {
_result = LOCK_INVALID;
}
+Lock::TenantLock::TenantLock(OperationContext* opCtx,
+ const TenantId& tenantId,
+ LockMode mode,
+ Date_t deadline)
+ : _id{RESOURCE_TENANT, tenantId}, _opCtx{opCtx} {
+ dassert(_opCtx->lockState()->isLockHeldForMode(resourceIdGlobal,
+ isSharedLockMode(mode) ? MODE_IS : MODE_IX));
+ _opCtx->lockState()->lock(_opCtx, _id, mode, deadline);
+}
+
+Lock::TenantLock::TenantLock(TenantLock&& otherLock)
+ : _id(otherLock._id), _opCtx(otherLock._opCtx) {
+ otherLock._opCtx = nullptr;
+}
+
+Lock::TenantLock::~TenantLock() {
+ if (_opCtx) {
+ _opCtx->lockState()->unlock(_id);
+ }
+}
+
Lock::DBLock::DBLock(OperationContext* opCtx,
const DatabaseName& dbName,
LockMode mode,
- Date_t deadline)
- : DBLock(opCtx, dbName, mode, deadline, DBLockSkipOptions{}) {}
+ Date_t deadline,
+ boost::optional<LockMode> tenantLockMode)
+ : DBLock(opCtx, dbName, mode, deadline, DBLockSkipOptions{}, tenantLockMode) {}
Lock::DBLock::DBLock(OperationContext* opCtx,
const DatabaseName& dbName,
LockMode mode,
Date_t deadline,
- DBLockSkipOptions options)
+ DBLockSkipOptions options,
+ boost::optional<LockMode> tenantLockMode)
: _id(RESOURCE_DATABASE, dbName), _opCtx(opCtx), _result(LOCK_INVALID), _mode(mode) {
_globalLock.emplace(opCtx,
@@ -204,6 +227,32 @@ Lock::DBLock::DBLock(OperationContext* opCtx,
massert(28539, "need a valid database name", !dbName.db().empty());
+ tassert(6671501,
+ str::stream() << "Tenant lock mode " << modeName(*tenantLockMode)
+ << " specified for database " << dbName.db()
+ << " that does not belong to a tenant",
+ !tenantLockMode || dbName.tenantId());
+
+ // Acquire the tenant lock.
+ if (dbName.tenantId()) {
+ const auto effectiveTenantLockMode = [&]() {
+ const auto defaultTenantLockMode = isSharedLockMode(_mode) ? MODE_IS : MODE_IX;
+ if (tenantLockMode) {
+ tassert(6671505,
+ str::stream()
+ << "Requested tenant lock mode " << modeName(*tenantLockMode)
+ << " that is weaker than the default one "
+ << modeName(defaultTenantLockMode) << " for database " << dbName.db()
+ << " of tenant " << dbName.tenantId()->toString(),
+ isModeCovered(defaultTenantLockMode, *tenantLockMode));
+ return *tenantLockMode;
+ } else {
+ return defaultTenantLockMode;
+ }
+ }();
+ _tenantLock.emplace(opCtx, *dbName.tenantId(), effectiveTenantLockMode, deadline);
+ }
+
_opCtx->lockState()->lock(_opCtx, _id, _mode, deadline);
_result = LOCK_OK;
}
@@ -213,7 +262,8 @@ Lock::DBLock::DBLock(DBLock&& otherLock)
_opCtx(otherLock._opCtx),
_result(otherLock._result),
_mode(otherLock._mode),
- _globalLock(std::move(otherLock._globalLock)) {
+ _globalLock(std::move(otherLock._globalLock)),
+ _tenantLock(std::move(otherLock._tenantLock)) {
// Mark as moved so the destructor doesn't invalidate the newly-constructed lock.
otherLock._result = LOCK_INVALID;
}
diff --git a/src/mongo/db/concurrency/d_concurrency.h b/src/mongo/db/concurrency/d_concurrency.h
index 4c0903c101c..aacd3b6d724 100644
--- a/src/mongo/db/concurrency/d_concurrency.h
+++ b/src/mongo/db/concurrency/d_concurrency.h
@@ -322,6 +322,36 @@ public:
using DBLockSkipOptions = GlobalLockSkipOptions;
/**
+ * Tenant lock.
+ *
+ * Controls access to resources belonging to a tenant.
+ *
+ * This lock supports four modes (see Lock_Mode):
+ * MODE_IS: concurrent access to tenant's resources, requiring further database read locks
+ * MODE_IX: concurrent access to tenant's resources, requiring further database read or write
+ * locks
+ * MODE_S: shared read access to tenant's resources, blocking any writers
+ * MODE_X: exclusive access to tenant's resources, blocking all other readers and writers.
+ */
+ class TenantLock {
+ TenantLock(const TenantLock&) = delete;
+ TenantLock& operator=(const TenantLock&) = delete;
+
+ public:
+ TenantLock(OperationContext* opCtx,
+ const TenantId& tenantId,
+ LockMode mode,
+ Date_t deadline = Date_t::max());
+
+ TenantLock(TenantLock&&);
+ ~TenantLock();
+
+ private:
+ ResourceId _id;
+ OperationContext* _opCtx;
+ };
+
+ /**
* Database lock.
*
* This lock supports four modes (see Lock_Mode):
@@ -334,19 +364,28 @@ public:
* for MODE_IX or MODE_X also acquires global lock in intent-exclusive (IX) mode.
* For storage engines that do not support collection-level locking, MODE_IS will be
* upgraded to MODE_S and MODE_IX will be upgraded to MODE_X.
+ *
+ * If the database belongs to a tenant, then acquires a tenant lock before the database lock.
+ * For 'mode' MODE_IS or MODE_S acquires tenant lock in intent-shared (IS) mode, otherwise,
+ * acquires a tenant lock in intent-exclusive (IX) mode. A different, stronger tenant lock mode
+ * to acquire can be specified with 'tenantLockMode' parameter. Passing boost::none for the
+ * tenant lock mode does not skip the tenant lock, but indicates that the tenant lock in default
+ * mode should be acquired.
*/
class DBLock {
public:
DBLock(OperationContext* opCtx,
const DatabaseName& dbName,
LockMode mode,
- Date_t deadline = Date_t::max());
+ Date_t deadline = Date_t::max(),
+ boost::optional<LockMode> tenantLockMode = boost::none);
DBLock(OperationContext* opCtx,
const DatabaseName& dbName,
LockMode mode,
Date_t deadline,
- DBLockSkipOptions skipOptions);
+ DBLockSkipOptions skipOptions,
+ boost::optional<LockMode> tenantLockMode = boost::none);
DBLock(DBLock&&);
~DBLock();
@@ -371,6 +410,9 @@ public:
// Acquires the global lock on our behalf.
boost::optional<GlobalLock> _globalLock;
+
+ // Acquires the tenant lock on behalf of this DB lock.
+ boost::optional<TenantLock> _tenantLock;
};
/**
diff --git a/src/mongo/db/concurrency/d_concurrency_test.cpp b/src/mongo/db/concurrency/d_concurrency_test.cpp
index 4320e51ecdf..d7680530e80 100644
--- a/src/mongo/db/concurrency/d_concurrency_test.cpp
+++ b/src/mongo/db/concurrency/d_concurrency_test.cpp
@@ -622,6 +622,87 @@ TEST_F(DConcurrencyTestFixture, DBLockSDoesNotSetGlobalWriteLockedOnOperationCon
ASSERT_TRUE(opCtx->lockState()->wasGlobalLockTaken());
}
+TEST_F(DConcurrencyTestFixture, TenantLock) {
+ auto opCtx = makeOperationContext();
+ getClient()->swapLockState(std::make_unique<LockerImpl>(opCtx->getServiceContext()));
+ TenantId tenantId{OID::gen()};
+ ResourceId tenantResourceId{ResourceType::RESOURCE_TENANT, tenantId};
+ struct TestCase {
+ LockMode globalLockMode;
+ LockMode tenantLockMode;
+ };
+ std::vector<TestCase> testCases{
+ {MODE_IX, MODE_IX}, {MODE_IX, MODE_X}, {MODE_IS, MODE_S}, {MODE_IS, MODE_IS}};
+ for (auto&& testCase : testCases) {
+ {
+ Lock::GlobalLock globalLock{opCtx.get(), testCase.globalLockMode};
+ Lock::TenantLock tenantLock{opCtx.get(), tenantId, testCase.tenantLockMode};
+ ASSERT_TRUE(
+ opCtx->lockState()->isLockHeldForMode(tenantResourceId, testCase.tenantLockMode));
+ }
+ ASSERT_FALSE(
+ opCtx->lockState()->isLockHeldForMode(tenantResourceId, testCase.tenantLockMode));
+ }
+}
+
+TEST_F(DConcurrencyTestFixture, DBLockTakesTenantLock) {
+ auto opCtx = makeOperationContext();
+ getClient()->swapLockState(std::make_unique<LockerImpl>(opCtx->getServiceContext()));
+ TenantId tenantId{OID::gen()};
+ ResourceId tenantResourceId{ResourceType::RESOURCE_TENANT, tenantId};
+ struct TestCase {
+ bool tenantOwned;
+ LockMode databaseLockMode;
+ boost::optional<LockMode> tenantLockMode;
+ LockMode expectedTenantLockMode;
+ };
+
+ StringData testDatabaseName{"test"};
+ const bool tenantOwned{true};
+ const bool tenantless{false};
+ const boost::optional<LockMode> none;
+ std::vector<TestCase> testCases{
+ {tenantless, MODE_S, none, MODE_NONE},
+ {tenantless, MODE_IS, none, MODE_NONE},
+ {tenantless, MODE_X, none, MODE_NONE},
+ {tenantless, MODE_IX, none, MODE_NONE},
+ {tenantOwned, MODE_S, none, MODE_IS},
+ {tenantOwned, MODE_IS, none, MODE_IS},
+ {tenantOwned, MODE_X, none, MODE_IX},
+ {tenantOwned, MODE_IX, none, MODE_IX},
+ {tenantOwned, MODE_X, MODE_X, MODE_X},
+ {tenantOwned, MODE_IX, MODE_X, MODE_X},
+ };
+ for (auto&& testCase : testCases) {
+ {
+ Lock::DBLock dbLock(
+ opCtx.get(),
+ DatabaseName(testCase.tenantOwned ? boost::make_optional(tenantId) : boost::none,
+ testDatabaseName),
+ testCase.databaseLockMode,
+ Date_t::max(),
+ testCase.tenantLockMode);
+ ASSERT(opCtx->lockState()->getLockMode(tenantResourceId) ==
+ testCase.expectedTenantLockMode)
+ << " db lock mode: " << modeName(testCase.databaseLockMode)
+ << ", tenant lock mode: "
+ << (testCase.tenantLockMode ? modeName(*testCase.tenantLockMode) : "-");
+ }
+ ASSERT(opCtx->lockState()->getLockMode(tenantResourceId) == MODE_NONE)
+ << " db lock mode: " << modeName(testCase.databaseLockMode) << ", tenant lock mode: "
+ << (testCase.tenantLockMode ? modeName(*testCase.tenantLockMode) : "-");
+ }
+
+ // Verify that tenant lock survives move.
+ {
+ auto lockBuilder = [&]() {
+ return Lock::DBLock{opCtx.get(), DatabaseName(tenantId, testDatabaseName), MODE_S};
+ };
+ Lock::DBLock dbLockCopy{lockBuilder()};
+ ASSERT(opCtx->lockState()->isLockHeldForMode(tenantResourceId, MODE_IS));
+ }
+}
+
TEST_F(DConcurrencyTestFixture, GlobalLockXDoesNotSetGlobalWriteLockedWhenLockAcquisitionTimesOut) {
auto clients = makeKClientsWithLockers(2);
@@ -1200,32 +1281,123 @@ TEST_F(DConcurrencyTestFixture, MultipleConflictingDBLocksOnSameThread) {
ASSERT(lockState->isDbLockedForMode(dbName, MODE_S));
}
-TEST_F(DConcurrencyTestFixture, IsDbLockedForSMode) {
- DatabaseName dbName(boost::none, "db");
-
+TEST_F(DConcurrencyTestFixture, IsDbLockedForMode_IsCollectionLockedForMode) {
auto opCtx = makeOperationContext();
getClient()->swapLockState(std::make_unique<LockerImpl>(opCtx->getServiceContext()));
auto lockState = opCtx->lockState();
- Lock::DBLock dbLock(opCtx.get(), dbName, MODE_S);
-
- ASSERT(lockState->isDbLockedForMode(dbName, MODE_IS));
- ASSERT(!lockState->isDbLockedForMode(dbName, MODE_IX));
- ASSERT(lockState->isDbLockedForMode(dbName, MODE_S));
- ASSERT(!lockState->isDbLockedForMode(dbName, MODE_X));
-}
-
-TEST_F(DConcurrencyTestFixture, IsDbLockedForXMode) {
- DatabaseName dbName(boost::none, "db");
- auto opCtx = makeOperationContext();
- getClient()->swapLockState(std::make_unique<LockerImpl>(opCtx->getServiceContext()));
- auto lockState = opCtx->lockState();
- Lock::DBLock dbLock(opCtx.get(), dbName, MODE_X);
+ // Database ownership options to test.
+ enum DatabaseOwnershipOptions {
+ // Owned by a tenant and not.
+ kAll,
+ // Owned by a tenant only.
+ kTenantOwned
+ };
+ struct TestCase {
+ LockMode globalLockMode;
+ LockMode tenantLockMode;
+ DatabaseOwnershipOptions databaseOwnership;
+ LockMode databaseLockMode;
+ LockMode checkedDatabaseLockMode;
+ bool expectedResult;
+ };
- ASSERT(lockState->isDbLockedForMode(dbName, MODE_IS));
- ASSERT(lockState->isDbLockedForMode(dbName, MODE_IX));
- ASSERT(lockState->isDbLockedForMode(dbName, MODE_S));
- ASSERT(lockState->isDbLockedForMode(dbName, MODE_X));
+ TenantId tenantId{OID::gen()};
+ StringData testDatabaseName{"test"};
+ std::vector<TestCase> testCases{
+ // Only global lock acquired.
+ {MODE_X, MODE_NONE, kAll, MODE_NONE, MODE_X, true},
+ {MODE_X, MODE_NONE, kAll, MODE_NONE, MODE_IX, true},
+ {MODE_X, MODE_NONE, kAll, MODE_NONE, MODE_S, true},
+ {MODE_X, MODE_NONE, kAll, MODE_NONE, MODE_IS, true},
+ {MODE_S, MODE_NONE, kAll, MODE_NONE, MODE_X, false},
+ {MODE_S, MODE_NONE, kAll, MODE_NONE, MODE_IX, false},
+ {MODE_S, MODE_NONE, kAll, MODE_NONE, MODE_S, true},
+ {MODE_S, MODE_NONE, kAll, MODE_NONE, MODE_IS, true},
+ // Global and tenant locks acquired.
+ {MODE_IX, MODE_NONE, kTenantOwned, MODE_NONE, MODE_X, false},
+ {MODE_IX, MODE_NONE, kTenantOwned, MODE_NONE, MODE_IX, false},
+ {MODE_IX, MODE_NONE, kTenantOwned, MODE_NONE, MODE_S, false},
+ {MODE_IX, MODE_NONE, kTenantOwned, MODE_NONE, MODE_IS, false},
+ {MODE_IX, MODE_X, kTenantOwned, MODE_NONE, MODE_X, true},
+ {MODE_IX, MODE_X, kTenantOwned, MODE_NONE, MODE_IX, true},
+ {MODE_IX, MODE_X, kTenantOwned, MODE_NONE, MODE_S, true},
+ {MODE_IX, MODE_X, kTenantOwned, MODE_NONE, MODE_IS, true},
+ {MODE_IS, MODE_NONE, kTenantOwned, MODE_NONE, MODE_X, false},
+ {MODE_IS, MODE_NONE, kTenantOwned, MODE_NONE, MODE_IX, false},
+ {MODE_IS, MODE_NONE, kTenantOwned, MODE_NONE, MODE_S, false},
+ {MODE_IS, MODE_NONE, kTenantOwned, MODE_NONE, MODE_IS, false},
+ {MODE_IS, MODE_S, kTenantOwned, MODE_NONE, MODE_X, false},
+ {MODE_IS, MODE_S, kTenantOwned, MODE_NONE, MODE_IX, false},
+ {MODE_IS, MODE_S, kTenantOwned, MODE_NONE, MODE_S, true},
+ {MODE_IS, MODE_S, kTenantOwned, MODE_NONE, MODE_IS, true},
+ // Global, tenant, db locks acquired.
+ {MODE_NONE, MODE_NONE, kAll, MODE_NONE, MODE_X, false},
+ {MODE_NONE, MODE_NONE, kAll, MODE_NONE, MODE_IX, false},
+ {MODE_NONE, MODE_NONE, kAll, MODE_NONE, MODE_S, false},
+ {MODE_NONE, MODE_NONE, kAll, MODE_NONE, MODE_IS, false},
+ {MODE_NONE, MODE_NONE, kAll, MODE_S, MODE_X, false},
+ {MODE_NONE, MODE_NONE, kAll, MODE_S, MODE_IX, false},
+ {MODE_NONE, MODE_NONE, kAll, MODE_S, MODE_S, true},
+ {MODE_NONE, MODE_NONE, kAll, MODE_S, MODE_IS, true},
+ {MODE_NONE, MODE_NONE, kAll, MODE_X, MODE_X, true},
+ {MODE_NONE, MODE_NONE, kAll, MODE_X, MODE_IX, true},
+ {MODE_NONE, MODE_NONE, kAll, MODE_X, MODE_S, true},
+ {MODE_NONE, MODE_NONE, kAll, MODE_X, MODE_IS, true},
+ {MODE_NONE, MODE_NONE, kAll, MODE_IX, MODE_X, false},
+ {MODE_NONE, MODE_NONE, kAll, MODE_IX, MODE_IX, true},
+ {MODE_NONE, MODE_NONE, kAll, MODE_IX, MODE_S, false},
+ {MODE_NONE, MODE_NONE, kAll, MODE_IX, MODE_IS, true},
+ {MODE_NONE, MODE_NONE, kAll, MODE_IS, MODE_X, false},
+ {MODE_NONE, MODE_NONE, kAll, MODE_IS, MODE_IX, false},
+ {MODE_NONE, MODE_NONE, kAll, MODE_IS, MODE_S, false},
+ {MODE_NONE, MODE_NONE, kAll, MODE_IS, MODE_IS, true},
+ };
+ for (auto&& testCase : testCases) {
+ {
+ for (auto&& tenantOwned : std::vector<bool>{false, true}) {
+ if (!tenantOwned && kTenantOwned == testCase.databaseOwnership) {
+ continue;
+ }
+ const DatabaseName databaseName(
+ tenantOwned ? boost::make_optional(tenantId) : boost::none, testDatabaseName);
+ boost::optional<Lock::GlobalLock> globalLock;
+ boost::optional<Lock::TenantLock> tenantLock;
+ boost::optional<Lock::DBLock> dbLock;
+
+ if (MODE_NONE != testCase.globalLockMode) {
+ globalLock.emplace(opCtx.get(), testCase.globalLockMode);
+ }
+ if (MODE_NONE != testCase.tenantLockMode) {
+ tenantLock.emplace(opCtx.get(), tenantId, testCase.tenantLockMode);
+ }
+ if (MODE_NONE != testCase.databaseLockMode) {
+ dbLock.emplace(opCtx.get(), databaseName, testCase.databaseLockMode);
+ }
+ ASSERT(
+ lockState->isDbLockedForMode(databaseName, testCase.checkedDatabaseLockMode) ==
+ testCase.expectedResult)
+ << " global lock mode: " << modeName(testCase.globalLockMode)
+ << " tenant lock mode: " << modeName(testCase.tenantLockMode)
+ << " db lock mode: " << modeName(testCase.databaseLockMode)
+ << " tenant owned: " << tenantOwned
+ << " checked lock mode: " << modeName(testCase.checkedDatabaseLockMode);
+
+ // If database is not locked with intent lock, a collection in the database is
+ // locked for the same lock mode.
+ ASSERT(testCase.databaseLockMode == MODE_IS ||
+ testCase.databaseLockMode == MODE_IX ||
+ lockState->isCollectionLockedForMode(
+ NamespaceString::createNamespaceString_forTest(databaseName, "coll"),
+ testCase.checkedDatabaseLockMode) == testCase.expectedResult)
+ << " global lock mode: " << modeName(testCase.globalLockMode)
+ << " tenant lock mode: " << modeName(testCase.tenantLockMode)
+ << " db lock mode: " << modeName(testCase.databaseLockMode)
+ << " tenant owned: " << tenantOwned
+ << " checked lock mode: " << modeName(testCase.checkedDatabaseLockMode);
+ }
+ }
+ }
}
TEST_F(DConcurrencyTestFixture, IsCollectionLocked_DB_Locked_IS) {
diff --git a/src/mongo/db/concurrency/lock_manager_defs.h b/src/mongo/db/concurrency/lock_manager_defs.h
index 026af0b28f4..6fbf8968552 100644
--- a/src/mongo/db/concurrency/lock_manager_defs.h
+++ b/src/mongo/db/concurrency/lock_manager_defs.h
@@ -159,6 +159,9 @@ enum ResourceType {
/** Used for global exclusive operations */
RESOURCE_GLOBAL,
+ /** Encompasses resources belonging to a tenant, if in multi-tenant mode.*/
+ RESOURCE_TENANT,
+
/** Generic resources, used for multi-granularity locking, together with the above locks */
RESOURCE_DATABASE,
RESOURCE_COLLECTION,
@@ -191,7 +194,7 @@ enum class ResourceGlobalId : uint8_t {
* Maps the resource id to a human-readable string.
*/
static const char* ResourceTypeNames[] = {
- "Invalid", "Global", "Database", "Collection", "Metadata", "Mutex"};
+ "Invalid", "Global", "Tenant", "Database", "Collection", "Metadata", "Mutex"};
/**
* Maps the global resource id to a human-readable string.
@@ -245,13 +248,18 @@ public:
}
ResourceId(ResourceType type, const std::string& str)
: _fullHash(fullHash(type, hashStringData(str))) {
- // Resources of type database or collection must never be passed as a raw string
- invariant(type != RESOURCE_DATABASE && type != RESOURCE_COLLECTION);
+ // Resources of type database, collection, or tenant must never be passed as a raw string.
+ invariant(type != RESOURCE_DATABASE && type != RESOURCE_COLLECTION &&
+ type != RESOURCE_TENANT);
verifyNoResourceMutex(type);
}
ResourceId(ResourceType type, uint64_t hashId) : _fullHash(fullHash(type, hashId)) {
verifyNoResourceMutex(type);
}
+ ResourceId(ResourceType type, const TenantId& tenantId)
+ : _fullHash{fullHash(type, hashStringData(tenantId.toString()))} {
+ verifyNoResourceMutex(type);
+ }
bool isValid() const {
return getType() != RESOURCE_INVALID;
diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp
index b19fde7e391..5fbf38e6d3b 100644
--- a/src/mongo/db/concurrency/lock_state.cpp
+++ b/src/mongo/db/concurrency/lock_state.cpp
@@ -148,6 +148,7 @@ bool LockerImpl::_shouldDelayUnlock(ResourceId resId, LockMode mode) const {
return false;
case RESOURCE_GLOBAL:
+ case RESOURCE_TENANT:
case RESOURCE_DATABASE:
case RESOURCE_COLLECTION:
case RESOURCE_METADATA:
@@ -647,11 +648,39 @@ bool LockerImpl::isLockHeldForMode(ResourceId resId, LockMode mode) const {
return isModeCovered(mode, getLockMode(resId));
}
-bool LockerImpl::isDbLockedForMode(const DatabaseName& dbName, LockMode mode) const {
- if (isW())
+boost::optional<bool> LockerImpl::_globalAndTenantLocksImplyDBOrCollectionLockedForMode(
+ const boost::optional<TenantId>& tenantId, LockMode lockMode) const {
+ if (isW()) {
return true;
- if (isR() && isSharedLockMode(mode))
+ }
+ if (isR() && isSharedLockMode(lockMode)) {
return true;
+ }
+ if (tenantId) {
+ const ResourceId tenantResourceId{ResourceType::RESOURCE_TENANT, *tenantId};
+ switch (getLockMode(tenantResourceId)) {
+ case MODE_NONE:
+ return false;
+ case MODE_X:
+ return true;
+ case MODE_S:
+ return isSharedLockMode(lockMode);
+ case MODE_IX:
+ case MODE_IS:
+ break;
+ default:
+ MONGO_UNREACHABLE_TASSERT(6671502);
+ }
+ }
+ return boost::none;
+}
+
+bool LockerImpl::isDbLockedForMode(const DatabaseName& dbName, LockMode mode) const {
+ if (auto lockedForMode =
+ _globalAndTenantLocksImplyDBOrCollectionLockedForMode(dbName.tenantId(), mode);
+ lockedForMode) {
+ return *lockedForMode;
+ }
const ResourceId resIdDb(RESOURCE_DATABASE, dbName);
return isLockHeldForMode(resIdDb, mode);
@@ -660,16 +689,17 @@ bool LockerImpl::isDbLockedForMode(const DatabaseName& dbName, LockMode mode) co
bool LockerImpl::isCollectionLockedForMode(const NamespaceString& nss, LockMode mode) const {
invariant(nss.coll().size());
- if (isW())
- return true;
- if (isR() && isSharedLockMode(mode))
+ if (!shouldConflictWithSecondaryBatchApplication())
return true;
- const ResourceId resIdDb(RESOURCE_DATABASE, nss.dbName());
+ if (auto lockedForMode =
+ _globalAndTenantLocksImplyDBOrCollectionLockedForMode(nss.tenantId(), mode);
+ lockedForMode) {
+ return *lockedForMode;
+ }
+ const ResourceId resIdDb(RESOURCE_DATABASE, nss.dbName());
LockMode dbMode = getLockMode(resIdDb);
- if (!shouldConflictWithSecondaryBatchApplication())
- return true;
switch (dbMode) {
case MODE_NONE:
@@ -806,6 +836,7 @@ bool LockerImpl::saveLockStateAndUnlock(Locker::LockSnapshot* stateOut) {
// We should never have to save and restore metadata locks.
invariant(RESOURCE_DATABASE == resType || RESOURCE_COLLECTION == resType ||
+ RESOURCE_TENANT == resType ||
(resId == resourceIdParallelBatchWriterMode && isSharedLockMode(it->mode)) ||
resId == resourceIdFeatureCompatibilityVersion ||
(resId == resourceIdReplicationStateTransitionLock && it->mode == MODE_IX));
diff --git a/src/mongo/db/concurrency/lock_state.h b/src/mongo/db/concurrency/lock_state.h
index 7ae54b1fd74..8788d74c058 100644
--- a/src/mongo/db/concurrency/lock_state.h
+++ b/src/mongo/db/concurrency/lock_state.h
@@ -331,6 +331,20 @@ private:
*/
void _dumpLockerAndLockManagerRequests();
+ /**
+ * Determines whether global and tenant lock state implies that some database or lower level
+ * resource, such as a collection, belonging to a tenant identified by 'tenantId' is locked in
+ * 'lockMode'.
+ *
+ * Returns:
+ * true, if the global and tenant locks imply that the resource is locked for 'mode';
+ * false, if the global and tenant locks imply that the resource is not locked for 'mode';
+ * boost::none, if the global and tenant lock state does not imply either outcome and lower
+ * level locks should be consulted.
+ */
+ boost::optional<bool> _globalAndTenantLocksImplyDBOrCollectionLockedForMode(
+ const boost::optional<TenantId>& tenantId, LockMode lockMode) const;
+
// Used to disambiguate different lockers
const LockerId _id;
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index 86e7b36872c..baff24f4ba7 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -140,14 +140,31 @@ Status _insertDocumentsToOplogAndChangeCollections(
std::vector<InsertStatement>::const_iterator end,
bool skipWritesToOplog) {
WriteUnitOfWork wunit(opCtx);
+ boost::optional<AutoGetOplog> autoOplog;
+ boost::optional<ChangeStreamChangeCollectionManager::ChangeCollectionsWriter>
+ changeCollectionWriter;
+ // Acquire locks. We must acquire the locks for all collections we intend to write to before
+ // performing any writes. This avoids potential deadlocks created by waiting for locks while
+ // having generated oplog holes.
if (!skipWritesToOplog) {
- AutoGetOplog autoOplog(opCtx, OplogAccessMode::kWrite);
- auto& oplogColl = autoOplog.getCollection();
+ autoOplog.emplace(opCtx, OplogAccessMode::kWrite);
+ }
+ const bool changeCollectionsMode =
+ change_stream_serverless_helpers::isChangeCollectionsModeActive();
+ if (changeCollectionsMode) {
+ changeCollectionWriter = boost::make_optional(
+ ChangeStreamChangeCollectionManager::get(opCtx).createChangeCollectionsWriter(
+ opCtx, begin, end, nullptr /* opDebug */));
+ changeCollectionWriter->acquireLocks();
+ }
+
+ // Write entries to the oplog.
+ if (!skipWritesToOplog) {
+ auto& oplogColl = autoOplog->getCollection();
if (!oplogColl) {
return {ErrorCodes::NamespaceNotFound, "Oplog collection does not exist"};
}
-
auto status = collection_internal::insertDocuments(
opCtx, oplogColl, begin, end, nullptr /* OpDebug */, false /* fromMigrate */);
if (!status.isOK()) {
@@ -157,14 +174,8 @@ Status _insertDocumentsToOplogAndChangeCollections(
// Write the corresponding oplog entries to tenants respective change
// collections in the serverless.
- if (change_stream_serverless_helpers::isChangeCollectionsModeActive()) {
- auto status =
- ChangeStreamChangeCollectionManager::get(opCtx).insertDocumentsToChangeCollection(
- opCtx,
- begin,
- end,
- !skipWritesToOplog /* hasAcquiredGlobalIXLock */,
- nullptr /* OpDebug */);
+ if (changeCollectionsMode) {
+ auto status = changeCollectionWriter->write();
if (!status.isOK()) {
return status;
}
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index 72c22005f92..0ea730b6a35 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
@@ -868,6 +869,20 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
"op"_attr = redact(noopEntry.toBSON()));
AutoGetOplog oplogWrite(opCtx.get(), OplogAccessMode::kWrite);
+ boost::optional<Lock::TenantLock> tenantLock;
+ boost::optional<TenantId> tenantId = [&]() -> boost::optional<TenantId> {
+ if (_tenantId) {
+ return TenantId{OID::createFromString(*_tenantId)};
+ }
+ if (entry.getTid()) {
+ return *entry.getTid();
+ }
+ return boost::none;
+ }();
+ if (tenantId) {
+ tenantLock.emplace(opCtx.get(), *tenantId, MODE_IX);
+ }
+
writeConflictRetry(
opCtx.get(), "writeTenantNoOps", NamespaceString::kRsOplogNamespace.ns(), [&] {
WriteUnitOfWork wuow(opCtx.get());
@@ -914,6 +929,8 @@ void TenantOplogApplier::_writeNoOpsForRange(OpObserver* opObserver,
opCtx->setAlwaysInterruptAtStepDownOrUp_UNSAFE();
AutoGetOplog oplogWrite(opCtx.get(), OplogAccessMode::kWrite);
+ auto tenantLocks = _acquireIntentExclusiveTenantLocks(opCtx.get(), begin, end);
+
writeConflictRetry(
opCtx.get(), "writeTenantNoOps", NamespaceString::kRsOplogNamespace.ns(), [&] {
WriteUnitOfWork wuow(opCtx.get());
@@ -945,7 +962,34 @@ void TenantOplogApplier::_writeNoOpsForRange(OpObserver* opObserver,
wuow.commit();
});
}
-
+std::vector<Lock::TenantLock> TenantOplogApplier::_acquireIntentExclusiveTenantLocks(
+ OperationContext* opCtx,
+ std::vector<TenantNoOpEntry>::const_iterator entryBegin,
+ std::vector<TenantNoOpEntry>::const_iterator entryEnd) const {
+ // Determine all involved tenants.
+ std::set<TenantId> tenantIds = [&] {
+ std::set<TenantId> tenantIds;
+ if (_tenantId) {
+ tenantIds.emplace(OID::createFromString(*_tenantId));
+ } else {
+ for (auto iter = entryBegin; iter != entryEnd; ++iter) {
+ const auto& oplogEntry = *iter->first;
+ if (oplogEntry.getTid()) {
+ tenantIds.insert(*oplogEntry.getTid());
+ }
+ }
+ }
+ return tenantIds;
+ }();
+
+ // Acquire a lock for each tenant.
+ std::vector<Lock::TenantLock> tenantLocks;
+ tenantLocks.reserve(tenantIds.size());
+ for (auto&& tenantId : tenantIds) {
+ tenantLocks.emplace_back(opCtx, tenantId, MODE_IX);
+ }
+ return tenantLocks;
+}
std::vector<std::vector<ApplierOperation>> TenantOplogApplier::_fillWriterVectors(
OperationContext* opCtx, TenantOplogBatch* batch) {
std::vector<std::vector<ApplierOperation>> writerVectors(
diff --git a/src/mongo/db/repl/tenant_oplog_applier.h b/src/mongo/db/repl/tenant_oplog_applier.h
index c4c218b7289..afd339c99d8 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.h
+++ b/src/mongo/db/repl/tenant_oplog_applier.h
@@ -32,6 +32,7 @@
#include <string>
#include <vector>
+#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/repl/abstract_async_component.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_buffer.h"
@@ -143,6 +144,15 @@ private:
TenantOplogBatch* batch);
/**
+ * Acquires Intent Exclusive (IX) lock for each tenant referred to by oplog entries [entryBegin;
+ * entryEnd) and returns lock objects.
+ */
+ std::vector<Lock::TenantLock> _acquireIntentExclusiveTenantLocks(
+ OperationContext* opCtx,
+ std::vector<TenantNoOpEntry>::const_iterator entryBegin,
+ std::vector<TenantNoOpEntry>::const_iterator entryEnd) const;
+
+ /**
* Sets the _finalStatus to the new status if and only if the old status is "OK".
*/
void _setFinalStatusIfOk(WithLock, Status newStatus);
diff --git a/src/mongo/db/shard_role_test.cpp b/src/mongo/db/shard_role_test.cpp
index 18332bc3fb6..c4c9772b842 100644
--- a/src/mongo/db/shard_role_test.cpp
+++ b/src/mongo/db/shard_role_test.cpp
@@ -62,7 +62,7 @@ void createTestView(OperationContext* opCtx,
void installDatabaseMetadata(OperationContext* opCtx,
const DatabaseName& dbName,
const DatabaseVersion& dbVersion) {
- AutoGetDb autoDb(opCtx, dbName, MODE_X, {});
+ AutoGetDb autoDb(opCtx, dbName, MODE_X, {}, {});
auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquireExclusive(opCtx, dbName);
scopedDss->setDbInfo(opCtx, {dbName.db(), ShardId("this"), dbVersion});
}
@@ -306,7 +306,7 @@ TEST_F(ShardRoleTest, AcquireUnshardedCollWithIncorrectPlacementVersionThrows) {
TEST_F(ShardRoleTest, AcquireUnshardedCollWhenShardDoesNotKnowThePlacementVersionThrows) {
{
// Clear the database metadata
- AutoGetDb autoDb(opCtx(), dbNameTestDb, MODE_X, {});
+ AutoGetDb autoDb(opCtx(), dbNameTestDb, MODE_X, {}, {});
auto scopedDss =
DatabaseShardingState::assertDbLockedAndAcquireExclusive(opCtx(), dbNameTestDb);
scopedDss->clearDbInfo(opCtx());
@@ -333,7 +333,7 @@ TEST_F(ShardRoleTest, AcquireUnshardedCollWhenCriticalSectionIsActiveThrows) {
const BSONObj criticalSectionReason = BSON("reason" << 1);
{
// Enter critical section.
- AutoGetDb autoDb(opCtx(), dbNameTestDb, MODE_X, {});
+ AutoGetDb autoDb(opCtx(), dbNameTestDb, MODE_X, {}, {});
auto scopedDss =
DatabaseShardingState::assertDbLockedAndAcquireExclusive(opCtx(), dbNameTestDb);
scopedDss->enterCriticalSectionCatchUpPhase(opCtx(), criticalSectionReason);
@@ -361,7 +361,7 @@ TEST_F(ShardRoleTest, AcquireUnshardedCollWhenCriticalSectionIsActiveThrows) {
{
// Exit critical section.
- AutoGetDb autoDb(opCtx(), dbNameTestDb, MODE_X, {});
+ AutoGetDb autoDb(opCtx(), dbNameTestDb, MODE_X, {}, {});
const BSONObj criticalSectionReason = BSON("reason" << 1);
auto scopedDss =
DatabaseShardingState::assertDbLockedAndAcquireExclusive(opCtx(), dbNameTestDb);