summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHenrik Edin <henrik.edin@mongodb.com>2022-12-14 15:58:06 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-12-14 17:06:57 +0000
commit22adda7d3824aef96e197606b02714bd6e02c91b (patch)
tree8ddb2a77ce1b6b2c26f5ee90de11ab03cb6342de
parentb379a035a5a62f006350b13912999fab36330d9d (diff)
downloadmongo-22adda7d3824aef96e197606b02714bd6e02c91b.tar.gz
SERVER-71287 Stash CollectionCatalog when starting multi-document transactions
This allows readers to use Collections fully in sync with the multi-document transaction snapshot and eliminates the need for the pessimistic "catalog conflicting timestamp". Write operations check that they are operating on the latest version of the collection and throw WCE if not.
-rw-r--r--jstests/core/txns/concurrent_drops_and_creates.js65
-rw-r--r--jstests/core/txns/create_collection_parallel.js22
-rw-r--r--jstests/core/txns/create_indexes_parallel.js27
-rw-r--r--jstests/noPassthrough/read_concern_snapshot_catalog_invalidation.js83
-rw-r--r--jstests/noPassthrough/transaction_write_with_snapshot_unavailable.js12
-rw-r--r--src/mongo/db/catalog/collection_catalog.cpp34
-rw-r--r--src/mongo/db/catalog/collection_catalog.h23
-rw-r--r--src/mongo/db/catalog/collection_writer_test.cpp3
-rw-r--r--src/mongo/db/catalog_raii.cpp58
-rw-r--r--src/mongo/db/catalog_raii.h17
-rw-r--r--src/mongo/db/db_raii.cpp3
-rw-r--r--src/mongo/db/index/index_access_method.cpp34
-rw-r--r--src/mongo/db/transaction/transaction_participant.cpp37
13 files changed, 325 insertions, 93 deletions
diff --git a/jstests/core/txns/concurrent_drops_and_creates.js b/jstests/core/txns/concurrent_drops_and_creates.js
index 8117148ba7e..629aa6d12fb 100644
--- a/jstests/core/txns/concurrent_drops_and_creates.js
+++ b/jstests/core/txns/concurrent_drops_and_creates.js
@@ -16,6 +16,7 @@
// TODO (SERVER-39704): Remove the following load after SERVER-39704 is completed
// For retryOnceOnTransientAndRestartTxnOnMongos.
load('jstests/libs/auto_retry_transaction_in_sharding.js');
+load("jstests/libs/feature_flag_util.js");
const dbName1 = "test1";
const dbName2 = "test2";
@@ -62,18 +63,34 @@ retryOnceOnTransientAndRestartTxnOnMongos(session, () => {
sessionOutsideTxn.advanceClusterTime(session.getClusterTime());
assert.commandWorked(testDB2.runCommand({drop: collNameB, writeConcern: {w: "majority"}}));
-// Ensure the collection drop is visible to the transaction, since our implementation of the in-
-// memory collection catalog always has the most recent collection metadata. We can detect the
-// drop by attempting a findAndModify on the dropped collection. Since the collection drop is
-// visible, the findAndModify will not match any existing documents.
-// TODO (SERVER-39704): Remove use of retryOnceOnTransientAndRestartTxnOnMongos.
-retryOnceOnTransientAndRestartTxnOnMongos(session, () => {
- const res = sessionDB2.runCommand(
- {findAndModify: sessionCollB.getName(), update: {a: 1}, upsert: true});
- assert.commandWorked(res);
- assert.eq(res.value, null);
-}, txnOptions);
-assert.commandWorked(session.commitTransaction_forTesting());
+// This test causes a StaleConfig error on sharding, so even with the PointInTimeCatalogLookups
+// flag enabled no command will succeed.
+if (FeatureFlagUtil.isEnabled(db, "PointInTimeCatalogLookups") && !session.getClient().isMongos()) {
+ // We can perform reads on the dropped collection as it existed when we started the transaction.
+ assert.commandWorked(sessionDB2.runCommand({find: sessionCollB.getName()}));
+
+ // However, trying to perform a write will cause a write conflict.
+ assert.commandFailedWithCode(
+ sessionDB2.runCommand(
+ {findAndModify: sessionCollB.getName(), update: {a: 1}, upsert: true}),
+ ErrorCodes.WriteConflict);
+
+ assert.commandFailedWithCode(session.abortTransaction_forTesting(),
+ ErrorCodes.NoSuchTransaction);
+} else {
+ // Ensure the collection drop is visible to the transaction, since our implementation of the in-
+ // memory collection catalog always has the most recent collection metadata. We can detect the
+ // drop by attempting a findAndModify on the dropped collection. Since the collection drop is
+ // visible, the findAndModify will not match any existing documents.
+ // TODO (SERVER-39704): Remove use of retryOnceOnTransientAndRestartTxnOnMongos.
+ retryOnceOnTransientAndRestartTxnOnMongos(session, () => {
+ const res = sessionDB2.runCommand(
+ {findAndModify: sessionCollB.getName(), update: {a: 1}, upsert: true});
+ assert.commandWorked(res);
+ assert.eq(res.value, null);
+ }, txnOptions);
+ assert.commandWorked(session.commitTransaction_forTesting());
+}
//
// A transaction with snapshot read concern cannot write to a collection that has been created
@@ -93,12 +110,24 @@ assert.commandWorked(sessionCollA.insert({}));
sessionOutsideTxn.advanceClusterTime(session.getClusterTime());
assert.commandWorked(testDB2.runCommand({create: collNameB}));
-// We cannot write to collection B in the transaction, since it experienced catalog changes
-// since the transaction's read timestamp. Since our implementation of the in-memory collection
-// catalog always has the most recent collection metadata, we do not allow you to read from a
-// collection at a time prior to its most recent catalog changes.
-assert.commandFailedWithCode(sessionCollB.insert({}), ErrorCodes.SnapshotUnavailable);
-assert.commandFailedWithCode(session.abortTransaction_forTesting(), ErrorCodes.NoSuchTransaction);
+if (FeatureFlagUtil.isEnabled(db, "PointInTimeCatalogLookups")) {
+ // We can insert to collection B in the transaction as the transaction does not have a
+ // collection on this namespace (even though it exists at latest). A collection will be implicitly
+ // created and we will fail to commit this transaction with a WriteConflict error.
+ retryOnceOnTransientAndRestartTxnOnMongos(session, () => {
+ assert.commandWorked(sessionCollB.insert({}));
+ }, txnOptions);
+
+ assert.commandFailedWithCode(session.commitTransaction_forTesting(), ErrorCodes.WriteConflict);
+} else {
+ // We cannot write to collection B in the transaction, since it experienced catalog changes
+ // since the transaction's read timestamp. Since our implementation of the in-memory collection
+ // catalog always has the most recent collection metadata, we do not allow you to read from a
+ // collection at a time prior to its most recent catalog changes.
+ assert.commandFailedWithCode(sessionCollB.insert({}), ErrorCodes.SnapshotUnavailable);
+ assert.commandFailedWithCode(session.abortTransaction_forTesting(),
+ ErrorCodes.NoSuchTransaction);
+}
session.endSession();
}());
diff --git a/jstests/core/txns/create_collection_parallel.js b/jstests/core/txns/create_collection_parallel.js
index 67d72d29a54..453df4ff30b 100644
--- a/jstests/core/txns/create_collection_parallel.js
+++ b/jstests/core/txns/create_collection_parallel.js
@@ -12,6 +12,7 @@
load("jstests/libs/create_collection_txn_helpers.js");
load("jstests/libs/auto_retry_transaction_in_sharding.js");
+load("jstests/libs/feature_flag_util.js");
const dbName = 'test_txns_create_collection_parallel';
@@ -65,11 +66,22 @@ function runParallelCollectionCreateTest(command, explicitCreate) {
session.commitTransaction();
assert.eq(sessionColl.find({}).itcount(), 1);
- assert.commandFailedWithCode(secondSessionDB.runCommand({create: collName}),
- ErrorCodes.NamespaceExists);
-
- assert.commandFailedWithCode(secondSession.abortTransaction_forTesting(),
- ErrorCodes.NoSuchTransaction);
+ if (FeatureFlagUtil.isEnabled(db, "PointInTimeCatalogLookups")) {
+ // create cannot observe the collection created in the other transaction so the command
+ // will succeed and we will instead throw WCE when trying to commit the transaction.
+ retryOnceOnTransientAndRestartTxnOnMongos(secondSession, () => {
+ assert.commandWorked(secondSessionDB.runCommand({create: collName}));
+ }, {writeConcern: {w: "majority"}});
+
+ assert.commandFailedWithCode(secondSession.commitTransaction_forTesting(),
+ ErrorCodes.WriteConflict);
+ } else {
+ assert.commandFailedWithCode(secondSessionDB.runCommand({create: collName}),
+ ErrorCodes.NamespaceExists);
+
+ assert.commandFailedWithCode(secondSession.abortTransaction_forTesting(),
+ ErrorCodes.NoSuchTransaction);
+ }
assert.eq(distinctSessionColl.find({}).itcount(), 0);
sessionColl.drop({writeConcern: {w: "majority"}});
diff --git a/jstests/core/txns/create_indexes_parallel.js b/jstests/core/txns/create_indexes_parallel.js
index 1b8d5247969..2441a5df720 100644
--- a/jstests/core/txns/create_indexes_parallel.js
+++ b/jstests/core/txns/create_indexes_parallel.js
@@ -12,6 +12,7 @@
load("jstests/libs/auto_retry_transaction_in_sharding.js");
load("jstests/libs/create_index_txn_helpers.js");
+load("jstests/libs/feature_flag_util.js");
let doParallelCreateIndexesTest = function(explicitCollectionCreate, multikeyIndex) {
const dbName = 'test_txns_create_indexes_parallel';
@@ -92,13 +93,25 @@ let doParallelCreateIndexesTest = function(explicitCollectionCreate, multikeyInd
assert.eq(secondSessionColl.find({}).itcount(), 1);
assert.eq(secondSessionColl.getIndexes().length, 2);
- // createIndexes takes minimum visible snapshots of new collections into consideration when
- // checking for existing indexes.
- assert.commandFailedWithCode(
- sessionColl.runCommand({createIndexes: collName, indexes: [conflictingIndexSpecs]}),
- ErrorCodes.SnapshotUnavailable);
- assert.commandFailedWithCode(session.abortTransaction_forTesting(),
- ErrorCodes.NoSuchTransaction);
+ if (FeatureFlagUtil.isEnabled(db, "PointInTimeCatalogLookups")) {
+ // createIndexes cannot observe the index created in the other transaction so the command
+ // will succeed and we will instead throw WCE when trying to commit the transaction.
+ retryOnceOnTransientAndRestartTxnOnMongos(session, () => {
+ assert.commandWorked(sessionColl.runCommand(
+ {createIndexes: collName, indexes: [conflictingIndexSpecs]}));
+ }, {writeConcern: {w: "majority"}});
+
+ assert.commandFailedWithCode(session.commitTransaction_forTesting(),
+ ErrorCodes.WriteConflict);
+ } else {
+ // createIndexes takes minimum visible snapshots of new collections into consideration when
+ // checking for existing indexes.
+ assert.commandFailedWithCode(
+ sessionColl.runCommand({createIndexes: collName, indexes: [conflictingIndexSpecs]}),
+ ErrorCodes.SnapshotUnavailable);
+ assert.commandFailedWithCode(session.abortTransaction_forTesting(),
+ ErrorCodes.NoSuchTransaction);
+ }
assert.eq(sessionColl.find({}).itcount(), 1);
assert.eq(sessionColl.getIndexes().length, 2);
diff --git a/jstests/noPassthrough/read_concern_snapshot_catalog_invalidation.js b/jstests/noPassthrough/read_concern_snapshot_catalog_invalidation.js
index a08c3b9f041..43d1bd4a814 100644
--- a/jstests/noPassthrough/read_concern_snapshot_catalog_invalidation.js
+++ b/jstests/noPassthrough/read_concern_snapshot_catalog_invalidation.js
@@ -4,7 +4,8 @@
(function() {
"use strict";
-load("jstests/libs/curop_helpers.js"); // For waitForCurOpByFailPoint().
+load("jstests/libs/curop_helpers.js"); // For waitForCurOpByFailPoint().
+load("jstests/libs/feature_flag_util.js"); // For FeatureFlagUtil.isEnabled().
const kDbName = "test";
const kCollName = "coll";
@@ -17,7 +18,30 @@ const testDB = rst.getPrimary().getDB(kDbName);
const adminDB = testDB.getSiblingDB("admin");
const coll = testDB.getCollection(kCollName);
-function testCommand(cmd, curOpFilter) {
+function execCommand(cmd, expectSucceed) {
+ if (expectSucceed) {
+ return startParallelShell(
+ "const session = db.getMongo().startSession();" +
+ "const sessionDb = session.getDatabase('test');" +
+ "session.startTransaction({readConcern: {level: 'snapshot'}});" +
+ "const res = sessionDb.runCommand(" + tojson(cmd) + ");" +
+ "assert.commandWorked(res);" +
+ "session.endSession();",
+ rst.ports[0]);
+ } else {
+ return startParallelShell(
+ "const session = db.getMongo().startSession();" +
+ "const sessionDb = session.getDatabase('test');" +
+ "session.startTransaction({readConcern: {level: 'snapshot'}});" +
+ "const res = sessionDb.runCommand(" + tojson(cmd) + ");" +
+ "assert.commandFailedWithCode(res, [ErrorCodes.SnapshotUnavailable, ErrorCodes.WriteConflict]);" +
+ "assert.eq(res.errorLabels, ['TransientTransactionError']);" +
+ "session.endSession();",
+ rst.ports[0]);
+ }
+}
+
+function testCommand(cmd, curOpFilter, expectSucceed) {
coll.drop({writeConcern: {w: "majority"}});
assert.commandWorked(coll.insert({x: 1}, {writeConcern: {w: "majority"}}));
@@ -26,15 +50,10 @@ function testCommand(cmd, curOpFilter) {
assert.commandWorked(testDB.adminCommand(
{configureFailPoint: "hangAfterPreallocateSnapshot", mode: "alwaysOn"}));
- const awaitCommand = startParallelShell(
- "const session = db.getMongo().startSession();" +
- "const sessionDb = session.getDatabase('test');" +
- "session.startTransaction({readConcern: {level: 'snapshot'}});" +
- "const res = sessionDb.runCommand(" + tojson(cmd) + ");" +
- "assert.commandFailedWithCode(res, ErrorCodes.SnapshotUnavailable);" +
- "assert.eq(res.errorLabels, ['TransientTransactionError']);" +
- "session.endSession();",
- rst.ports[0]);
+ // Execute command in parallel shell. Read commands should work even if catalog changes have
+ // occurred since opening the snapshot.
+ expectSucceed = expectSucceed && FeatureFlagUtil.isEnabled(testDB, "PointInTimeCatalogLookups");
+ const awaitCommand = execCommand(cmd, expectSucceed);
waitForCurOpByFailPointNoNS(testDB, "hangAfterPreallocateSnapshot", curOpFilter);
@@ -62,27 +81,37 @@ function testCommand(cmd, curOpFilter) {
}
testCommand({aggregate: kCollName, pipeline: [], cursor: {}},
- {"command.aggregate": kCollName, "command.readConcern.level": "snapshot"});
+ {"command.aggregate": kCollName, "command.readConcern.level": "snapshot"},
+ true /*read is expected to succeed*/);
testCommand({delete: kCollName, deletes: [{q: {x: 1}, limit: 1}]},
- {"command.delete": kCollName, "command.readConcern.level": "snapshot"});
+ {"command.delete": kCollName, "command.readConcern.level": "snapshot"},
+ false /*write is expected to fail*/);
testCommand({distinct: kCollName, key: "x"},
- {"command.distinct": kCollName, "command.readConcern.level": "snapshot"});
+ {"command.distinct": kCollName, "command.readConcern.level": "snapshot"},
+ true /*read is expected to succeed*/);
testCommand({find: kCollName},
- {"command.find": kCollName, "command.readConcern.level": "snapshot"});
-testCommand({findAndModify: kCollName, query: {x: 1}, remove: true}, {
- "command.findAndModify": kCollName,
- "command.remove": true,
- "command.readConcern.level": "snapshot"
-});
-testCommand({findAndModify: kCollName, query: {x: 1}, update: {$set: {x: 2}}}, {
- "command.findAndModify": kCollName,
- "command.update.$set": {x: 2},
- "command.readConcern.level": "snapshot"
-});
+ {"command.find": kCollName, "command.readConcern.level": "snapshot"},
+ true /*read is expected to succeed*/);
+testCommand({findAndModify: kCollName, query: {x: 1}, remove: true},
+ {
+ "command.findAndModify": kCollName,
+ "command.remove": true,
+ "command.readConcern.level": "snapshot"
+ },
+ false /*write is expected to fail*/);
+testCommand({findAndModify: kCollName, query: {x: 1}, update: {$set: {x: 2}}},
+ {
+ "command.findAndModify": kCollName,
+ "command.update.$set": {x: 2},
+ "command.readConcern.level": "snapshot"
+ },
+ false /*write is expected to fail*/);
testCommand({insert: kCollName, documents: [{x: 1}]},
- {"command.insert": kCollName, "command.readConcern.level": "snapshot"});
+ {"command.insert": kCollName, "command.readConcern.level": "snapshot"},
+ false /*write is expected to fail*/);
testCommand({update: kCollName, updates: [{q: {x: 1}, u: {$set: {x: 2}}}]},
- {"command.update": kCollName, "command.readConcern.level": "snapshot"});
+ {"command.update": kCollName, "command.readConcern.level": "snapshot"},
+ false /*write is expected to fail*/);
rst.stopSet();
})();
diff --git a/jstests/noPassthrough/transaction_write_with_snapshot_unavailable.js b/jstests/noPassthrough/transaction_write_with_snapshot_unavailable.js
index b1827e8f922..20bf20a55ca 100644
--- a/jstests/noPassthrough/transaction_write_with_snapshot_unavailable.js
+++ b/jstests/noPassthrough/transaction_write_with_snapshot_unavailable.js
@@ -11,6 +11,8 @@
(function() {
"use strict";
+load("jstests/libs/feature_flag_util.js");
+
const name = "transaction_write_with_snapshot_unavailable";
const replTest = new ReplSetTest({name: name, nodes: 1});
replTest.startSet();
@@ -24,6 +26,13 @@ const collNameB = collName + "B";
const primary = replTest.getPrimary();
const primaryDB = primary.getDB(dbName);
+if (FeatureFlagUtil.isEnabled(primaryDB, "PointInTimeCatalogLookups")) {
+ // With the PointInTimeCatalogLookups feature this test doesn't make sense as the
+ // SnapshotUnavailable error will be removed
+ replTest.stopSet();
+ return;
+}
+
assert.commandWorked(primaryDB[collName].insertOne({}, {writeConcern: {w: "majority"}}));
function testOp(cmd) {
@@ -31,8 +40,7 @@ function testOp(cmd) {
let session = primary.startSession();
let sessionDB = session.getDatabase(name);
- jsTestLog(
- `Testing that SnapshotUnavailable during ${op} is labelled TransientTransactionError`);
+ jsTestLog(`Testing that WriteConflict during ${op} is labelled TransientTransactionError`);
session.startTransaction({readConcern: {level: "snapshot"}});
assert.commandWorked(sessionDB.runCommand({insert: collName, documents: [{}]}));
diff --git a/src/mongo/db/catalog/collection_catalog.cpp b/src/mongo/db/catalog/collection_catalog.cpp
index 6eb5de1cb84..05c98e94a72 100644
--- a/src/mongo/db/catalog/collection_catalog.cpp
+++ b/src/mongo/db/catalog/collection_catalog.cpp
@@ -447,6 +447,14 @@ std::shared_ptr<const CollectionCatalog> CollectionCatalog::latest(ServiceContex
}
std::shared_ptr<const CollectionCatalog> CollectionCatalog::get(OperationContext* opCtx) {
+ const auto& stashed = stashedCatalog(opCtx->recoveryUnit()->getSnapshot());
+ if (stashed)
+ return stashed;
+
+ return latest(opCtx);
+}
+
+std::shared_ptr<const CollectionCatalog> CollectionCatalog::latest(OperationContext* opCtx) {
// If there is a batched catalog write ongoing and we are the one doing it return this instance
// so we can observe our own writes. There may be other callers that reads the CollectionCatalog
// without any locks, they must see the immutable regular instance.
@@ -454,9 +462,6 @@ std::shared_ptr<const CollectionCatalog> CollectionCatalog::get(OperationContext
return batchedCatalogWriteInstance;
}
- const auto& stashed = stashedCatalog(opCtx->recoveryUnit()->getSnapshot());
- if (stashed)
- return stashed;
return latest(opCtx->getServiceContext());
}
@@ -1329,6 +1334,29 @@ boost::optional<UUID> CollectionCatalog::lookupUUIDByNSS(OperationContext* opCtx
return boost::none;
}
+bool CollectionCatalog::containsCollection(OperationContext* opCtx,
+ const CollectionPtr& collection) const {
+ // Any writable Collection instance created under MODE_X lock is considered to belong to this
+ // catalog instance
+ auto& uncommittedCatalogUpdates = UncommittedCatalogUpdates::get(opCtx);
+ const auto& entries = uncommittedCatalogUpdates.entries();
+ auto entriesIt = std::find_if(entries.begin(),
+ entries.end(),
+ [&collection](const UncommittedCatalogUpdates::Entry& entry) {
+ return entry.collection.get() == collection.get();
+ });
+ if (entriesIt != entries.end() &&
+ entriesIt->action != UncommittedCatalogUpdates::Entry::Action::kOpenedCollection)
+ return true;
+
+ // Verify that we store the same instance in this catalog
+ auto it = _catalog.find(collection->uuid());
+ if (it == _catalog.end())
+ return false;
+
+ return it->second.get() == collection.get();
+}
+
CollectionCatalog::CatalogIdLookup CollectionCatalog::lookupCatalogIdByNSS(
const NamespaceString& nss, boost::optional<Timestamp> ts) const {
if (auto it = _catalogIds.find(nss); it != _catalogIds.end()) {
diff --git a/src/mongo/db/catalog/collection_catalog.h b/src/mongo/db/catalog/collection_catalog.h
index f6477e9ae28..0d60c929409 100644
--- a/src/mongo/db/catalog/collection_catalog.h
+++ b/src/mongo/db/catalog/collection_catalog.h
@@ -114,13 +114,25 @@ public:
};
/**
- * Returns a CollectionCatalog instance. Normally returns the latest stored instance but returns
- * the stashed or batch write instance if set for this operation/snapshot.
+ * Returns a CollectionCatalog instance capable of returning Collection instances consistent
+ * with the storage snapshot. Is the same as latest() below if no snapshot is opened.
+ *
+ * Is the default method of acquiring a CollectionCatalog instance.
*/
static std::shared_ptr<const CollectionCatalog> get(OperationContext* opCtx);
/**
- * Returns the latest stored CollectionCatalog instance. Bypasses stashing and batched writing.
+ * Returns a CollectionCatalog instance that reflects the latest state of the server.
+ *
+ * Used to confirm whether Collection instances are write eligible.
+ */
+ static std::shared_ptr<const CollectionCatalog> latest(OperationContext* opCtx);
+
+ /**
+ * Like latest() above.
+ *
+ * Bypasses batched writing and should not be used in a context where there might be an ongoing
+ * batched write.
*/
static std::shared_ptr<const CollectionCatalog> latest(ServiceContext* svcCtx);
@@ -403,6 +415,11 @@ public:
const NamespaceString& nss) const;
/**
+ * Returns true if this CollectionCatalog contains the provided collection instance
+ */
+ bool containsCollection(OperationContext* opCtx, const CollectionPtr& collection) const;
+
+ /**
* Returns the CatalogId for a given 'nss' at timestamp 'ts'.
*/
struct CatalogIdLookup {
diff --git a/src/mongo/db/catalog/collection_writer_test.cpp b/src/mongo/db/catalog/collection_writer_test.cpp
index 6dbe940a28c..d5c7b73da3e 100644
--- a/src/mongo/db/catalog/collection_writer_test.cpp
+++ b/src/mongo/db/catalog/collection_writer_test.cpp
@@ -59,8 +59,9 @@ protected:
std::shared_ptr<Collection> collection = std::make_shared<CollectionMock>(kNss);
CollectionCatalog::write(getServiceContext(), [&](CollectionCatalog& catalog) {
+ auto uuid = collection->uuid();
catalog.registerCollection(
- operationContext(), UUID::gen(), std::move(collection), /*ts=*/boost::none);
+ operationContext(), uuid, std::move(collection), /*ts=*/boost::none);
});
}
diff --git a/src/mongo/db/catalog_raii.cpp b/src/mongo/db/catalog_raii.cpp
index d3d8b01ff95..59a5c3389d8 100644
--- a/src/mongo/db/catalog_raii.cpp
+++ b/src/mongo/db/catalog_raii.cpp
@@ -33,10 +33,12 @@
#include "mongo/db/catalog/collection_catalog.h"
#include "mongo/db/catalog/collection_uuid_mismatch.h"
#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/repl/collection_utils.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/storage/storage_parameters_gen.h"
#include "mongo/logv2/log.h"
#include "mongo/util/fail_point.h"
@@ -56,7 +58,8 @@ void verifyDbAndCollection(OperationContext* opCtx,
const NamespaceStringOrUUID& nsOrUUID,
const NamespaceString& resolvedNss,
CollectionPtr& coll,
- Database* db) {
+ Database* db,
+ bool verifyWriteEligible) {
invariant(!nsOrUUID.uuid() || coll,
str::stream() << "Collection for " << resolvedNss.ns()
<< " disappeared after successfully resolving " << nsOrUUID.toString());
@@ -78,9 +81,31 @@ void verifyDbAndCollection(OperationContext* opCtx,
return;
}
- // If we are in a transaction, we cannot yield and wait when there are pending catalog changes.
- // Instead, we must return an error in such situations. We ignore this restriction for the
- // oplog, since it never has pending catalog changes.
+ // Verify that we are using the latest instance if we intend to perform writes.
+ if (feature_flags::gPointInTimeCatalogLookups.isEnabledAndIgnoreFCV() && verifyWriteEligible) {
+ auto latest = CollectionCatalog::latest(opCtx);
+ if (!latest->containsCollection(opCtx, coll)) {
+ throwWriteConflictException(str::stream()
+ << "Unable to write to collection '" << coll->ns()
+ << "' due to catalog changes; please "
+ "retry the operation");
+ }
+ if (opCtx->recoveryUnit()->isActive()) {
+ const auto mySnapshot = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx);
+ if (mySnapshot && *mySnapshot < coll->getMinimumValidSnapshot()) {
+ throwWriteConflictException(str::stream()
+ << "Unable to write to collection '" << coll->ns()
+ << "' due to snapshot timestamp " << *mySnapshot
+ << " being older than collection minimum "
+ << *coll->getMinimumValidSnapshot()
+ << "; please retry the operation");
+ }
+ }
+ }
+
+ // If we are in a transaction, we cannot yield and wait when there are pending catalog
+ // changes. Instead, we must return an error in such situations. We ignore this restriction
+ // for the oplog, since it never has pending catalog changes.
if (opCtx->inMultiDocumentTransaction() && resolvedNss != NamespaceString::kRsOplogNamespace) {
if (auto minSnapshot = coll->getMinimumVisibleSnapshot()) {
auto mySnapshot =
@@ -236,6 +261,25 @@ AutoGetCollection::AutoGetCollection(OperationContext* opCtx,
const NamespaceStringOrUUID& nsOrUUID,
LockMode modeColl,
Options options)
+ : AutoGetCollection(opCtx,
+ nsOrUUID,
+ modeColl,
+ std::move(options),
+ /*verifyWriteEligible=*/modeColl != MODE_IS) {}
+
+AutoGetCollection::AutoGetCollection(OperationContext* opCtx,
+ const NamespaceStringOrUUID& nsOrUUID,
+ LockMode modeColl,
+ Options options,
+ ForReadTag reader)
+ : AutoGetCollection(
+ opCtx, nsOrUUID, modeColl, std::move(options), /*verifyWriteEligible=*/false) {}
+
+AutoGetCollection::AutoGetCollection(OperationContext* opCtx,
+ const NamespaceStringOrUUID& nsOrUUID,
+ LockMode modeColl,
+ Options options,
+ bool verifyWriteEligible)
: _autoDb([&] {
auto& deadline = options._deadline;
@@ -316,7 +360,8 @@ AutoGetCollection::AutoGetCollection(OperationContext* opCtx,
_resolvedNss = catalog->resolveNamespaceStringOrUUID(opCtx, nsOrUUID);
_coll = catalog->lookupCollectionByNamespace(opCtx, _resolvedNss);
checkCollectionUUIDMismatch(opCtx, _resolvedNss, _coll, options._expectedUUID);
- verifyDbAndCollection(opCtx, modeColl, nsOrUUID, _resolvedNss, _coll, _autoDb.getDb());
+ verifyDbAndCollection(
+ opCtx, modeColl, nsOrUUID, _resolvedNss, _coll, _autoDb.getDb(), verifyWriteEligible);
for (auto& secondaryNssOrUUID : secondaryNssOrUUIDs) {
auto secondaryResolvedNss =
catalog->resolveNamespaceStringOrUUID(opCtx, secondaryNssOrUUID);
@@ -328,7 +373,8 @@ AutoGetCollection::AutoGetCollection(OperationContext* opCtx,
secondaryNssOrUUID,
secondaryResolvedNss,
secondaryColl,
- databaseHolder->getDb(opCtx, *secondaryDbName));
+ databaseHolder->getDb(opCtx, *secondaryDbName),
+ verifyWriteEligible);
}
if (_coll) {
diff --git a/src/mongo/db/catalog_raii.h b/src/mongo/db/catalog_raii.h
index 25ce49f538f..8fef15dd1b6 100644
--- a/src/mongo/db/catalog_raii.h
+++ b/src/mongo/db/catalog_raii.h
@@ -193,6 +193,18 @@ public:
LockMode modeColl,
Options options = {});
+ /**
+ * Special constructor when this class is instantiated from AutoGetCollectionForRead. Used to
+ * indicate that the intent is to perform reads only. We cannot use the LockMode to determine
+ * this as multi-document transactions use MODE_IX for reads.
+ */
+ struct ForReadTag {};
+ AutoGetCollection(OperationContext* opCtx,
+ const NamespaceStringOrUUID& nsOrUUID,
+ LockMode modeColl,
+ Options options,
+ ForReadTag read);
+
AutoGetCollection(AutoGetCollection&&) = default;
explicit operator bool() const {
@@ -257,6 +269,11 @@ public:
Collection* getWritableCollection(OperationContext* opCtx);
protected:
+ AutoGetCollection(OperationContext* opCtx,
+ const NamespaceStringOrUUID& nsOrUUID,
+ LockMode modeColl,
+ Options options,
+ bool verifyWriteEligible);
// Ordering matters, the _collLocks should destruct before the _autoGetDb releases the
// rstl/global/database locks.
AutoGetDb _autoDb;
diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp
index a41023bfbfb..5da919faf79 100644
--- a/src/mongo/db/db_raii.cpp
+++ b/src/mongo/db/db_raii.cpp
@@ -610,7 +610,8 @@ EmplaceAutoGetCollectionForRead::EmplaceAutoGetCollectionForRead(
_options(std::move(options)) {}
void EmplaceAutoGetCollectionForRead::emplace(boost::optional<AutoGetCollection>& autoColl) const {
- autoColl.emplace(_opCtx, _nsOrUUID, _collectionLockMode, _options);
+ autoColl.emplace(
+ _opCtx, _nsOrUUID, _collectionLockMode, _options, AutoGetCollection::ForReadTag{});
}
AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* opCtx,
diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp
index b0caf54a7c6..6e62bd1eb10 100644
--- a/src/mongo/db/index/index_access_method.cpp
+++ b/src/mongo/db/index/index_access_method.cpp
@@ -1189,14 +1189,16 @@ Status SortedDataIndexAccessMethod::_indexKeysOrWriteToSideTable(
}
} else {
// Ensure that our snapshot is compatible with the index's minimum visible snapshot.
- const auto minVisibleTimestamp = _indexCatalogEntry->getMinimumVisibleSnapshot();
- const auto readTimestamp =
- opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx).value_or(
- opCtx->recoveryUnit()->getCatalogConflictingTimestamp());
- if (minVisibleTimestamp && !readTimestamp.isNull() &&
- readTimestamp < *minVisibleTimestamp) {
- throwWriteConflictException(
- "Unable to read from a snapshot due to pending catalog changes.");
+ if (!feature_flags::gPointInTimeCatalogLookups.isEnabledAndIgnoreFCV()) {
+ const auto minVisibleTimestamp = _indexCatalogEntry->getMinimumVisibleSnapshot();
+ const auto readTimestamp =
+ opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx).value_or(
+ opCtx->recoveryUnit()->getCatalogConflictingTimestamp());
+ if (minVisibleTimestamp && !readTimestamp.isNull() &&
+ readTimestamp < *minVisibleTimestamp) {
+ throwWriteConflictException(
+ "Unable to read from a snapshot due to pending catalog changes.");
+ }
}
int64_t numInserted = 0;
@@ -1258,12 +1260,16 @@ void SortedDataIndexAccessMethod::_unindexKeysOrWriteToSideTable(
options.dupsAllowed = options.dupsAllowed || checkRecordId == CheckRecordId::On;
// Ensure that our snapshot is compatible with the index's minimum visible snapshot.
- const auto minVisibleTimestamp = _indexCatalogEntry->getMinimumVisibleSnapshot();
- const auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx).value_or(
- opCtx->recoveryUnit()->getCatalogConflictingTimestamp());
- if (minVisibleTimestamp && !readTimestamp.isNull() && readTimestamp < *minVisibleTimestamp) {
- throwWriteConflictException(
- "Unable to read from a snapshot due to pending catalog changes.");
+ if (!feature_flags::gPointInTimeCatalogLookups.isEnabledAndIgnoreFCV()) {
+ const auto minVisibleTimestamp = _indexCatalogEntry->getMinimumVisibleSnapshot();
+ const auto readTimestamp =
+ opCtx->recoveryUnit()->getPointInTimeReadTimestamp(opCtx).value_or(
+ opCtx->recoveryUnit()->getCatalogConflictingTimestamp());
+ if (minVisibleTimestamp && !readTimestamp.isNull() &&
+ readTimestamp < *minVisibleTimestamp) {
+ throwWriteConflictException(
+ "Unable to read from a snapshot due to pending catalog changes.");
+ }
}
int64_t removed = 0;
diff --git a/src/mongo/db/transaction/transaction_participant.cpp b/src/mongo/db/transaction/transaction_participant.cpp
index 4737506e356..2d5b49665d1 100644
--- a/src/mongo/db/transaction/transaction_participant.cpp
+++ b/src/mongo/db/transaction/transaction_participant.cpp
@@ -1088,6 +1088,9 @@ TransactionParticipant::Participant::onConflictingInternalTransactionCompletion(
void TransactionParticipant::Participant::_setReadSnapshot(OperationContext* opCtx,
repl::ReadConcernArgs readConcernArgs) {
+ bool pitLookupFeatureEnabled =
+ feature_flags::gPointInTimeCatalogLookups.isEnabledAndIgnoreFCV();
+
if (readConcernArgs.getArgsAtClusterTime()) {
// Read concern code should have already set the timestamp on the recovery unit.
const auto readTimestamp = readConcernArgs.getArgsAtClusterTime()->asTimestamp();
@@ -1121,7 +1124,7 @@ void TransactionParticipant::Participant::_setReadSnapshot(OperationContext* opC
// _catalogConflictTimestamp. Currently, only oplog application on secondaries can run
// inside a transaction, thus `writesAreReplicated` is a suitable proxy to single out
// transactions on primaries.
- if (opCtx->writesAreReplicated()) {
+ if (!pitLookupFeatureEnabled && opCtx->writesAreReplicated()) {
// Since this snapshot may reflect oplog holes, record the most visible timestamp before
// opening a storage transaction. This timestamp will be used later to detect any
// changes in the catalog after a storage transaction is opened.
@@ -1130,7 +1133,28 @@ void TransactionParticipant::Participant::_setReadSnapshot(OperationContext* opC
}
}
- opCtx->recoveryUnit()->preallocateSnapshot();
+
+ if (pitLookupFeatureEnabled) {
+ // Allocate the snapshot together with a consistent CollectionCatalog instance. As we have
+ // no critical section we use optimistic concurrency control and check that there was no
+ // write to the CollectionCatalog while we allocated the storage snapshot. Stash the catalog
+ // instance so collection lookups within this transaction are consistent with the snapshot.
+ auto catalog = CollectionCatalog::get(opCtx);
+ while (true) {
+ opCtx->recoveryUnit()->preallocateSnapshot();
+ auto after = CollectionCatalog::get(opCtx);
+ if (catalog == after) {
+ // Catalog did not change, break out of the retry loop and use this instance
+ break;
+ }
+ // Catalog change detected, reallocate the snapshot and try again.
+ opCtx->recoveryUnit()->abandonSnapshot();
+ catalog = std::move(after);
+ }
+ CollectionCatalog::stash(opCtx, std::move(catalog));
+ } else {
+ opCtx->recoveryUnit()->preallocateSnapshot();
+ }
}
TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* opCtx,
@@ -1520,10 +1544,6 @@ void TransactionParticipant::Participant::unstashTransactionResources(OperationC
invariant(!opCtx->lockState()->isRSTLLocked());
invariant(!opCtx->lockState()->inAWriteUnitOfWork());
- // Stashed transaction resources do not exist for this in-progress multi-document
- // transaction. Set up the transaction resources on the opCtx.
- opCtx->setWriteUnitOfWork(std::make_unique<WriteUnitOfWork>(opCtx));
-
// If maxTransactionLockRequestTimeoutMillis is set, then we will ensure no
// future lock request waits longer than maxTransactionLockRequestTimeoutMillis
// to acquire a lock. This is to avoid deadlocks and minimize non-transaction
@@ -1549,6 +1569,11 @@ void TransactionParticipant::Participant::unstashTransactionResources(OperationC
// This begins the storage transaction and so we do it after acquiring the global lock.
_setReadSnapshot(opCtx, repl::ReadConcernArgs::get(opCtx));
+ // Stashed transaction resources do not exist for this in-progress multi-document transaction.
+ // Set up the transaction resources on the opCtx. Must be done after setting up the read
+ // snapshot.
+ opCtx->setWriteUnitOfWork(std::make_unique<WriteUnitOfWork>(opCtx));
+
// The Client lock must not be held when executing this failpoint as it will block currentOp
// execution.
if (MONGO_unlikely(hangAfterPreallocateSnapshot.shouldFail())) {