diff options
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/collection_bulk_loader_impl.cpp | 55 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_bulk_loader_impl.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_recovery.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 39 |
4 files changed, 52 insertions, 49 deletions
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp index 23fce736413..13dd6dae785 100644 --- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp +++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp @@ -59,9 +59,8 @@ CollectionBulkLoaderImpl::CollectionBulkLoaderImpl(ServiceContext::UniqueClient& const BSONObj& idIndexSpec) : _client{std::move(client)}, _opCtx{std::move(opCtx)}, - _autoColl{std::move(autoColl)}, - _collection{_autoColl->getWritableCollection()}, - _nss{_autoColl->getCollection()->ns()}, + _collection{std::move(autoColl)}, + _nss{_collection->getCollection()->ns()}, _idIndexBlock(std::make_unique<MultiIndexBlock>()), _secondaryIndexesBlock(std::make_unique<MultiIndexBlock>()), _idIndexSpec(idIndexSpec.getOwned()) { @@ -75,20 +74,20 @@ CollectionBulkLoaderImpl::~CollectionBulkLoaderImpl() { } Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndexSpecs) { - return _runTaskReleaseResourcesOnFailure([coll = _autoColl->getCollection(), - &secondaryIndexSpecs, - this]() -> Status { + return _runTaskReleaseResourcesOnFailure([&secondaryIndexSpecs, this]() -> Status { + WriteUnitOfWork wuow(_opCtx.get()); // All writes in CollectionBulkLoaderImpl should be unreplicated. // The opCtx is accessed indirectly through _secondaryIndexesBlock. UnreplicatedWritesBlock uwb(_opCtx.get()); // This enforces the buildIndexes setting in the replica set configuration. - auto indexCatalog = coll->getIndexCatalog(); + CollectionWriter collWriter(*_collection); + auto indexCatalog = collWriter.getWritableCollection()->getIndexCatalog(); auto specs = indexCatalog->removeExistingIndexesNoChecks(_opCtx.get(), secondaryIndexSpecs); if (specs.size()) { _secondaryIndexesBlock->ignoreUniqueConstraint(); auto status = _secondaryIndexesBlock - ->init(_opCtx.get(), _collection, specs, MultiIndexBlock::kNoopOnInitFn) + ->init(_opCtx.get(), collWriter, specs, MultiIndexBlock::kNoopOnInitFn) .getStatus(); if (!status.isOK()) { return status; @@ -99,7 +98,7 @@ Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndex if (!_idIndexSpec.isEmpty()) { auto status = _idIndexBlock - ->init(_opCtx.get(), _collection, _idIndexSpec, MultiIndexBlock::kNoopOnInitFn) + ->init(_opCtx.get(), collWriter, _idIndexSpec, MultiIndexBlock::kNoopOnInitFn) .getStatus(); if (!status.isOK()) { return status; @@ -108,6 +107,7 @@ Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndex _idIndexBlock.reset(); } + wuow.commit(); return Status::OK(); }); } @@ -134,8 +134,9 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForUncappedCollection( const auto& doc = *insertIter++; bytesInBlock += doc.objsize(); // This version of insert will not update any indexes. - const auto status = _autoColl->getCollection()->insertDocumentForBulkLoader( - _opCtx.get(), doc, onRecordInserted); + const auto status = + (*_collection) + ->insertDocumentForBulkLoader(_opCtx.get(), doc, onRecordInserted); if (!status.isOK()) { return status; } @@ -181,8 +182,8 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForCappedCollection( WriteUnitOfWork wunit(_opCtx.get()); // For capped collections, we use regular insertDocument, which // will update pre-existing indexes. - const auto status = _autoColl->getCollection()->insertDocument( - _opCtx.get(), InsertStatement(doc), nullptr); + const auto status = + (*_collection)->insertDocument(_opCtx.get(), InsertStatement(doc), nullptr); if (!status.isOK()) { return status; } @@ -235,7 +236,7 @@ Status CollectionBulkLoaderImpl::commit() { WriteUnitOfWork wunit(_opCtx.get()); auto status = _secondaryIndexesBlock->commit(_opCtx.get(), - _collection, + _collection->getWritableCollection(), MultiIndexBlock::kNoopOnCreateEachFn, MultiIndexBlock::kNoopOnCommitFn); if (!status.isOK()) { @@ -262,12 +263,13 @@ Status CollectionBulkLoaderImpl::commit() { // before committing the index build, the index removal code uses // 'dupsAllowed', which forces the storage engine to only unindex // records that match the same key and RecordId. - _autoColl->getCollection()->deleteDocument(_opCtx.get(), - kUninitializedStmtId, - rid, - nullptr /** OpDebug **/, - false /* fromMigrate */, - true /* noWarn */); + (*_collection) + ->deleteDocument(_opCtx.get(), + kUninitializedStmtId, + rid, + nullptr /** OpDebug **/, + false /* fromMigrate */, + true /* noWarn */); wunit.commit(); return Status::OK(); }); @@ -296,7 +298,7 @@ Status CollectionBulkLoaderImpl::commit() { _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this] { WriteUnitOfWork wunit(_opCtx.get()); auto status = _idIndexBlock->commit(_opCtx.get(), - _collection, + _collection->getWritableCollection(), MultiIndexBlock::kNoopOnCreateEachFn, MultiIndexBlock::kNoopOnCommitFn); if (!status.isOK()) { @@ -322,7 +324,7 @@ Status CollectionBulkLoaderImpl::commit() { // _releaseResources. _idIndexBlock.reset(); _secondaryIndexesBlock.reset(); - _autoColl.reset(); + _collection.reset(); return Status::OK(); }); } @@ -330,19 +332,20 @@ Status CollectionBulkLoaderImpl::commit() { void CollectionBulkLoaderImpl::_releaseResources() { invariant(&cc() == _opCtx->getClient()); if (_secondaryIndexesBlock) { + CollectionWriter collWriter(*_collection); _secondaryIndexesBlock->abortIndexBuild( - _opCtx.get(), _collection, MultiIndexBlock::kNoopOnCleanUpFn); + _opCtx.get(), collWriter, MultiIndexBlock::kNoopOnCleanUpFn); _secondaryIndexesBlock.reset(); } if (_idIndexBlock) { - _idIndexBlock->abortIndexBuild( - _opCtx.get(), _collection, MultiIndexBlock::kNoopOnCleanUpFn); + CollectionWriter collWriter(*_collection); + _idIndexBlock->abortIndexBuild(_opCtx.get(), collWriter, MultiIndexBlock::kNoopOnCleanUpFn); _idIndexBlock.reset(); } // release locks. - _autoColl.reset(); + _collection.reset(); } template <typename F> diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.h b/src/mongo/db/repl/collection_bulk_loader_impl.h index afb6df03bc2..fab8ed9c922 100644 --- a/src/mongo/db/repl/collection_bulk_loader_impl.h +++ b/src/mongo/db/repl/collection_bulk_loader_impl.h @@ -104,8 +104,7 @@ private: ServiceContext::UniqueClient _client; ServiceContext::UniqueOperationContext _opCtx; - std::unique_ptr<AutoGetCollection> _autoColl; - Collection* _collection; + std::unique_ptr<AutoGetCollection> _collection; NamespaceString _nss; std::unique_ptr<MultiIndexBlock> _idIndexBlock; std::unique_ptr<MultiIndexBlock> _secondaryIndexesBlock; diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp index c0c242421f9..ecaf6fb104e 100644 --- a/src/mongo/db/repl/replication_recovery.cpp +++ b/src/mongo/db/repl/replication_recovery.cpp @@ -650,8 +650,8 @@ void ReplicationRecoveryImpl::_truncateOplogTo(OperationContext* opCtx, const NamespaceString oplogNss(NamespaceString::kRsOplogNamespace); AutoGetDb autoDb(opCtx, oplogNss.db(), MODE_IX); Lock::CollectionLock oplogCollectionLoc(opCtx, oplogNss, MODE_X); - Collection* oplogCollection = - CollectionCatalog::get(opCtx).lookupCollectionByNamespaceForMetadataWrite(opCtx, oplogNss); + auto oplogCollection = + CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, oplogNss); if (!oplogCollection) { fassertFailedWithStatusNoTrace( 34418, diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 6c3480157e9..fa6b07c6696 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -870,7 +870,7 @@ void dropIndex(OperationContext* opCtx, "namespace"_attr = nss.toString()); return; } - WriteUnitOfWork wunit(opCtx); + auto entry = indexCatalog->getEntry(indexDescriptor); if (entry->isReady(opCtx)) { auto status = indexCatalog->dropIndex(opCtx, indexDescriptor); @@ -894,7 +894,6 @@ void dropIndex(OperationContext* opCtx, "error"_attr = redact(status)); } } - wunit.commit(); } /** @@ -907,8 +906,7 @@ void rollbackCreateIndexes(OperationContext* opCtx, UUID uuid, std::set<std::str CollectionCatalog::get(opCtx).lookupNSSByUUID(opCtx, uuid); invariant(nss); Lock::DBLock dbLock(opCtx, nss->db(), MODE_X); - Collection* collection = - CollectionCatalog::get(opCtx).lookupCollectionByUUIDForMetadataWrite(opCtx, uuid); + CollectionWriter collection(opCtx, uuid); // If we cannot find the collection, we skip over dropping the index. if (!collection) { @@ -946,7 +944,9 @@ void rollbackCreateIndexes(OperationContext* opCtx, UUID uuid, std::set<std::str "uuid"_attr = uuid, "indexName"_attr = indexName); - dropIndex(opCtx, indexCatalog, indexName, *nss); + WriteUnitOfWork wuow(opCtx); + dropIndex(opCtx, collection.getWritableCollection()->getIndexCatalog(), indexName, *nss); + wuow.commit(); LOGV2_DEBUG(21673, 1, @@ -1574,8 +1574,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx, auto db = databaseHolder->openDb(opCtx, nss->db().toString()); invariant(db); - Collection* collection = - CollectionCatalog::get(opCtx).lookupCollectionByUUIDForMetadataWrite(opCtx, uuid); + CollectionWriter collection(opCtx, uuid); invariant(collection); auto infoResult = rollbackSource.getCollectionInfoByUUID(nss->db().toString(), uuid); @@ -1627,7 +1626,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx, // Set any document validation options. We update the validator fields without // parsing/validation, since we fetched the options object directly from the sync // source, and we should set our validation options to match it exactly. - auto validatorStatus = collection->updateValidator( + auto validatorStatus = collection.getWritableCollection()->updateValidator( opCtx, options.validator, options.validationLevel, options.validationAction); if (!validatorStatus.isOK()) { throw RSFatalException(str::stream() @@ -1729,8 +1728,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx, const NamespaceString docNss(doc.ns); Lock::DBLock docDbLock(opCtx, docNss.db(), MODE_X); OldClientContext ctx(opCtx, doc.ns.toString()); - Collection* collection = - catalog.lookupCollectionByUUIDForMetadataWrite(opCtx, uuid); + CollectionWriter collection(opCtx, uuid); // Adds the doc to our rollback file if the collection was not dropped while // rolling back createCollection operations. Does not log an error when @@ -1740,7 +1738,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx, if (collection && removeSaver) { BSONObj obj; - bool found = Helpers::findOne(opCtx, collection, pattern, obj, false); + bool found = Helpers::findOne(opCtx, collection.get(), pattern, obj, false); if (found) { auto status = removeSaver->goingToDelete(obj); if (!status.isOK()) { @@ -1791,7 +1789,8 @@ void rollback_internal::syncFixUp(OperationContext* opCtx, const auto clock = opCtx->getServiceContext()->getFastClockSource(); const auto findOneStart = clock->now(); - RecordId loc = Helpers::findOne(opCtx, collection, pattern, false); + RecordId loc = + Helpers::findOne(opCtx, collection.get(), pattern, false); if (clock->now() - findOneStart > Milliseconds(200)) LOGV2_WARNING( 21726, @@ -1807,8 +1806,9 @@ void rollback_internal::syncFixUp(OperationContext* opCtx, collection->ns().ns(), [&] { WriteUnitOfWork wunit(opCtx); - collection->cappedTruncateAfter( - opCtx, loc, true); + collection.getWritableCollection() + ->cappedTruncateAfter( + opCtx, loc, true); wunit.commit(); }); } catch (const DBException& e) { @@ -1817,7 +1817,9 @@ void rollback_internal::syncFixUp(OperationContext* opCtx, writeConflictRetry( opCtx, "truncate", collection->ns().ns(), [&] { WriteUnitOfWork wunit(opCtx); - uassertStatusOK(collection->truncate(opCtx)); + uassertStatusOK( + collection.getWritableCollection() + ->truncate(opCtx)); wunit.commit(); }); } else { @@ -1843,7 +1845,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx, } } else { deleteObjects(opCtx, - collection, + collection.get(), *nss, pattern, true, // justOne @@ -1947,9 +1949,8 @@ void rollback_internal::syncFixUp(OperationContext* opCtx, Lock::DBLock oplogDbLock(opCtx, oplogNss.db(), MODE_IX); Lock::CollectionLock oplogCollectionLoc(opCtx, oplogNss, MODE_X); OldClientContext ctx(opCtx, oplogNss.ns()); - Collection* oplogCollection = - CollectionCatalog::get(opCtx).lookupCollectionByNamespaceForMetadataWrite(opCtx, - oplogNss); + auto oplogCollection = + CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, oplogNss); if (!oplogCollection) { fassertFailedWithStatusNoTrace( 40495, |