diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2023-03-22 15:03:54 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-03-22 16:26:50 +0000 |
commit | 7a62847796d3797278a7b5fb6215948cd325b45d (patch) | |
tree | 45c745ec13218ab7229b636d5c60ad24739884b9 | |
parent | a87c105f1c0babf189f7df6ba38b16b3f04b3162 (diff) | |
download | mongo-7a62847796d3797278a7b5fb6215948cd325b45d.tar.gz |
SERVER-73766 Use ScopedCollectionAcquisition in the CollectionBulkLoaderImpl
-rw-r--r-- | src/mongo/db/catalog_raii.cpp | 46 | ||||
-rw-r--r-- | src/mongo/db/catalog_raii.h | 17 | ||||
-rw-r--r-- | src/mongo/db/commands/SConscript | 68 | ||||
-rw-r--r-- | src/mongo/db/commands/drop_indexes_cmd.cpp (renamed from src/mongo/db/commands/drop_indexes.cpp) | 0 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_bulk_loader_impl.cpp | 49 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_bulk_loader_impl.h | 5 | ||||
-rw-r--r-- | src/mongo/db/shard_role.cpp | 70 | ||||
-rw-r--r-- | src/mongo/db/shard_role.h | 51 | ||||
-rw-r--r-- | src/mongo/db/shard_role_test.cpp | 120 | ||||
-rw-r--r-- | src/mongo/db/storage/recovery_unit.h | 2 | ||||
-rw-r--r-- | src/mongo/db/transaction_resources.h | 2 |
11 files changed, 346 insertions, 84 deletions
diff --git a/src/mongo/db/catalog_raii.cpp b/src/mongo/db/catalog_raii.cpp index d82cc381480..b05d5afd422 100644 --- a/src/mongo/db/catalog_raii.cpp +++ b/src/mongo/db/catalog_raii.cpp @@ -40,6 +40,7 @@ #include "mongo/db/s/database_sharding_state.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/shard_role.h" #include "mongo/db/storage/storage_parameters_gen.h" #include "mongo/logv2/log.h" #include "mongo/util/fail_point.h" @@ -409,7 +410,6 @@ Collection* AutoGetCollection::getWritableCollection(OperationContext* opCtx) { // Acquire writable instance if not already available if (!_writableColl) { - auto catalog = CollectionCatalog::get(opCtx); _writableColl = catalog->lookupCollectionByNamespaceForMetadataWrite(opCtx, _resolvedNss); // Makes the internal CollectionPtr Yieldable and resets the writable Collection when @@ -541,6 +541,26 @@ struct CollectionWriter::SharedImpl { std::function<Collection*()> _writableCollectionInitializer; }; +CollectionWriter::CollectionWriter(OperationContext* opCtx, + ScopedCollectionAcquisition* acquisition) + : _acquisition(acquisition), + _collection(&_storedCollection), + _managed(true), + _sharedImpl(std::make_shared<SharedImpl>(this)) { + + _storedCollection = CollectionPtr( + CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, _acquisition->nss())); + _storedCollection.makeYieldable(opCtx, LockedCollectionYieldRestore(opCtx, _storedCollection)); + + _sharedImpl->_writableCollectionInitializer = [this, opCtx]() mutable { + invariant(!_fence); + _fence = std::make_unique<ScopedLocalCatalogWriteFence>(opCtx, _acquisition); + + return CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForMetadataWrite( + opCtx, _acquisition->nss()); + }; +} + CollectionWriter::CollectionWriter(OperationContext* opCtx, const UUID& uuid) : _collection(&_storedCollection), _managed(true), @@ -549,6 +569,7 @@ CollectionWriter::CollectionWriter(OperationContext* opCtx, const UUID& uuid) _storedCollection = CollectionPtr(CollectionCatalog::get(opCtx)->lookupCollectionByUUID(opCtx, uuid)); _storedCollection.makeYieldable(opCtx, LockedCollectionYieldRestore(opCtx, _storedCollection)); + _sharedImpl->_writableCollectionInitializer = [opCtx, uuid]() { return CollectionCatalog::get(opCtx)->lookupCollectionByUUIDForMetadataWrite(opCtx, uuid); }; @@ -558,9 +579,11 @@ CollectionWriter::CollectionWriter(OperationContext* opCtx, const NamespaceStrin : _collection(&_storedCollection), _managed(true), _sharedImpl(std::make_shared<SharedImpl>(this)) { + _storedCollection = CollectionPtr(CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss)); _storedCollection.makeYieldable(opCtx, LockedCollectionYieldRestore(opCtx, _storedCollection)); + _sharedImpl->_writableCollectionInitializer = [opCtx, nss]() { return CollectionCatalog::get(opCtx)->lookupCollectionByNamespaceForMetadataWrite(opCtx, nss); @@ -571,6 +594,7 @@ CollectionWriter::CollectionWriter(OperationContext* opCtx, AutoGetCollection& a : _collection(&autoCollection.getCollection()), _managed(true), _sharedImpl(std::make_shared<SharedImpl>(this)) { + _sharedImpl->_writableCollectionInitializer = [&autoCollection, opCtx]() { return autoCollection.getWritableCollection(opCtx); }; @@ -594,8 +618,8 @@ Collection* CollectionWriter::getWritableCollection(OperationContext* opCtx) { if (!_writableCollection) { _writableCollection = _sharedImpl->_writableCollectionInitializer(); - // If we are using our stored Collection then we are not managed by an AutoGetCollection and - // we need to manage lifetime here. + // If we are using our stored Collection then we are not managed by an AutoGetCollection + // and we need to manage lifetime here. if (_managed) { bool usingStoredCollection = *_collection == _storedCollection; auto rollbackCollection = @@ -609,6 +633,7 @@ Collection* CollectionWriter::getWritableCollection(OperationContext* opCtx) { [shared = _sharedImpl](OperationContext* opCtx, boost::optional<Timestamp>) { if (shared->_parent) { shared->_parent->_writableCollection = nullptr; + shared->_parent->_fence.reset(); // Make the stored collection yieldable again as we now operate with the // same instance as is in the catalog. @@ -619,12 +644,15 @@ Collection* CollectionWriter::getWritableCollection(OperationContext* opCtx) { [shared = _sharedImpl, rollbackCollection = std::move(rollbackCollection)]( OperationContext* opCtx) mutable { if (shared->_parent) { - // Restore stored collection to its previous state. The rollback instance is - // already yieldable. - shared->_parent->_storedCollection = std::move(rollbackCollection); shared->_parent->_writableCollection = nullptr; + shared->_parent->_fence.reset(); + + // Restore stored collection to its previous state. The rollback + // instance is already yieldable. + shared->_parent->_storedCollection = std::move(rollbackCollection); } }); + if (usingStoredCollection) { _storedCollection = CollectionPtr(_writableCollection); } @@ -641,7 +669,8 @@ ReadSourceScope::ReadSourceScope(OperationContext* opCtx, RecoveryUnit::ReadSource readSource, boost::optional<Timestamp> provided) : _opCtx(opCtx), _originalReadSource(opCtx->recoveryUnit()->getTimestampReadSource()) { - // Abandoning the snapshot is unsafe when the snapshot is managed by a lock free read helper. + // Abandoning the snapshot is unsafe when the snapshot is managed by a lock free read + // helper. invariant(!_opCtx->isLockFreeReadsOp()); if (_originalReadSource == RecoveryUnit::ReadSource::kProvided) { @@ -653,7 +682,8 @@ ReadSourceScope::ReadSourceScope(OperationContext* opCtx, } ReadSourceScope::~ReadSourceScope() { - // Abandoning the snapshot is unsafe when the snapshot is managed by a lock free read helper. + // Abandoning the snapshot is unsafe when the snapshot is managed by a lock free read + // helper. invariant(!_opCtx->isLockFreeReadsOp()); _opCtx->recoveryUnit()->abandonSnapshot(); diff --git a/src/mongo/db/catalog_raii.h b/src/mongo/db/catalog_raii.h index b577ca3c5bc..7028c1708eb 100644 --- a/src/mongo/db/catalog_raii.h +++ b/src/mongo/db/catalog_raii.h @@ -432,8 +432,20 @@ private: * It is safe to re-use an instance for multiple WriteUnitOfWorks or to destroy it before the active * WriteUnitOfWork finishes. */ +class ScopedCollectionAcquisition; +class ScopedLocalCatalogWriteFence; + class CollectionWriter final { public: + // This constructor indicates to the shard role subsystem that the subsequent code enteres into + // local DDL land and that the content of the local collection should not be trusted until it + // goes out of scope. + // + // See the comments on ScopedCollectionAcquisition for more details. + // + // TODO (SERVER-73766): Only this constructor should remain in use + CollectionWriter(OperationContext* opCtx, ScopedCollectionAcquisition* acquisition); + // Gets the collection from the catalog for the provided uuid CollectionWriter(OperationContext* opCtx, const UUID& uuid); // Gets the collection from the catalog for the provided namespace string @@ -474,6 +486,11 @@ public: Collection* getWritableCollection(OperationContext* opCtx); private: + // This group of values is only operated on for code paths that go through the + // `ScopedCollectionAcquisition` constructor. + ScopedCollectionAcquisition* _acquisition; + std::unique_ptr<ScopedLocalCatalogWriteFence> _fence; + // If this class is instantiated with the constructors that take UUID or nss we need somewhere // to store the CollectionPtr used. But if it is instantiated with an AutoGetCollection then the // lifetime of the object is managed there. To unify the two code paths we have a pointer that diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 10acc571c27..686b0e17923 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -338,42 +338,42 @@ env.Library( # Commands that are present in both mongod and embedded env.Library( - target="standalone", - source=[ - "analyze_cmd.cpp", - "count_cmd.cpp", - "create_command.cpp", - "create_indexes_cmd.cpp", - "current_op.cpp", - "dbcommands.cpp", - "distinct.cpp", - "drop_indexes.cpp", - "explain_cmd.cpp", - "find_and_modify.cpp", - "find_cmd.cpp", + target='standalone', + source=[ + 'analyze_cmd.cpp', + 'count_cmd.cpp', + 'create_command.cpp', + 'create_indexes_cmd.cpp', + 'current_op.cpp', + 'dbcommands.cpp', + 'distinct.cpp', + 'drop_indexes_cmd.cpp', + 'explain_cmd.cpp', + 'find_and_modify.cpp', + 'find_cmd.cpp', 'fle2_get_count_info_command.cpp', - "getmore_cmd.cpp", - "http_client.cpp", + 'getmore_cmd.cpp', + 'http_client.cpp', 'http_client.idl', - "index_filter_commands.cpp", - "kill_op.cpp", - "killcursors_cmd.cpp", - "killoperations_cmd.cpp", - "lock_info.cpp", - "list_collections.cpp", - "list_databases.cpp", + 'index_filter_commands.cpp', + 'kill_op.cpp', + 'killcursors_cmd.cpp', + 'killoperations_cmd.cpp', + 'list_collections.cpp', + 'list_databases.cpp', 'list_databases_for_all_tenants.cpp', - "list_indexes.cpp", - "pipeline_command.cpp", - "plan_cache_clear_command.cpp", - "plan_cache_commands.cpp", - "rename_collection_cmd.cpp", - "run_aggregate.cpp", - "sleep_command.cpp", - "validate.cpp", - "validate_db_metadata_cmd.cpp", - "whats_my_sni_command.cpp", - "write_commands.cpp", + 'list_indexes.cpp', + 'lock_info.cpp', + 'pipeline_command.cpp', + 'plan_cache_clear_command.cpp', + 'plan_cache_commands.cpp', + 'rename_collection_cmd.cpp', + 'run_aggregate.cpp', + 'sleep_command.cpp', + 'validate.cpp', + 'validate_db_metadata_cmd.cpp', + 'whats_my_sni_command.cpp', + 'write_commands.cpp', ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/base', @@ -421,7 +421,7 @@ env.Library( '$BUILD_DIR/mongo/db/stats/counters', '$BUILD_DIR/mongo/db/stats/server_read_concern_write_concern_metrics', '$BUILD_DIR/mongo/db/storage/storage_engine_common', - "$BUILD_DIR/mongo/db/storage/two_phase_index_build_knobs_idl", + '$BUILD_DIR/mongo/db/storage/two_phase_index_build_knobs_idl', '$BUILD_DIR/mongo/db/timeseries/catalog_helper', '$BUILD_DIR/mongo/db/timeseries/timeseries_collmod', '$BUILD_DIR/mongo/db/timeseries/timeseries_conversion_util', diff --git a/src/mongo/db/commands/drop_indexes.cpp b/src/mongo/db/commands/drop_indexes_cmd.cpp index a57059e0fc8..a57059e0fc8 100644 --- a/src/mongo/db/commands/drop_indexes.cpp +++ b/src/mongo/db/commands/drop_indexes_cmd.cpp diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp index b3a85d9c372..12ebf0176f7 100644 --- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp +++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp @@ -57,7 +57,8 @@ CollectionBulkLoaderImpl::CollectionBulkLoaderImpl(ServiceContext::UniqueClient const BSONObj& idIndexSpec) : _client{std::move(client)}, _opCtx{std::move(opCtx)}, - _collection{_opCtx.get(), nss, MODE_X}, + _acquisition( + acquireCollectionForLocalCatalogOnlyWithPotentialDataLoss(_opCtx.get(), nss, MODE_X)), _nss{nss}, _idIndexBlock(std::make_unique<MultiIndexBlock>()), _secondaryIndexesBlock(std::make_unique<MultiIndexBlock>()), @@ -82,14 +83,14 @@ Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndex return writeConflictRetry( _opCtx.get(), "CollectionBulkLoader::init", - _collection.getNss().ns(), + _acquisition.nss().ns(), [&secondaryIndexSpecs, this] { WriteUnitOfWork wuow(_opCtx.get()); // All writes in CollectionBulkLoaderImpl should be unreplicated. // The opCtx is accessed indirectly through _secondaryIndexesBlock. UnreplicatedWritesBlock uwb(_opCtx.get()); // This enforces the buildIndexes setting in the replica set configuration. - CollectionWriter collWriter(_opCtx.get(), _collection); + CollectionWriter collWriter(_opCtx.get(), &_acquisition); auto indexCatalog = collWriter.getWritableCollection(_opCtx.get())->getIndexCatalog(); auto specs = indexCatalog->removeExistingIndexesNoChecks( @@ -152,7 +153,7 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForUncappedCollection( bytesInBlock += doc.objsize(); // This version of insert will not update any indexes. const auto status = collection_internal::insertDocumentForBulkLoader( - _opCtx.get(), *_collection, doc, onRecordInserted); + _opCtx.get(), _acquisition.getCollectionPtr(), doc, onRecordInserted); if (!status.isOK()) { return status; } @@ -199,7 +200,7 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForCappedCollection( // For capped collections, we use regular insertDocument, which // will update pre-existing indexes. const auto status = collection_internal::insertDocument( - _opCtx.get(), *_collection, InsertStatement(doc), nullptr); + _opCtx.get(), _acquisition.getCollectionPtr(), InsertStatement(doc), nullptr); if (!status.isOK()) { return status; } @@ -238,21 +239,24 @@ Status CollectionBulkLoaderImpl::commit() { // Commit before deleting dups, so the dups will be removed from secondary indexes when // deleted. if (_secondaryIndexesBlock) { - auto status = _secondaryIndexesBlock->dumpInsertsFromBulk(_opCtx.get(), *_collection); + auto status = _secondaryIndexesBlock->dumpInsertsFromBulk( + _opCtx.get(), _acquisition.getCollectionPtr()); if (!status.isOK()) { return status; } // This should always return Status::OK() as the foreground index build doesn't install // an interceptor. - invariant(_secondaryIndexesBlock->checkConstraints(_opCtx.get(), *_collection)); + invariant(_secondaryIndexesBlock->checkConstraints(_opCtx.get(), + _acquisition.getCollectionPtr())); status = writeConflictRetry( _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this] { WriteUnitOfWork wunit(_opCtx.get()); + CollectionWriter collWriter(_opCtx.get(), &_acquisition); auto status = _secondaryIndexesBlock->commit( _opCtx.get(), - _collection.getWritableCollection(_opCtx.get()), + collWriter.getWritableCollection(_opCtx.get()), MultiIndexBlock::kNoopOnCreateEachFn, MultiIndexBlock::kNoopOnCommitFn); if (!status.isOK()) { @@ -269,20 +273,24 @@ Status CollectionBulkLoaderImpl::commit() { if (_idIndexBlock) { // Do not do inside a WriteUnitOfWork (required by dumpInsertsFromBulk). auto status = _idIndexBlock->dumpInsertsFromBulk( - _opCtx.get(), *_collection, [&](const RecordId& rid) { + _opCtx.get(), _acquisition.getCollectionPtr(), [&](const RecordId& rid) { writeConflictRetry( _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this, &rid] { WriteUnitOfWork wunit(_opCtx.get()); - auto doc = _collection->docFor(_opCtx.get(), rid); + auto doc = _acquisition.getCollectionPtr()->docFor(_opCtx.get(), rid); // Delete the document before committing the index. If we were to delete // the document after committing the index, it's possible that the we // may unindex a record with the same key but a different RecordId. - _collection->getRecordStore()->deleteRecord(_opCtx.get(), rid); - - auto indexIt = _collection->getIndexCatalog()->getIndexIterator( - _opCtx.get(), IndexCatalog::InclusionPolicy::kReady); + _acquisition.getCollectionPtr()->getRecordStore()->deleteRecord( + _opCtx.get(), rid); + + auto indexIt = + _acquisition.getCollectionPtr() + ->getIndexCatalog() + ->getIndexIterator(_opCtx.get(), + IndexCatalog::InclusionPolicy::kReady); while (auto entry = indexIt->next()) { if (entry->descriptor()->isIdIndex()) { continue; @@ -297,7 +305,7 @@ Status CollectionBulkLoaderImpl::commit() { entry->accessMethod()->remove( _opCtx.get(), pooledBuilder, - *_collection, + _acquisition.getCollectionPtr(), doc.value(), rid, false /* logIfError */, @@ -322,9 +330,10 @@ Status CollectionBulkLoaderImpl::commit() { status = writeConflictRetry( _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this] { WriteUnitOfWork wunit(_opCtx.get()); + CollectionWriter collWriter(_opCtx.get(), &_acquisition); auto status = _idIndexBlock->commit(_opCtx.get(), - _collection.getWritableCollection(_opCtx.get()), + collWriter.getWritableCollection(_opCtx.get()), MultiIndexBlock::kNoopOnCreateEachFn, MultiIndexBlock::kNoopOnCommitFn); if (!status.isOK()) { @@ -357,14 +366,14 @@ Status CollectionBulkLoaderImpl::commit() { void CollectionBulkLoaderImpl::_releaseResources() { invariant(&cc() == _opCtx->getClient()); if (_secondaryIndexesBlock) { - CollectionWriter collWriter(_opCtx.get(), _collection); + CollectionWriter collWriter(_opCtx.get(), &_acquisition); _secondaryIndexesBlock->abortIndexBuild( _opCtx.get(), collWriter, MultiIndexBlock::kNoopOnCleanUpFn); _secondaryIndexesBlock.reset(); } if (_idIndexBlock) { - CollectionWriter collWriter(_opCtx.get(), _collection); + CollectionWriter collWriter(_opCtx.get(), &_acquisition); _idIndexBlock->abortIndexBuild(_opCtx.get(), collWriter, MultiIndexBlock::kNoopOnCleanUpFn); _idIndexBlock.reset(); } @@ -392,7 +401,7 @@ Status CollectionBulkLoaderImpl::_addDocumentToIndexBlocks(const BSONObj& doc, if (_idIndexBlock) { auto status = _idIndexBlock->insertSingleDocumentForInitialSyncOrRecovery( _opCtx.get(), - *_collection, + _acquisition.getCollectionPtr(), doc, loc, // This caller / code path does not have cursors to save/restore. @@ -406,7 +415,7 @@ Status CollectionBulkLoaderImpl::_addDocumentToIndexBlocks(const BSONObj& doc, if (_secondaryIndexesBlock) { auto status = _secondaryIndexesBlock->insertSingleDocumentForInitialSyncOrRecovery( _opCtx.get(), - *_collection, + _acquisition.getCollectionPtr(), doc, loc, // This caller / code path does not have cursors to save/restore. diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.h b/src/mongo/db/repl/collection_bulk_loader_impl.h index 8e7b53e497a..f52b4983e69 100644 --- a/src/mongo/db/repl/collection_bulk_loader_impl.h +++ b/src/mongo/db/repl/collection_bulk_loader_impl.h @@ -34,10 +34,9 @@ #include "mongo/base/status_with.h" #include "mongo/bson/bsonobj.h" #include "mongo/db/catalog/multi_index_block.h" -#include "mongo/db/db_raii.h" -#include "mongo/db/namespace_string.h" #include "mongo/db/repl/collection_bulk_loader.h" #include "mongo/db/repl/storage_interface.h" +#include "mongo/db/shard_role.h" namespace mongo { namespace repl { @@ -104,7 +103,7 @@ private: ServiceContext::UniqueClient _client; ServiceContext::UniqueOperationContext _opCtx; - AutoGetCollection _collection; + ScopedCollectionAcquisition _acquisition; NamespaceString _nss; std::unique_ptr<MultiIndexBlock> _idIndexBlock; std::unique_ptr<MultiIndexBlock> _secondaryIndexesBlock; diff --git a/src/mongo/db/shard_role.cpp b/src/mongo/db/shard_role.cpp index 5beb67be776..74f11f9e247 100644 --- a/src/mongo/db/shard_role.cpp +++ b/src/mongo/db/shard_role.cpp @@ -283,7 +283,7 @@ std::vector<ScopedCollectionOrViewAcquisition> acquireResolvedCollectionsOrViews prerequisites.uuid = collectionPtr->uuid(); } - const shard_role_details::AcquiredCollection& acquiredCollection = + shard_role_details::AcquiredCollection& acquiredCollection = getOrMakeTransactionResources(opCtx).addAcquiredCollection( {prerequisites, std::move(acquisitionRequest.second.dbLock), @@ -341,6 +341,13 @@ CollectionAcquisitionRequest CollectionAcquisitionRequest::fromOpCtx( nss, {oss.getDbVersion(nss.db()), oss.getShardVersion(nss)}, readConcern, operationType); } +const UUID& ScopedCollectionAcquisition::uuid() const { + invariant(exists(), + str::stream() << "Collection " << nss() + << " doesn't exist, so its UUID cannot be obtained"); + return *_acquiredCollection.prerequisites.uuid; +} + const ScopedCollectionDescription& ScopedCollectionAcquisition::getShardingDescription() const { // The collectionDescription will only not be set if the caller as acquired the acquisition // using the kLocalCatalogOnlyWithPotentialDataLoss placement concern @@ -483,6 +490,8 @@ std::vector<ScopedCollectionOrViewAcquisition> acquireCollectionsOrViewsWithoutT ScopedCollectionAcquisition acquireCollectionForLocalCatalogOnlyWithPotentialDataLoss( OperationContext* opCtx, const NamespaceString& nss, LockMode mode) { + invariant(!OperationShardingState::isComingFromRouter(opCtx)); + auto& txnResources = getOrMakeTransactionResources(opCtx); txnResources.assertNoAcquiredCollections(); @@ -501,23 +510,56 @@ ScopedCollectionAcquisition acquireCollectionForLocalCatalogOnlyWithPotentialDat auto& coll = std::get<CollectionPtr>(collOrView); - const shard_role_details::AcquiredCollection& acquiredCollection = - txnResources.addAcquiredCollection( - {AcquisitionPrerequisites( - nss, - coll ? boost::optional<UUID>(coll->uuid()) : boost::none, - AcquisitionPrerequisites::kLocalCatalogOnlyWithPotentialDataLoss, - AcquisitionPrerequisites::OperationType::kWrite, - AcquisitionPrerequisites::ViewMode::kMustBeCollection), - std::move(dbLock), - std::move(collLock), - boost::none, - boost::none, - std::move(coll)}); + shard_role_details::AcquiredCollection& acquiredCollection = txnResources.addAcquiredCollection( + {AcquisitionPrerequisites(nss, + coll ? boost::optional<UUID>(coll->uuid()) : boost::none, + AcquisitionPrerequisites::kLocalCatalogOnlyWithPotentialDataLoss, + AcquisitionPrerequisites::OperationType::kWrite, + AcquisitionPrerequisites::ViewMode::kMustBeCollection), + std::move(dbLock), + std::move(collLock), + boost::none, + boost::none, + std::move(coll)}); return ScopedCollectionAcquisition(opCtx, acquiredCollection); } +ScopedLocalCatalogWriteFence::ScopedLocalCatalogWriteFence(OperationContext* opCtx, + ScopedCollectionAcquisition* acquisition) + : _opCtx(opCtx), _acquiredCollection(&acquisition->_acquiredCollection) { + // Clear the collectionPtr from the acquisition to indicate that it should not be used until the + // caller is done with the DDL modifications + _acquiredCollection->collectionPtr = CollectionPtr(); + + // OnCommit, there is nothing to do because the caller is not allowed to use the collection in + // the scope of the ScopedLocalCatalogWriteFence and the destructor will take care of updating + // the acquisition to point to the latest changed value. + opCtx->recoveryUnit()->onRollback( + [acquiredCollection = _acquiredCollection](OperationContext* opCtx) mutable { + // OnRollback, the acquired collection must be set to reference the previously + // established catalog snapshot + _updateAcquiredLocalCollection(opCtx, acquiredCollection); + }); +} + +ScopedLocalCatalogWriteFence::~ScopedLocalCatalogWriteFence() { + _updateAcquiredLocalCollection(_opCtx, _acquiredCollection); +} + +void ScopedLocalCatalogWriteFence::_updateAcquiredLocalCollection( + OperationContext* opCtx, shard_role_details::AcquiredCollection* acquiredCollection) { + try { + auto collectionOrView = + acquireLocalCollectionOrView(opCtx, acquiredCollection->prerequisites); + invariant(std::holds_alternative<CollectionPtr>(collectionOrView)); + + acquiredCollection->collectionPtr = std::move(std::get<CollectionPtr>(collectionOrView)); + } catch (...) { + fassertFailedWithStatus(737661, exceptionToStatus()); + } +} + YieldedTransactionResources::~YieldedTransactionResources() { invariant(!_yieldedResources); } diff --git a/src/mongo/db/shard_role.h b/src/mongo/db/shard_role.h index 8577d69e5c6..030147b26e9 100644 --- a/src/mongo/db/shard_role.h +++ b/src/mongo/db/shard_role.h @@ -180,13 +180,26 @@ public: ~ScopedCollectionAcquisition(); ScopedCollectionAcquisition(OperationContext* opCtx, - const shard_role_details::AcquiredCollection& acquiredCollection) + shard_role_details::AcquiredCollection& acquiredCollection) : _opCtx(opCtx), _acquiredCollection(acquiredCollection) {} const NamespaceString& nss() const { return _acquiredCollection.prerequisites.nss; } + /** + * Returns whether the acquisition found a collection or the collection didn't exist. + */ + bool exists() const { + return bool(_acquiredCollection.prerequisites.uuid); + } + + /** + * Returns the UUID of the acquired collection, but this operation is only allowed if the + * collection `exists()`, otherwise this method will invariant. + */ + const UUID& uuid() const; + // Access to services associated with the specified collection top to bottom on the hierarchical // stack @@ -202,12 +215,14 @@ public: } private: + friend class ScopedLocalCatalogWriteFence; + OperationContext* _opCtx; // Points to the acquired resources that live on the TransactionResources opCtx decoration. The // lifetime of these resources is tied to the lifetime of this // ScopedCollectionOrViewAcquisition. - const shard_role_details::AcquiredCollection& _acquiredCollection; + shard_role_details::AcquiredCollection& _acquiredCollection; }; class ScopedViewAcquisition { @@ -286,6 +301,38 @@ ScopedCollectionAcquisition acquireCollectionForLocalCatalogOnlyWithPotentialDat OperationContext* opCtx, const NamespaceString& nss, LockMode mode); /** + * This utility is what allows modifications to the local catalog part of an acquisition for a + * specific collection to become visible on a previously established acquisition for that + * collection, before or after the end of a WUOW. + * + * The presence of ScopedLocalCatalogWriteFence on the stack renders the collection for which it was + * instantiated unusable within its scope. Once it goes out of scope, any changes performed to the + * catalog collection will be visible to: + * - The transaction only, if the WUOW has not yet committed + * - Any subsequent collection acquisitions, when the WUOW commits + * + * NOTE: This utility by itself does not ensure that catalog modifications which are subordinate to + * the placement concern (create collection is subordinate to the location of the DB primary, for + * example) do not conflict with placement changes (e.g. movePrimary). This is currently implemented + * at a higher level through the usage of DB/Collection X-locks. + */ +class ScopedLocalCatalogWriteFence { +public: + ScopedLocalCatalogWriteFence(OperationContext* opCtx, ScopedCollectionAcquisition* acquisition); + ~ScopedLocalCatalogWriteFence(); + + ScopedLocalCatalogWriteFence(ScopedLocalCatalogWriteFence&) = delete; + ScopedLocalCatalogWriteFence(ScopedLocalCatalogWriteFence&&) = delete; + +private: + static void _updateAcquiredLocalCollection( + OperationContext* opCtx, shard_role_details::AcquiredCollection* acquiredCollection); + + OperationContext* _opCtx; + shard_role_details::AcquiredCollection* _acquiredCollection; +}; + +/** * Serves as a temporary container for transaction resources which have been yielded via a call to * `yieldTransactionResources`. Must never be destroyed without having been restored and the * transaction resources properly committed/aborted. diff --git a/src/mongo/db/shard_role_test.cpp b/src/mongo/db/shard_role_test.cpp index 456a5b539c0..45bcf80574f 100644 --- a/src/mongo/db/shard_role_test.cpp +++ b/src/mongo/db/shard_role_test.cpp @@ -192,7 +192,6 @@ void ShardRoleTest::setUp() { // Create nssShardedCollection1 createTestCollection(opCtx(), nssShardedCollection1); const auto uuidShardedCollection1 = getCollectionUUID(_opCtx.get(), nssShardedCollection1); - installDatabaseMetadata(opCtx(), dbNameTestDb, dbVersionTestDb); installShardedCollectionMetadata( opCtx(), nssShardedCollection1, @@ -1189,5 +1188,124 @@ TEST_F(ShardRoleTest, RestoreForWriteFailsIfCollectionIsNowAView) { testRestoreFailsIfCollectionIsNowAView(AcquisitionPrerequisites::kWrite); } +// --------------------------------------------------------------------------- +// ScopedLocalCatalogWriteFence + +TEST_F(ShardRoleTest, ScopedLocalCatalogWriteFenceWUOWCommitWithinWriterScope) { + auto acquisition = acquireCollection(opCtx(), + {nssShardedCollection1, + PlacementConcern{{}, shardVersionShardedCollection1}, + repl::ReadConcernArgs(), + AcquisitionPrerequisites::kRead}, + MODE_X); + ASSERT(!acquisition.getCollectionPtr()->isTemporary()); + + { + WriteUnitOfWork wuow(opCtx()); + CollectionWriter localCatalogWriter(opCtx(), &acquisition); + localCatalogWriter.getWritableCollection(opCtx())->setIsTemp(opCtx(), true); + wuow.commit(); + } + + ASSERT(acquisition.getCollectionPtr()->isTemporary()); +} + +TEST_F(ShardRoleTest, ScopedLocalCatalogWriteFenceWUOWCommitAfterWriterScope) { + auto acquisition = acquireCollection(opCtx(), + {nssShardedCollection1, + PlacementConcern{{}, shardVersionShardedCollection1}, + repl::ReadConcernArgs(), + AcquisitionPrerequisites::kRead}, + MODE_X); + ASSERT(!acquisition.getCollectionPtr()->isTemporary()); + + WriteUnitOfWork wuow(opCtx()); + { + CollectionWriter localCatalogWriter(opCtx(), &acquisition); + localCatalogWriter.getWritableCollection(opCtx())->setIsTemp(opCtx(), true); + } + ASSERT(acquisition.getCollectionPtr()->isTemporary()); + wuow.commit(); + ASSERT(acquisition.getCollectionPtr()->isTemporary()); +} + +TEST_F(ShardRoleTest, ScopedLocalCatalogWriteFenceWUOWRollbackWithinWriterScope) { + auto acquisition = acquireCollection(opCtx(), + {nssShardedCollection1, + PlacementConcern{{}, shardVersionShardedCollection1}, + repl::ReadConcernArgs(), + AcquisitionPrerequisites::kRead}, + MODE_X); + ASSERT(!acquisition.getCollectionPtr()->isTemporary()); + + { + WriteUnitOfWork wuow(opCtx()); + CollectionWriter localCatalogWriter(opCtx(), &acquisition); + localCatalogWriter.getWritableCollection(opCtx())->setIsTemp(opCtx(), true); + } + ASSERT(!acquisition.getCollectionPtr()->isTemporary()); +} + +TEST_F(ShardRoleTest, ScopedLocalCatalogWriteFenceWUOWRollbackAfterWriterScope) { + auto acquisition = acquireCollection(opCtx(), + {nssShardedCollection1, + PlacementConcern{{}, shardVersionShardedCollection1}, + repl::ReadConcernArgs(), + AcquisitionPrerequisites::kRead}, + MODE_X); + ASSERT(!acquisition.getCollectionPtr()->isTemporary()); + + { + WriteUnitOfWork wuow(opCtx()); + { + CollectionWriter localCatalogWriter(opCtx(), &acquisition); + localCatalogWriter.getWritableCollection(opCtx())->setIsTemp(opCtx(), true); + } + ASSERT(acquisition.getCollectionPtr()->isTemporary()); + } + ASSERT(!acquisition.getCollectionPtr()->isTemporary()); +} + +TEST_F(ShardRoleTest, ScopedLocalCatalogWriteFenceOutsideWUOUCommit) { + auto acquisition = acquireCollection(opCtx(), + {nssShardedCollection1, + PlacementConcern{{}, shardVersionShardedCollection1}, + repl::ReadConcernArgs(), + AcquisitionPrerequisites::kRead}, + MODE_X); + ASSERT(!acquisition.getCollectionPtr()->isTemporary()); + + { + CollectionWriter localCatalogWriter(opCtx(), &acquisition); + WriteUnitOfWork wuow(opCtx()); + localCatalogWriter.getWritableCollection(opCtx())->setIsTemp(opCtx(), true); + ASSERT(localCatalogWriter->isTemporary()); + wuow.commit(); + ASSERT(localCatalogWriter->isTemporary()); + } + ASSERT(acquisition.getCollectionPtr()->isTemporary()); +} + +TEST_F(ShardRoleTest, ScopedLocalCatalogWriteFenceOutsideWUOURollback) { + auto acquisition = acquireCollection(opCtx(), + {nssShardedCollection1, + PlacementConcern{{}, shardVersionShardedCollection1}, + repl::ReadConcernArgs(), + AcquisitionPrerequisites::kRead}, + MODE_X); + ASSERT(!acquisition.getCollectionPtr()->isTemporary()); + + { + CollectionWriter localCatalogWriter(opCtx(), &acquisition); + { + WriteUnitOfWork wuow(opCtx()); + localCatalogWriter.getWritableCollection(opCtx())->setIsTemp(opCtx(), true); + ASSERT(localCatalogWriter->isTemporary()); + } + ASSERT(!localCatalogWriter->isTemporary()); + } + ASSERT(!acquisition.getCollectionPtr()->isTemporary()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h index 6a351a50686..c22a18b5638 100644 --- a/src/mongo/db/storage/recovery_unit.h +++ b/src/mongo/db/storage/recovery_unit.h @@ -795,7 +795,7 @@ public: /** * Exposed for debugging purposes. */ - State getState() { + State getState() const { return _getState(); } diff --git a/src/mongo/db/transaction_resources.h b/src/mongo/db/transaction_resources.h index d01ee5f332b..a0a03c8799c 100644 --- a/src/mongo/db/transaction_resources.h +++ b/src/mongo/db/transaction_resources.h @@ -158,7 +158,7 @@ struct TransactionResources { ~TransactionResources(); - const AcquiredCollection& addAcquiredCollection(AcquiredCollection&& acquiredCollection) { + AcquiredCollection& addAcquiredCollection(AcquiredCollection&& acquiredCollection) { return acquiredCollections.emplace_back(std::move(acquiredCollection)); } |