author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2023-03-22 15:03:54 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>     2023-03-22 16:26:50 +0000
commit     7a62847796d3797278a7b5fb6215948cd325b45d
tree       45c745ec13218ab7229b636d5c60ad24739884b9
parent     a87c105f1c0babf189f7df6ba38b16b3f04b3162
SERVER-73766 Use ScopedCollectionAcquisition in the CollectionBulkLoaderImpl
 src/mongo/db/catalog_raii.cpp                                                              |  46
 src/mongo/db/catalog_raii.h                                                                |  17
 src/mongo/db/commands/SConscript                                                           |  68
 src/mongo/db/commands/drop_indexes_cmd.cpp (renamed from src/mongo/db/commands/drop_indexes.cpp) | 0
 src/mongo/db/repl/collection_bulk_loader_impl.cpp                                          |  49
 src/mongo/db/repl/collection_bulk_loader_impl.h                                            |   5
 src/mongo/db/shard_role.cpp                                                                |  70
 src/mongo/db/shard_role.h                                                                  |  51
 src/mongo/db/shard_role_test.cpp                                                           | 120
 src/mongo/db/storage/recovery_unit.h                                                       |   2
 src/mongo/db/transaction_resources.h                                                       |   2
 11 files changed, 346 insertions(+), 84 deletions(-)
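
Taken together, these changes let DDL-style code hold a ScopedCollectionAcquisition and obtain a writable Collection through a new CollectionWriter constructor. The sketch below is not part of the commit; it is modeled on the CollectionBulkLoaderImpl changes further down, and the helper function name is illustrative only:

    // Illustrative sketch: local-catalog-only DDL via the new CollectionWriter constructor.
    void performLocalOnlyDdl(OperationContext* opCtx, const NamespaceString& nss) {
        // Acquire the collection without a placement concern; as the helper's name warns,
        // data written under this acquisition may be lost if the collection moves.
        auto acquisition =
            acquireCollectionForLocalCatalogOnlyWithPotentialDataLoss(opCtx, nss, MODE_X);

        WriteUnitOfWork wuow(opCtx);

        // The new constructor ties the writer to the acquisition; asking for a writable
        // collection installs a ScopedLocalCatalogWriteFence until commit/rollback.
        CollectionWriter collWriter(opCtx, &acquisition);
        Collection* writableColl = collWriter.getWritableCollection(opCtx);
        // ... perform catalog modifications through writableColl ...

        wuow.commit();
        // After commit the fence is destroyed and acquisition.getCollectionPtr() reflects
        // the updated catalog state again.
    }
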
diff --git a/src/mongo/db/catalog_raii.cpp b/src/mongo/db/catalog_raii.cpp
index d82cc381480..b05d5afd422 100644
--- a/src/mongo/db/catalog_raii.cpp
+++ b/src/mongo/db/catalog_raii.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/s/database_sharding_state.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/shard_role.h"
#include "mongo/db/storage/storage_parameters_gen.h"
#include "mongo/logv2/log.h"
#include "mongo/util/fail_point.h"
@@ -409,7 +410,6 @@ Collection* AutoGetCollection::getWritableCollection(OperationContext* opCtx) {
// Acquire writable instance if not already available
if (!_writableColl) {
-
auto catalog = CollectionCatalog::get(opCtx);
_writableColl = catalog->lookupCollectionByNamespaceForMetadataWrite(opCtx, _resolvedNss);
// Makes the internal CollectionPtr Yieldable and resets the writable Collection when
@@ -541,6 +541,26 @@ struct CollectionWriter::SharedImpl {
std::function<Collection*()> _writableCollectionInitializer;
};
+CollectionWriter::CollectionWriter(OperationContext* opCtx,
+ ScopedCollectionAcquisition* acquisition)
+ : _acquisition(acquisition),
+ _collection(&_storedCollection),
+ _managed(true),
+ _sharedImpl(std::make_shared<SharedImpl>(this)) {
+
+ _storedCollection = CollectionPtr(
+ CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, _acquisition->nss()));
+ _storedCollection.makeYieldable(opCtx, LockedCollectionYieldRestore(opCtx, _storedCollection));
+
+ _sharedImpl->_writableCollectionInitializer = [this, opCtx]() mutable {
+ invariant(!_fence);
+ _fence = std::make_unique<ScopedLocalCatalogWriteFence>(opCtx, _acquisition);
+
+ return CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForMetadataWrite(
+ opCtx, _acquisition->nss());
+ };
+}
+
CollectionWriter::CollectionWriter(OperationContext* opCtx, const UUID& uuid)
: _collection(&_storedCollection),
_managed(true),
@@ -549,6 +569,7 @@ CollectionWriter::CollectionWriter(OperationContext* opCtx, const UUID& uuid)
_storedCollection =
CollectionPtr(CollectionCatalog::get(opCtx)->lookupCollectionByUUID(opCtx, uuid));
_storedCollection.makeYieldable(opCtx, LockedCollectionYieldRestore(opCtx, _storedCollection));
+
_sharedImpl->_writableCollectionInitializer = [opCtx, uuid]() {
return CollectionCatalog::get(opCtx)->lookupCollectionByUUIDForMetadataWrite(opCtx, uuid);
};
@@ -558,9 +579,11 @@ CollectionWriter::CollectionWriter(OperationContext* opCtx, const NamespaceStrin
: _collection(&_storedCollection),
_managed(true),
_sharedImpl(std::make_shared<SharedImpl>(this)) {
+
_storedCollection =
CollectionPtr(CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss));
_storedCollection.makeYieldable(opCtx, LockedCollectionYieldRestore(opCtx, _storedCollection));
+
_sharedImpl->_writableCollectionInitializer = [opCtx, nss]() {
return CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForMetadataWrite(opCtx,
nss);
@@ -571,6 +594,7 @@ CollectionWriter::CollectionWriter(OperationContext* opCtx, AutoGetCollection& a
: _collection(&autoCollection.getCollection()),
_managed(true),
_sharedImpl(std::make_shared<SharedImpl>(this)) {
+
_sharedImpl->_writableCollectionInitializer = [&autoCollection, opCtx]() {
return autoCollection.getWritableCollection(opCtx);
};
@@ -594,8 +618,8 @@ Collection* CollectionWriter::getWritableCollection(OperationContext* opCtx) {
if (!_writableCollection) {
_writableCollection = _sharedImpl->_writableCollectionInitializer();
- // If we are using our stored Collection then we are not managed by an AutoGetCollection and
- // we need to manage lifetime here.
+ // If we are using our stored Collection then we are not managed by an AutoGetCollection
+ // and we need to manage lifetime here.
if (_managed) {
bool usingStoredCollection = *_collection == _storedCollection;
auto rollbackCollection =
@@ -609,6 +633,7 @@ Collection* CollectionWriter::getWritableCollection(OperationContext* opCtx) {
[shared = _sharedImpl](OperationContext* opCtx, boost::optional<Timestamp>) {
if (shared->_parent) {
shared->_parent->_writableCollection = nullptr;
+ shared->_parent->_fence.reset();
// Make the stored collection yieldable again as we now operate with the
// same instance as is in the catalog.
@@ -619,12 +644,15 @@ Collection* CollectionWriter::getWritableCollection(OperationContext* opCtx) {
[shared = _sharedImpl, rollbackCollection = std::move(rollbackCollection)](
OperationContext* opCtx) mutable {
if (shared->_parent) {
- // Restore stored collection to its previous state. The rollback instance is
- // already yieldable.
- shared->_parent->_storedCollection = std::move(rollbackCollection);
shared->_parent->_writableCollection = nullptr;
+ shared->_parent->_fence.reset();
+
+ // Restore stored collection to its previous state. The rollback
+ // instance is already yieldable.
+ shared->_parent->_storedCollection = std::move(rollbackCollection);
}
});
+
if (usingStoredCollection) {
_storedCollection = CollectionPtr(_writableCollection);
}
@@ -641,7 +669,8 @@ ReadSourceScope::ReadSourceScope(OperationContext* opCtx,
RecoveryUnit::ReadSource readSource,
boost::optional<Timestamp> provided)
: _opCtx(opCtx), _originalReadSource(opCtx->recoveryUnit()->getTimestampReadSource()) {
- // Abandoning the snapshot is unsafe when the snapshot is managed by a lock free read helper.
+ // Abandoning the snapshot is unsafe when the snapshot is managed by a lock free read
+ // helper.
invariant(!_opCtx->isLockFreeReadsOp());
if (_originalReadSource == RecoveryUnit::ReadSource::kProvided) {
@@ -653,7 +682,8 @@ ReadSourceScope::ReadSourceScope(OperationContext* opCtx,
}
ReadSourceScope::~ReadSourceScope() {
- // Abandoning the snapshot is unsafe when the snapshot is managed by a lock free read helper.
+ // Abandoning the snapshot is unsafe when the snapshot is managed by a lock free read
+ // helper.
invariant(!_opCtx->isLockFreeReadsOp());
_opCtx->recoveryUnit()->abandonSnapshot();
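
A short sketch (not part of the commit) of the rollback behavior that the CollectionWriter changes above provide when the writer is constructed from an acquisition; it mirrors the ScopedLocalCatalogWriteFenceWUOWRollbackWithinWriterScope test added later in this diff and assumes `acquisition` is already held in MODE_X:

    {
        WriteUnitOfWork wuow(opCtx);
        CollectionWriter writer(opCtx, &acquisition);
        writer.getWritableCollection(opCtx)->setIsTemp(opCtx, true);
        // wuow goes out of scope without commit() => the unit of work rolls back.
    }
    // After rollback the handlers reset the writable collection and the write fence, and
    // the fence re-resolves the acquisition's CollectionPtr from the catalog, so the
    // uncommitted change is not observable.
    invariant(!acquisition.getCollectionPtr()->isTemporary());
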
diff --git a/src/mongo/db/catalog_raii.h b/src/mongo/db/catalog_raii.h
index b577ca3c5bc..7028c1708eb 100644
--- a/src/mongo/db/catalog_raii.h
+++ b/src/mongo/db/catalog_raii.h
@@ -432,8 +432,20 @@ private:
* It is safe to re-use an instance for multiple WriteUnitOfWorks or to destroy it before the active
* WriteUnitOfWork finishes.
*/
+class ScopedCollectionAcquisition;
+class ScopedLocalCatalogWriteFence;
+
class CollectionWriter final {
public:
+ // This constructor indicates to the shard role subsystem that the subsequent code enters into
+ // local DDL land and that the content of the local collection should not be trusted until it
+ // goes out of scope.
+ //
+ // See the comments on ScopedCollectionAcquisition for more details.
+ //
+ // TODO (SERVER-73766): Only this constructor should remain in use
+ CollectionWriter(OperationContext* opCtx, ScopedCollectionAcquisition* acquisition);
+
// Gets the collection from the catalog for the provided uuid
CollectionWriter(OperationContext* opCtx, const UUID& uuid);
// Gets the collection from the catalog for the provided namespace string
@@ -474,6 +486,11 @@ public:
Collection* getWritableCollection(OperationContext* opCtx);
private:
+ // This group of values is only operated on for code paths that go through the
+ // `ScopedCollectionAcquisition` constructor.
+ ScopedCollectionAcquisition* _acquisition;
+ std::unique_ptr<ScopedLocalCatalogWriteFence> _fence;
+
// If this class is instantiated with the constructors that take UUID or nss we need somewhere
// to store the CollectionPtr used. But if it is instantiated with an AutoGetCollection then the
// lifetime of the object is managed there. To unify the two code paths we have a pointer that
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 10acc571c27..686b0e17923 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -338,42 +338,42 @@ env.Library(
# Commands that are present in both mongod and embedded
env.Library(
- target="standalone",
- source=[
- "analyze_cmd.cpp",
- "count_cmd.cpp",
- "create_command.cpp",
- "create_indexes_cmd.cpp",
- "current_op.cpp",
- "dbcommands.cpp",
- "distinct.cpp",
- "drop_indexes.cpp",
- "explain_cmd.cpp",
- "find_and_modify.cpp",
- "find_cmd.cpp",
+ target='standalone',
+ source=[
+ 'analyze_cmd.cpp',
+ 'count_cmd.cpp',
+ 'create_command.cpp',
+ 'create_indexes_cmd.cpp',
+ 'current_op.cpp',
+ 'dbcommands.cpp',
+ 'distinct.cpp',
+ 'drop_indexes_cmd.cpp',
+ 'explain_cmd.cpp',
+ 'find_and_modify.cpp',
+ 'find_cmd.cpp',
'fle2_get_count_info_command.cpp',
- "getmore_cmd.cpp",
- "http_client.cpp",
+ 'getmore_cmd.cpp',
+ 'http_client.cpp',
'http_client.idl',
- "index_filter_commands.cpp",
- "kill_op.cpp",
- "killcursors_cmd.cpp",
- "killoperations_cmd.cpp",
- "lock_info.cpp",
- "list_collections.cpp",
- "list_databases.cpp",
+ 'index_filter_commands.cpp',
+ 'kill_op.cpp',
+ 'killcursors_cmd.cpp',
+ 'killoperations_cmd.cpp',
+ 'list_collections.cpp',
+ 'list_databases.cpp',
'list_databases_for_all_tenants.cpp',
- "list_indexes.cpp",
- "pipeline_command.cpp",
- "plan_cache_clear_command.cpp",
- "plan_cache_commands.cpp",
- "rename_collection_cmd.cpp",
- "run_aggregate.cpp",
- "sleep_command.cpp",
- "validate.cpp",
- "validate_db_metadata_cmd.cpp",
- "whats_my_sni_command.cpp",
- "write_commands.cpp",
+ 'list_indexes.cpp',
+ 'lock_info.cpp',
+ 'pipeline_command.cpp',
+ 'plan_cache_clear_command.cpp',
+ 'plan_cache_commands.cpp',
+ 'rename_collection_cmd.cpp',
+ 'run_aggregate.cpp',
+ 'sleep_command.cpp',
+ 'validate.cpp',
+ 'validate_db_metadata_cmd.cpp',
+ 'whats_my_sni_command.cpp',
+ 'write_commands.cpp',
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
@@ -421,7 +421,7 @@ env.Library(
'$BUILD_DIR/mongo/db/stats/counters',
'$BUILD_DIR/mongo/db/stats/server_read_concern_write_concern_metrics',
'$BUILD_DIR/mongo/db/storage/storage_engine_common',
- "$BUILD_DIR/mongo/db/storage/two_phase_index_build_knobs_idl",
+ '$BUILD_DIR/mongo/db/storage/two_phase_index_build_knobs_idl',
'$BUILD_DIR/mongo/db/timeseries/catalog_helper',
'$BUILD_DIR/mongo/db/timeseries/timeseries_collmod',
'$BUILD_DIR/mongo/db/timeseries/timeseries_conversion_util',
diff --git a/src/mongo/db/commands/drop_indexes.cpp b/src/mongo/db/commands/drop_indexes_cmd.cpp
index a57059e0fc8..a57059e0fc8 100644
--- a/src/mongo/db/commands/drop_indexes.cpp
+++ b/src/mongo/db/commands/drop_indexes_cmd.cpp
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
index b3a85d9c372..12ebf0176f7 100644
--- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
@@ -57,7 +57,8 @@ CollectionBulkLoaderImpl::CollectionBulkLoaderImpl(ServiceContext::UniqueClient
const BSONObj& idIndexSpec)
: _client{std::move(client)},
_opCtx{std::move(opCtx)},
- _collection{_opCtx.get(), nss, MODE_X},
+ _acquisition(
+ acquireCollectionForLocalCatalogOnlyWithPotentialDataLoss(_opCtx.get(), nss, MODE_X)),
_nss{nss},
_idIndexBlock(std::make_unique<MultiIndexBlock>()),
_secondaryIndexesBlock(std::make_unique<MultiIndexBlock>()),
@@ -82,14 +83,14 @@ Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndex
return writeConflictRetry(
_opCtx.get(),
"CollectionBulkLoader::init",
- _collection.getNss().ns(),
+ _acquisition.nss().ns(),
[&secondaryIndexSpecs, this] {
WriteUnitOfWork wuow(_opCtx.get());
// All writes in CollectionBulkLoaderImpl should be unreplicated.
// The opCtx is accessed indirectly through _secondaryIndexesBlock.
UnreplicatedWritesBlock uwb(_opCtx.get());
// This enforces the buildIndexes setting in the replica set configuration.
- CollectionWriter collWriter(_opCtx.get(), _collection);
+ CollectionWriter collWriter(_opCtx.get(), &_acquisition);
auto indexCatalog =
collWriter.getWritableCollection(_opCtx.get())->getIndexCatalog();
auto specs = indexCatalog->removeExistingIndexesNoChecks(
@@ -152,7 +153,7 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForUncappedCollection(
bytesInBlock += doc.objsize();
// This version of insert will not update any indexes.
const auto status = collection_internal::insertDocumentForBulkLoader(
- _opCtx.get(), *_collection, doc, onRecordInserted);
+ _opCtx.get(), _acquisition.getCollectionPtr(), doc, onRecordInserted);
if (!status.isOK()) {
return status;
}
@@ -199,7 +200,7 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForCappedCollection(
// For capped collections, we use regular insertDocument, which
// will update pre-existing indexes.
const auto status = collection_internal::insertDocument(
- _opCtx.get(), *_collection, InsertStatement(doc), nullptr);
+ _opCtx.get(), _acquisition.getCollectionPtr(), InsertStatement(doc), nullptr);
if (!status.isOK()) {
return status;
}
@@ -238,21 +239,24 @@ Status CollectionBulkLoaderImpl::commit() {
// Commit before deleting dups, so the dups will be removed from secondary indexes when
// deleted.
if (_secondaryIndexesBlock) {
- auto status = _secondaryIndexesBlock->dumpInsertsFromBulk(_opCtx.get(), *_collection);
+ auto status = _secondaryIndexesBlock->dumpInsertsFromBulk(
+ _opCtx.get(), _acquisition.getCollectionPtr());
if (!status.isOK()) {
return status;
}
// This should always return Status::OK() as the foreground index build doesn't install
// an interceptor.
- invariant(_secondaryIndexesBlock->checkConstraints(_opCtx.get(), *_collection));
+ invariant(_secondaryIndexesBlock->checkConstraints(_opCtx.get(),
+ _acquisition.getCollectionPtr()));
status = writeConflictRetry(
_opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this] {
WriteUnitOfWork wunit(_opCtx.get());
+ CollectionWriter collWriter(_opCtx.get(), &_acquisition);
auto status = _secondaryIndexesBlock->commit(
_opCtx.get(),
- _collection.getWritableCollection(_opCtx.get()),
+ collWriter.getWritableCollection(_opCtx.get()),
MultiIndexBlock::kNoopOnCreateEachFn,
MultiIndexBlock::kNoopOnCommitFn);
if (!status.isOK()) {
@@ -269,20 +273,24 @@ Status CollectionBulkLoaderImpl::commit() {
if (_idIndexBlock) {
// Do not do inside a WriteUnitOfWork (required by dumpInsertsFromBulk).
auto status = _idIndexBlock->dumpInsertsFromBulk(
- _opCtx.get(), *_collection, [&](const RecordId& rid) {
+ _opCtx.get(), _acquisition.getCollectionPtr(), [&](const RecordId& rid) {
writeConflictRetry(
_opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this, &rid] {
WriteUnitOfWork wunit(_opCtx.get());
- auto doc = _collection->docFor(_opCtx.get(), rid);
+ auto doc = _acquisition.getCollectionPtr()->docFor(_opCtx.get(), rid);
// Delete the document before committing the index. If we were to delete
// the document after committing the index, it's possible that we
// may unindex a record with the same key but a different RecordId.
- _collection->getRecordStore()->deleteRecord(_opCtx.get(), rid);
-
- auto indexIt = _collection->getIndexCatalog()->getIndexIterator(
- _opCtx.get(), IndexCatalog::InclusionPolicy::kReady);
+ _acquisition.getCollectionPtr()->getRecordStore()->deleteRecord(
+ _opCtx.get(), rid);
+
+ auto indexIt =
+ _acquisition.getCollectionPtr()
+ ->getIndexCatalog()
+ ->getIndexIterator(_opCtx.get(),
+ IndexCatalog::InclusionPolicy::kReady);
while (auto entry = indexIt->next()) {
if (entry->descriptor()->isIdIndex()) {
continue;
@@ -297,7 +305,7 @@ Status CollectionBulkLoaderImpl::commit() {
entry->accessMethod()->remove(
_opCtx.get(),
pooledBuilder,
- *_collection,
+ _acquisition.getCollectionPtr(),
doc.value(),
rid,
false /* logIfError */,
@@ -322,9 +330,10 @@ Status CollectionBulkLoaderImpl::commit() {
status = writeConflictRetry(
_opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this] {
WriteUnitOfWork wunit(_opCtx.get());
+ CollectionWriter collWriter(_opCtx.get(), &_acquisition);
auto status =
_idIndexBlock->commit(_opCtx.get(),
- _collection.getWritableCollection(_opCtx.get()),
+ collWriter.getWritableCollection(_opCtx.get()),
MultiIndexBlock::kNoopOnCreateEachFn,
MultiIndexBlock::kNoopOnCommitFn);
if (!status.isOK()) {
@@ -357,14 +366,14 @@ Status CollectionBulkLoaderImpl::commit() {
void CollectionBulkLoaderImpl::_releaseResources() {
invariant(&cc() == _opCtx->getClient());
if (_secondaryIndexesBlock) {
- CollectionWriter collWriter(_opCtx.get(), _collection);
+ CollectionWriter collWriter(_opCtx.get(), &_acquisition);
_secondaryIndexesBlock->abortIndexBuild(
_opCtx.get(), collWriter, MultiIndexBlock::kNoopOnCleanUpFn);
_secondaryIndexesBlock.reset();
}
if (_idIndexBlock) {
- CollectionWriter collWriter(_opCtx.get(), _collection);
+ CollectionWriter collWriter(_opCtx.get(), &_acquisition);
_idIndexBlock->abortIndexBuild(_opCtx.get(), collWriter, MultiIndexBlock::kNoopOnCleanUpFn);
_idIndexBlock.reset();
}
@@ -392,7 +401,7 @@ Status CollectionBulkLoaderImpl::_addDocumentToIndexBlocks(const BSONObj& doc,
if (_idIndexBlock) {
auto status = _idIndexBlock->insertSingleDocumentForInitialSyncOrRecovery(
_opCtx.get(),
- *_collection,
+ _acquisition.getCollectionPtr(),
doc,
loc,
// This caller / code path does not have cursors to save/restore.
@@ -406,7 +415,7 @@ Status CollectionBulkLoaderImpl::_addDocumentToIndexBlocks(const BSONObj& doc,
if (_secondaryIndexesBlock) {
auto status = _secondaryIndexesBlock->insertSingleDocumentForInitialSyncOrRecovery(
_opCtx.get(),
- *_collection,
+ _acquisition.getCollectionPtr(),
doc,
loc,
// This caller / code path does not have cursors to save/restore.
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.h b/src/mongo/db/repl/collection_bulk_loader_impl.h
index 8e7b53e497a..f52b4983e69 100644
--- a/src/mongo/db/repl/collection_bulk_loader_impl.h
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.h
@@ -34,10 +34,9 @@
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/db/catalog/multi_index_block.h"
-#include "mongo/db/db_raii.h"
-#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/collection_bulk_loader.h"
#include "mongo/db/repl/storage_interface.h"
+#include "mongo/db/shard_role.h"
namespace mongo {
namespace repl {
@@ -104,7 +103,7 @@ private:
ServiceContext::UniqueClient _client;
ServiceContext::UniqueOperationContext _opCtx;
- AutoGetCollection _collection;
+ ScopedCollectionAcquisition _acquisition;
NamespaceString _nss;
std::unique_ptr<MultiIndexBlock> _idIndexBlock;
std::unique_ptr<MultiIndexBlock> _secondaryIndexesBlock;
diff --git a/src/mongo/db/shard_role.cpp b/src/mongo/db/shard_role.cpp
index 5beb67be776..74f11f9e247 100644
--- a/src/mongo/db/shard_role.cpp
+++ b/src/mongo/db/shard_role.cpp
@@ -283,7 +283,7 @@ std::vector<ScopedCollectionOrViewAcquisition> acquireResolvedCollectionsOrViews
prerequisites.uuid = collectionPtr->uuid();
}
- const shard_role_details::AcquiredCollection& acquiredCollection =
+ shard_role_details::AcquiredCollection& acquiredCollection =
getOrMakeTransactionResources(opCtx).addAcquiredCollection(
{prerequisites,
std::move(acquisitionRequest.second.dbLock),
@@ -341,6 +341,13 @@ CollectionAcquisitionRequest CollectionAcquisitionRequest::fromOpCtx(
nss, {oss.getDbVersion(nss.db()), oss.getShardVersion(nss)}, readConcern, operationType);
}
+const UUID& ScopedCollectionAcquisition::uuid() const {
+ invariant(exists(),
+ str::stream() << "Collection " << nss()
+ << " doesn't exist, so its UUID cannot be obtained");
+ return *_acquiredCollection.prerequisites.uuid;
+}
+
const ScopedCollectionDescription& ScopedCollectionAcquisition::getShardingDescription() const {
// The collectionDescription will only not be set if the caller has acquired the acquisition
// using the kLocalCatalogOnlyWithPotentialDataLoss placement concern
@@ -483,6 +490,8 @@ std::vector<ScopedCollectionOrViewAcquisition> acquireCollectionsOrViewsWithoutT
ScopedCollectionAcquisition acquireCollectionForLocalCatalogOnlyWithPotentialDataLoss(
OperationContext* opCtx, const NamespaceString& nss, LockMode mode) {
+ invariant(!OperationShardingState::isComingFromRouter(opCtx));
+
auto& txnResources = getOrMakeTransactionResources(opCtx);
txnResources.assertNoAcquiredCollections();
@@ -501,23 +510,56 @@ ScopedCollectionAcquisition acquireCollectionForLocalCatalogOnlyWithPotentialDat
auto& coll = std::get<CollectionPtr>(collOrView);
- const shard_role_details::AcquiredCollection& acquiredCollection =
- txnResources.addAcquiredCollection(
- {AcquisitionPrerequisites(
- nss,
- coll ? boost::optional<UUID>(coll->uuid()) : boost::none,
- AcquisitionPrerequisites::kLocalCatalogOnlyWithPotentialDataLoss,
- AcquisitionPrerequisites::OperationType::kWrite,
- AcquisitionPrerequisites::ViewMode::kMustBeCollection),
- std::move(dbLock),
- std::move(collLock),
- boost::none,
- boost::none,
- std::move(coll)});
+ shard_role_details::AcquiredCollection& acquiredCollection = txnResources.addAcquiredCollection(
+ {AcquisitionPrerequisites(nss,
+ coll ? boost::optional<UUID>(coll->uuid()) : boost::none,
+ AcquisitionPrerequisites::kLocalCatalogOnlyWithPotentialDataLoss,
+ AcquisitionPrerequisites::OperationType::kWrite,
+ AcquisitionPrerequisites::ViewMode::kMustBeCollection),
+ std::move(dbLock),
+ std::move(collLock),
+ boost::none,
+ boost::none,
+ std::move(coll)});
return ScopedCollectionAcquisition(opCtx, acquiredCollection);
}
+ScopedLocalCatalogWriteFence::ScopedLocalCatalogWriteFence(OperationContext* opCtx,
+ ScopedCollectionAcquisition* acquisition)
+ : _opCtx(opCtx), _acquiredCollection(&acquisition->_acquiredCollection) {
+ // Clear the collectionPtr from the acquisition to indicate that it should not be used until the
+ // caller is done with the DDL modifications
+ _acquiredCollection->collectionPtr = CollectionPtr();
+
+ // OnCommit, there is nothing to do because the caller is not allowed to use the collection in
+ // the scope of the ScopedLocalCatalogWriteFence and the destructor will take care of updating
+ // the acquisition to point to the latest changed value.
+ opCtx->recoveryUnit()->onRollback(
+ [acquiredCollection = _acquiredCollection](OperationContext* opCtx) mutable {
+ // OnRollback, the acquired collection must be set to reference the previously
+ // established catalog snapshot
+ _updateAcquiredLocalCollection(opCtx, acquiredCollection);
+ });
+}
+
+ScopedLocalCatalogWriteFence::~ScopedLocalCatalogWriteFence() {
+ _updateAcquiredLocalCollection(_opCtx, _acquiredCollection);
+}
+
+void ScopedLocalCatalogWriteFence::_updateAcquiredLocalCollection(
+ OperationContext* opCtx, shard_role_details::AcquiredCollection* acquiredCollection) {
+ try {
+ auto collectionOrView =
+ acquireLocalCollectionOrView(opCtx, acquiredCollection->prerequisites);
+ invariant(std::holds_alternative<CollectionPtr>(collectionOrView));
+
+ acquiredCollection->collectionPtr = std::move(std::get<CollectionPtr>(collectionOrView));
+ } catch (...) {
+ fassertFailedWithStatus(737661, exceptionToStatus());
+ }
+}
+
YieldedTransactionResources::~YieldedTransactionResources() {
invariant(!_yieldedResources);
}
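
A condensed sketch (not part of the commit) of the fence lifecycle implemented above; in the commit itself the fence is created indirectly through CollectionWriter, and `acquisition` is assumed to be a ScopedCollectionAcquisition the caller already holds with the appropriate lock:

    {
        WriteUnitOfWork wuow(opCtx);

        // Constructor: clears the acquisition's CollectionPtr (it must not be used while
        // the fence is alive) and registers an onRollback hook that re-resolves it.
        ScopedLocalCatalogWriteFence fence(opCtx, &acquisition);

        // ... perform local catalog modifications through a writable Collection ...

        wuow.commit();
    }
    // Destructor: re-resolves the acquisition's CollectionPtr from the local catalog so
    // callers observe the latest state; a failure to re-acquire fasserts (737661).
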
diff --git a/src/mongo/db/shard_role.h b/src/mongo/db/shard_role.h
index 8577d69e5c6..030147b26e9 100644
--- a/src/mongo/db/shard_role.h
+++ b/src/mongo/db/shard_role.h
@@ -180,13 +180,26 @@ public:
~ScopedCollectionAcquisition();
ScopedCollectionAcquisition(OperationContext* opCtx,
- const shard_role_details::AcquiredCollection& acquiredCollection)
+ shard_role_details::AcquiredCollection& acquiredCollection)
: _opCtx(opCtx), _acquiredCollection(acquiredCollection) {}
const NamespaceString& nss() const {
return _acquiredCollection.prerequisites.nss;
}
+ /**
+ * Returns true if the acquisition found an existing collection, false otherwise.
+ */
+ bool exists() const {
+ return bool(_acquiredCollection.prerequisites.uuid);
+ }
+
+ /**
+ * Returns the UUID of the acquired collection. This is only allowed if the collection
+ * `exists()`; otherwise this method invariants.
+ */
+ const UUID& uuid() const;
+
// Access to services associated with the specified collection top to bottom on the hierarchical
// stack
@@ -202,12 +215,14 @@ public:
}
private:
+ friend class ScopedLocalCatalogWriteFence;
+
OperationContext* _opCtx;
// Points to the acquired resources that live on the TransactionResources opCtx decoration. The
// lifetime of these resources is tied to the lifetime of this
// ScopedCollectionOrViewAcquisition.
- const shard_role_details::AcquiredCollection& _acquiredCollection;
+ shard_role_details::AcquiredCollection& _acquiredCollection;
};
class ScopedViewAcquisition {
@@ -286,6 +301,38 @@ ScopedCollectionAcquisition acquireCollectionForLocalCatalogOnlyWithPotentialDat
OperationContext* opCtx, const NamespaceString& nss, LockMode mode);
/**
+ * This utility is what allows modifications to the local catalog part of an acquisition for a
+ * specific collection to become visible on a previously established acquisition for that
+ * collection, before or after the end of a WUOW.
+ *
+ * The presence of ScopedLocalCatalogWriteFence on the stack renders the collection for which it was
+ * instantiated unusable within its scope. Once it goes out of scope, any changes performed to the
+ * catalog collection will be visible to:
+ * - The transaction only, if the WUOW has not yet committed
+ * - Any subsequent collection acquisitions, when the WUOW commits
+ *
+ * NOTE: This utility by itself does not ensure that catalog modifications which are subordinate to
+ * the placement concern (create collection is subordinate to the location of the DB primary, for
+ * example) do not conflict with placement changes (e.g. movePrimary). This is currently implemented
+ * at a higher level through the usage of DB/Collection X-locks.
+ */
+class ScopedLocalCatalogWriteFence {
+public:
+ ScopedLocalCatalogWriteFence(OperationContext* opCtx, ScopedCollectionAcquisition* acquisition);
+ ~ScopedLocalCatalogWriteFence();
+
+ ScopedLocalCatalogWriteFence(ScopedLocalCatalogWriteFence&) = delete;
+ ScopedLocalCatalogWriteFence(ScopedLocalCatalogWriteFence&&) = delete;
+
+private:
+ static void _updateAcquiredLocalCollection(
+ OperationContext* opCtx, shard_role_details::AcquiredCollection* acquiredCollection);
+
+ OperationContext* _opCtx;
+ shard_role_details::AcquiredCollection* _acquiredCollection;
+};
+
+/**
* Serves as a temporary container for transaction resources which have been yielded via a call to
* `yieldTransactionResources`. Must never be destroyed without having been restored and the
* transaction resources properly committed/aborted.
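
A small sketch (not from the commit) of how the new exists()/uuid() accessors are meant to be used; `nss`, `placementConcern`, and `expectedUuid` are assumed to be supplied by the caller:

    auto acquisition = acquireCollection(
        opCtx,
        {nss, placementConcern, repl::ReadConcernArgs(), AcquisitionPrerequisites::kRead},
        MODE_IS);

    if (acquisition.exists()) {
        // uuid() is only legal when exists() returns true; otherwise it invariants.
        const UUID& collUuid = acquisition.uuid();
        invariant(collUuid == expectedUuid);
    }
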
diff --git a/src/mongo/db/shard_role_test.cpp b/src/mongo/db/shard_role_test.cpp
index 456a5b539c0..45bcf80574f 100644
--- a/src/mongo/db/shard_role_test.cpp
+++ b/src/mongo/db/shard_role_test.cpp
@@ -192,7 +192,6 @@ void ShardRoleTest::setUp() {
// Create nssShardedCollection1
createTestCollection(opCtx(), nssShardedCollection1);
const auto uuidShardedCollection1 = getCollectionUUID(_opCtx.get(), nssShardedCollection1);
- installDatabaseMetadata(opCtx(), dbNameTestDb, dbVersionTestDb);
installShardedCollectionMetadata(
opCtx(),
nssShardedCollection1,
@@ -1189,5 +1188,124 @@ TEST_F(ShardRoleTest, RestoreForWriteFailsIfCollectionIsNowAView) {
testRestoreFailsIfCollectionIsNowAView(AcquisitionPrerequisites::kWrite);
}
+// ---------------------------------------------------------------------------
+// ScopedLocalCatalogWriteFence
+
+TEST_F(ShardRoleTest, ScopedLocalCatalogWriteFenceWUOWCommitWithinWriterScope) {
+ auto acquisition = acquireCollection(opCtx(),
+ {nssShardedCollection1,
+ PlacementConcern{{}, shardVersionShardedCollection1},
+ repl::ReadConcernArgs(),
+ AcquisitionPrerequisites::kRead},
+ MODE_X);
+ ASSERT(!acquisition.getCollectionPtr()->isTemporary());
+
+ {
+ WriteUnitOfWork wuow(opCtx());
+ CollectionWriter localCatalogWriter(opCtx(), &acquisition);
+ localCatalogWriter.getWritableCollection(opCtx())->setIsTemp(opCtx(), true);
+ wuow.commit();
+ }
+
+ ASSERT(acquisition.getCollectionPtr()->isTemporary());
+}
+
+TEST_F(ShardRoleTest, ScopedLocalCatalogWriteFenceWUOWCommitAfterWriterScope) {
+ auto acquisition = acquireCollection(opCtx(),
+ {nssShardedCollection1,
+ PlacementConcern{{}, shardVersionShardedCollection1},
+ repl::ReadConcernArgs(),
+ AcquisitionPrerequisites::kRead},
+ MODE_X);
+ ASSERT(!acquisition.getCollectionPtr()->isTemporary());
+
+ WriteUnitOfWork wuow(opCtx());
+ {
+ CollectionWriter localCatalogWriter(opCtx(), &acquisition);
+ localCatalogWriter.getWritableCollection(opCtx())->setIsTemp(opCtx(), true);
+ }
+ ASSERT(acquisition.getCollectionPtr()->isTemporary());
+ wuow.commit();
+ ASSERT(acquisition.getCollectionPtr()->isTemporary());
+}
+
+TEST_F(ShardRoleTest, ScopedLocalCatalogWriteFenceWUOWRollbackWithinWriterScope) {
+ auto acquisition = acquireCollection(opCtx(),
+ {nssShardedCollection1,
+ PlacementConcern{{}, shardVersionShardedCollection1},
+ repl::ReadConcernArgs(),
+ AcquisitionPrerequisites::kRead},
+ MODE_X);
+ ASSERT(!acquisition.getCollectionPtr()->isTemporary());
+
+ {
+ WriteUnitOfWork wuow(opCtx());
+ CollectionWriter localCatalogWriter(opCtx(), &acquisition);
+ localCatalogWriter.getWritableCollection(opCtx())->setIsTemp(opCtx(), true);
+ }
+ ASSERT(!acquisition.getCollectionPtr()->isTemporary());
+}
+
+TEST_F(ShardRoleTest, ScopedLocalCatalogWriteFenceWUOWRollbackAfterWriterScope) {
+ auto acquisition = acquireCollection(opCtx(),
+ {nssShardedCollection1,
+ PlacementConcern{{}, shardVersionShardedCollection1},
+ repl::ReadConcernArgs(),
+ AcquisitionPrerequisites::kRead},
+ MODE_X);
+ ASSERT(!acquisition.getCollectionPtr()->isTemporary());
+
+ {
+ WriteUnitOfWork wuow(opCtx());
+ {
+ CollectionWriter localCatalogWriter(opCtx(), &acquisition);
+ localCatalogWriter.getWritableCollection(opCtx())->setIsTemp(opCtx(), true);
+ }
+ ASSERT(acquisition.getCollectionPtr()->isTemporary());
+ }
+ ASSERT(!acquisition.getCollectionPtr()->isTemporary());
+}
+
+TEST_F(ShardRoleTest, ScopedLocalCatalogWriteFenceOutsideWUOWCommit) {
+ auto acquisition = acquireCollection(opCtx(),
+ {nssShardedCollection1,
+ PlacementConcern{{}, shardVersionShardedCollection1},
+ repl::ReadConcernArgs(),
+ AcquisitionPrerequisites::kRead},
+ MODE_X);
+ ASSERT(!acquisition.getCollectionPtr()->isTemporary());
+
+ {
+ CollectionWriter localCatalogWriter(opCtx(), &acquisition);
+ WriteUnitOfWork wuow(opCtx());
+ localCatalogWriter.getWritableCollection(opCtx())->setIsTemp(opCtx(), true);
+ ASSERT(localCatalogWriter->isTemporary());
+ wuow.commit();
+ ASSERT(localCatalogWriter->isTemporary());
+ }
+ ASSERT(acquisition.getCollectionPtr()->isTemporary());
+}
+
+TEST_F(ShardRoleTest, ScopedLocalCatalogWriteFenceOutsideWUOWRollback) {
+ auto acquisition = acquireCollection(opCtx(),
+ {nssShardedCollection1,
+ PlacementConcern{{}, shardVersionShardedCollection1},
+ repl::ReadConcernArgs(),
+ AcquisitionPrerequisites::kRead},
+ MODE_X);
+ ASSERT(!acquisition.getCollectionPtr()->isTemporary());
+
+ {
+ CollectionWriter localCatalogWriter(opCtx(), &acquisition);
+ {
+ WriteUnitOfWork wuow(opCtx());
+ localCatalogWriter.getWritableCollection(opCtx())->setIsTemp(opCtx(), true);
+ ASSERT(localCatalogWriter->isTemporary());
+ }
+ ASSERT(!localCatalogWriter->isTemporary());
+ }
+ ASSERT(!acquisition.getCollectionPtr()->isTemporary());
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index 6a351a50686..c22a18b5638 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -795,7 +795,7 @@ public:
/**
* Exposed for debugging purposes.
*/
- State getState() {
+ State getState() const {
return _getState();
}
diff --git a/src/mongo/db/transaction_resources.h b/src/mongo/db/transaction_resources.h
index d01ee5f332b..a0a03c8799c 100644
--- a/src/mongo/db/transaction_resources.h
+++ b/src/mongo/db/transaction_resources.h
@@ -158,7 +158,7 @@ struct TransactionResources {
~TransactionResources();
- const AcquiredCollection& addAcquiredCollection(AcquiredCollection&& acquiredCollection) {
+ AcquiredCollection& addAcquiredCollection(AcquiredCollection&& acquiredCollection) {
return acquiredCollections.emplace_back(std::move(acquiredCollection));
}