diff options
-rw-r--r-- | src/mongo/db/repl/collection_bulk_loader_impl.cpp | 173 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_bulk_loader_impl.h | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.cpp | 31 |
3 files changed, 101 insertions, 116 deletions
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp index d405146eb37..7e311a0b7a4 100644 --- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp +++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp @@ -53,17 +53,16 @@ namespace repl { CollectionBulkLoaderImpl::CollectionBulkLoaderImpl(ServiceContext::UniqueClient&& client, ServiceContext::UniqueOperationContext&& opCtx, - std::unique_ptr<AutoGetCollection>&& autoColl, + const NamespaceString& nss, const BSONObj& idIndexSpec) : _client{std::move(client)}, _opCtx{std::move(opCtx)}, - _collection{std::move(autoColl)}, - _nss{_collection->getCollection()->ns()}, + _nss{nss}, _idIndexBlock(std::make_unique<MultiIndexBlock>()), _secondaryIndexesBlock(std::make_unique<MultiIndexBlock>()), _idIndexSpec(idIndexSpec.getOwned()) { invariant(_opCtx); - invariant(_collection); + invariant(!_nss.isEmpty()); } CollectionBulkLoaderImpl::~CollectionBulkLoaderImpl() { @@ -72,7 +71,8 @@ CollectionBulkLoaderImpl::~CollectionBulkLoaderImpl() { } Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndexSpecs) { - return _runTaskReleaseResourcesOnFailure([&secondaryIndexSpecs, this]() -> Status { + return _runTaskReleaseResourcesOnFailure([&]() -> Status { + AutoGetCollection coll(_opCtx.get(), _nss, MODE_X); // This method is called during initial sync of a replica set member, so we can safely tell // the index builders to build in the foreground instead of using the hybrid approach. The // member won't be available to be queried by anyone until it's caught up with the primary. @@ -80,57 +80,52 @@ Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndex // locks as yielding a MODE_X/MODE_S lock isn't allowed. _secondaryIndexesBlock->setIndexBuildMethod(IndexBuildMethod::kForeground); _idIndexBlock->setIndexBuildMethod(IndexBuildMethod::kForeground); - return writeConflictRetry( - _opCtx.get(), - "CollectionBulkLoader::init", - _collection->getNss().ns(), - [&secondaryIndexSpecs, this] { - WriteUnitOfWork wuow(_opCtx.get()); - // All writes in CollectionBulkLoaderImpl should be unreplicated. - // The opCtx is accessed indirectly through _secondaryIndexesBlock. - UnreplicatedWritesBlock uwb(_opCtx.get()); - // This enforces the buildIndexes setting in the replica set configuration. - CollectionWriter collWriter(_opCtx.get(), *_collection); - auto indexCatalog = - collWriter.getWritableCollection(_opCtx.get())->getIndexCatalog(); - auto specs = indexCatalog->removeExistingIndexesNoChecks( - _opCtx.get(), collWriter.get(), secondaryIndexSpecs); - if (specs.size()) { - _secondaryIndexesBlock->ignoreUniqueConstraint(); - auto status = _secondaryIndexesBlock - ->init(_opCtx.get(), - collWriter, - specs, - MultiIndexBlock::kNoopOnInitFn, - /*forRecovery=*/false) - .getStatus(); - if (!status.isOK()) { - return status; - } - } else { - _secondaryIndexesBlock.reset(); + return writeConflictRetry(_opCtx.get(), "CollectionBulkLoader::init", _nss.ns(), [&] { + WriteUnitOfWork wuow(_opCtx.get()); + // All writes in CollectionBulkLoaderImpl should be unreplicated. + // The opCtx is accessed indirectly through _secondaryIndexesBlock. + UnreplicatedWritesBlock uwb(_opCtx.get()); + // This enforces the buildIndexes setting in the replica set configuration. + CollectionWriter collWriter(_opCtx.get(), coll); + auto indexCatalog = collWriter.getWritableCollection(_opCtx.get())->getIndexCatalog(); + auto specs = indexCatalog->removeExistingIndexesNoChecks( + _opCtx.get(), collWriter.get(), secondaryIndexSpecs); + if (specs.size()) { + _secondaryIndexesBlock->ignoreUniqueConstraint(); + auto status = _secondaryIndexesBlock + ->init(_opCtx.get(), + collWriter, + specs, + MultiIndexBlock::kNoopOnInitFn, + /*forRecovery=*/false) + .getStatus(); + if (!status.isOK()) { + return status; } - if (!_idIndexSpec.isEmpty()) { - auto status = _idIndexBlock - ->init(_opCtx.get(), - collWriter, - _idIndexSpec, - MultiIndexBlock::kNoopOnInitFn) - .getStatus(); - if (!status.isOK()) { - return status; - } - } else { - _idIndexBlock.reset(); + } else { + _secondaryIndexesBlock.reset(); + } + if (!_idIndexSpec.isEmpty()) { + auto status = + _idIndexBlock + ->init( + _opCtx.get(), collWriter, _idIndexSpec, MultiIndexBlock::kNoopOnInitFn) + .getStatus(); + if (!status.isOK()) { + return status; } + } else { + _idIndexBlock.reset(); + } - wuow.commit(); - return Status::OK(); - }); + wuow.commit(); + return Status::OK(); + }); }); } Status CollectionBulkLoaderImpl::_insertDocumentsForUncappedCollection( + const CollectionPtr& coll, const std::vector<BSONObj>::const_iterator begin, const std::vector<BSONObj>::const_iterator end) { auto iter = begin; @@ -153,7 +148,7 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForUncappedCollection( bytesInBlock += doc.objsize(); // This version of insert will not update any indexes. const auto status = collection_internal::insertDocumentForBulkLoader( - _opCtx.get(), **_collection, doc, onRecordInserted); + _opCtx.get(), coll, doc, onRecordInserted); if (!status.isOK()) { return status; } @@ -173,7 +168,7 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForUncappedCollection( status = writeConflictRetry(_opCtx.get(), "_addDocumentToIndexBlocks", _nss.ns(), [&] { WriteUnitOfWork wunit(_opCtx.get()); for (size_t index = 0; index < locs.size(); ++index) { - status = _addDocumentToIndexBlocks(*iter++, locs.at(index)); + status = _addDocumentToIndexBlocks(coll, *iter++, locs.at(index)); if (!status.isOK()) { return status; } @@ -190,6 +185,7 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForUncappedCollection( } Status CollectionBulkLoaderImpl::_insertDocumentsForCappedCollection( + const CollectionPtr& coll, const std::vector<BSONObj>::const_iterator begin, const std::vector<BSONObj>::const_iterator end) { for (auto iter = begin; iter != end; ++iter) { @@ -200,7 +196,7 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForCappedCollection( // For capped collections, we use regular insertDocument, which // will update pre-existing indexes. const auto status = collection_internal::insertDocument( - _opCtx.get(), **_collection, InsertStatement(doc), nullptr); + _opCtx.get(), coll, InsertStatement(doc), nullptr); if (!status.isOK()) { return status; } @@ -217,17 +213,20 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForCappedCollection( Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::const_iterator begin, const std::vector<BSONObj>::const_iterator end) { return _runTaskReleaseResourcesOnFailure([&] { + AutoGetCollection coll( + _opCtx.get(), _nss, fixLockModeForSystemDotViewsChanges(_nss, MODE_IX)); UnreplicatedWritesBlock uwb(_opCtx.get()); if (_idIndexBlock || _secondaryIndexesBlock) { - return _insertDocumentsForUncappedCollection(begin, end); + return _insertDocumentsForUncappedCollection(*coll, begin, end); } else { - return _insertDocumentsForCappedCollection(begin, end); + return _insertDocumentsForCappedCollection(*coll, begin, end); } }); } Status CollectionBulkLoaderImpl::commit() { return _runTaskReleaseResourcesOnFailure([&] { + AutoGetCollection coll(_opCtx.get(), _nss, MODE_X); _stats.startBuildingIndexes = Date_t::now(); LOGV2_DEBUG(21130, 2, @@ -239,25 +238,23 @@ Status CollectionBulkLoaderImpl::commit() { // Commit before deleting dups, so the dups will be removed from secondary indexes when // deleted. if (_secondaryIndexesBlock) { - auto status = _secondaryIndexesBlock->dumpInsertsFromBulk(_opCtx.get(), - _collection->getCollection()); + auto status = _secondaryIndexesBlock->dumpInsertsFromBulk(_opCtx.get(), *coll); if (!status.isOK()) { return status; } // This should always return Status::OK() as the foreground index build doesn't install // an interceptor. - invariant(_secondaryIndexesBlock->checkConstraints(_opCtx.get(), - _collection->getCollection())); + invariant(_secondaryIndexesBlock->checkConstraints(_opCtx.get(), *coll)); status = writeConflictRetry( - _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this] { + _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [&] { WriteUnitOfWork wunit(_opCtx.get()); - auto status = _secondaryIndexesBlock->commit( - _opCtx.get(), - _collection->getWritableCollection(_opCtx.get()), - MultiIndexBlock::kNoopOnCreateEachFn, - MultiIndexBlock::kNoopOnCommitFn); + auto status = + _secondaryIndexesBlock->commit(_opCtx.get(), + coll.getWritableCollection(_opCtx.get()), + MultiIndexBlock::kNoopOnCreateEachFn, + MultiIndexBlock::kNoopOnCommitFn); if (!status.isOK()) { return status; } @@ -271,10 +268,10 @@ Status CollectionBulkLoaderImpl::commit() { if (_idIndexBlock) { // Do not do inside a WriteUnitOfWork (required by dumpInsertsFromBulk). - auto status = _idIndexBlock->dumpInsertsFromBulk( - _opCtx.get(), _collection->getCollection(), [&](const RecordId& rid) { + auto status = + _idIndexBlock->dumpInsertsFromBulk(_opCtx.get(), *coll, [&](const RecordId& rid) { return writeConflictRetry( - _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this, &rid] { + _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [&] { WriteUnitOfWork wunit(_opCtx.get()); // If we were to delete the document after committing the index build, // it's possible that the storage engine unindexes a different record @@ -282,13 +279,12 @@ Status CollectionBulkLoaderImpl::commit() { // before committing the index build, the index removal code uses // 'dupsAllowed', which forces the storage engine to only unindex // records that match the same key and RecordId. - (*_collection) - ->deleteDocument(_opCtx.get(), - kUninitializedStmtId, - rid, - nullptr /** OpDebug **/, - false /* fromMigrate */, - true /* noWarn */); + (*coll)->deleteDocument(_opCtx.get(), + kUninitializedStmtId, + rid, + nullptr /** OpDebug **/, + false /* fromMigrate */, + true /* noWarn */); wunit.commit(); return Status::OK(); }); @@ -300,13 +296,12 @@ Status CollectionBulkLoaderImpl::commit() { // Commit the _id index, there won't be any documents with duplicate _ids as they were // deleted prior to this. status = writeConflictRetry( - _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this] { + _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [&] { WriteUnitOfWork wunit(_opCtx.get()); - auto status = - _idIndexBlock->commit(_opCtx.get(), - _collection->getWritableCollection(_opCtx.get()), - MultiIndexBlock::kNoopOnCreateEachFn, - MultiIndexBlock::kNoopOnCommitFn); + auto status = _idIndexBlock->commit(_opCtx.get(), + coll.getWritableCollection(_opCtx.get()), + MultiIndexBlock::kNoopOnCreateEachFn, + MultiIndexBlock::kNoopOnCommitFn); if (!status.isOK()) { return status; } @@ -330,34 +325,31 @@ Status CollectionBulkLoaderImpl::commit() { // _releaseResources. _idIndexBlock.reset(); _secondaryIndexesBlock.reset(); - _collection.reset(); return Status::OK(); }); } void CollectionBulkLoaderImpl::_releaseResources() { invariant(&cc() == _opCtx->getClient()); + AutoGetCollection coll(_opCtx.get(), _nss, MODE_X); if (_secondaryIndexesBlock) { - CollectionWriter collWriter(_opCtx.get(), *_collection); + CollectionWriter collWriter(_opCtx.get(), coll); _secondaryIndexesBlock->abortIndexBuild( _opCtx.get(), collWriter, MultiIndexBlock::kNoopOnCleanUpFn); _secondaryIndexesBlock.reset(); } if (_idIndexBlock) { - CollectionWriter collWriter(_opCtx.get(), *_collection); + CollectionWriter collWriter(_opCtx.get(), coll); _idIndexBlock->abortIndexBuild(_opCtx.get(), collWriter, MultiIndexBlock::kNoopOnCleanUpFn); _idIndexBlock.reset(); } - - // release locks. - _collection.reset(); } template <typename F> Status CollectionBulkLoaderImpl::_runTaskReleaseResourcesOnFailure(const F& task) noexcept { AlternativeClientRegion acr(_client); - ScopeGuard guard([this] { _releaseResources(); }); + ScopeGuard guard([&] { _releaseResources(); }); try { const auto status = task(); if (status.isOK()) { @@ -369,12 +361,13 @@ Status CollectionBulkLoaderImpl::_runTaskReleaseResourcesOnFailure(const F& task } } -Status CollectionBulkLoaderImpl::_addDocumentToIndexBlocks(const BSONObj& doc, +Status CollectionBulkLoaderImpl::_addDocumentToIndexBlocks(const CollectionPtr& collection, + const BSONObj& doc, const RecordId& loc) { if (_idIndexBlock) { auto status = _idIndexBlock->insertSingleDocumentForInitialSyncOrRecovery( _opCtx.get(), - _collection->getCollection(), + collection, doc, loc, // This caller / code path does not have cursors to save/restore. @@ -388,7 +381,7 @@ Status CollectionBulkLoaderImpl::_addDocumentToIndexBlocks(const BSONObj& doc, if (_secondaryIndexesBlock) { auto status = _secondaryIndexesBlock->insertSingleDocumentForInitialSyncOrRecovery( _opCtx.get(), - _collection->getCollection(), + collection, doc, loc, // This caller / code path does not have cursors to save/restore. diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.h b/src/mongo/db/repl/collection_bulk_loader_impl.h index d7077376478..61c2d21d6a8 100644 --- a/src/mongo/db/repl/collection_bulk_loader_impl.h +++ b/src/mongo/db/repl/collection_bulk_loader_impl.h @@ -62,7 +62,7 @@ public: CollectionBulkLoaderImpl(ServiceContext::UniqueClient&& client, ServiceContext::UniqueOperationContext&& opCtx, - std::unique_ptr<AutoGetCollection>&& autoColl, + const NamespaceString& nss, const BSONObj& idIndexSpec); virtual ~CollectionBulkLoaderImpl(); @@ -86,7 +86,8 @@ private: /** * For capped collections, each document will be inserted in its own WriteUnitOfWork. */ - Status _insertDocumentsForCappedCollection(std::vector<BSONObj>::const_iterator begin, + Status _insertDocumentsForCappedCollection(const CollectionPtr& coll, + std::vector<BSONObj>::const_iterator begin, std::vector<BSONObj>::const_iterator end); /** @@ -94,17 +95,19 @@ private: * collectionBulkLoaderBatchSizeInBytes or up to one document size greater. All insertions in a * given batch will be inserted in one WriteUnitOfWork. */ - Status _insertDocumentsForUncappedCollection(std::vector<BSONObj>::const_iterator begin, + Status _insertDocumentsForUncappedCollection(const CollectionPtr& coll, + std::vector<BSONObj>::const_iterator begin, std::vector<BSONObj>::const_iterator end); /** * Adds document and associated RecordId to index blocks after inserting into RecordStore. */ - Status _addDocumentToIndexBlocks(const BSONObj& doc, const RecordId& loc); + Status _addDocumentToIndexBlocks(const CollectionPtr& coll, + const BSONObj& doc, + const RecordId& loc); ServiceContext::UniqueClient _client; ServiceContext::UniqueOperationContext _opCtx; - std::unique_ptr<AutoGetCollection> _collection; NamespaceString _nss; std::unique_ptr<MultiIndexBlock> _idIndexBlock; std::unique_ptr<MultiIndexBlock> _secondaryIndexesBlock; diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index dad1cbc45df..c357a1b411e 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -232,13 +232,11 @@ StorageInterfaceImpl::createCollectionForBulkLoading( .setFlags(DocumentValidationSettings::kDisableSchemaValidation | DocumentValidationSettings::kDisableInternalValidation); - std::unique_ptr<AutoGetCollection> autoColl; // Retry if WCE. Status status = writeConflictRetry(opCtx.get(), "beginCollectionClone", nss.ns(), [&] { UnreplicatedWritesBlock uwb(opCtx.get()); // Get locks and create the collection. - AutoGetDb autoDb(opCtx.get(), nss.dbName(), MODE_IX); AutoGetCollection coll(opCtx.get(), nss, fixLockModeForSystemDotViewsChanges(nss, MODE_X)); if (coll) { return Status(ErrorCodes::NamespaceExists, @@ -247,35 +245,29 @@ StorageInterfaceImpl::createCollectionForBulkLoading( { // Create the collection. WriteUnitOfWork wunit(opCtx.get()); - auto db = autoDb.ensureDbExists(opCtx.get()); + auto db = coll.ensureDbExists(opCtx.get()); fassert(40332, db->createCollection(opCtx.get(), nss, options, false)); wunit.commit(); } - autoColl = std::make_unique<AutoGetCollection>( - opCtx.get(), nss, fixLockModeForSystemDotViewsChanges(nss, MODE_IX)); - // Build empty capped indexes. Capped indexes cannot be built by the MultiIndexBlock // because the cap might delete documents off the back while we are inserting them into // the front. if (options.capped) { WriteUnitOfWork wunit(opCtx.get()); + // `getWritableCollection` will return the newly created collection even if it didn't + // exist when the AutoGet was created. + auto writableCollection = coll.getWritableCollection(opCtx.get()); if (!idIndexSpec.isEmpty()) { - auto status = - autoColl->getWritableCollection(opCtx.get()) - ->getIndexCatalog() - ->createIndexOnEmptyCollection( - opCtx.get(), autoColl->getWritableCollection(opCtx.get()), idIndexSpec); + auto status = writableCollection->getIndexCatalog()->createIndexOnEmptyCollection( + opCtx.get(), writableCollection, idIndexSpec); if (!status.getStatus().isOK()) { return status.getStatus(); } } for (auto&& spec : secondaryIndexSpecs) { - auto status = - autoColl->getWritableCollection(opCtx.get()) - ->getIndexCatalog() - ->createIndexOnEmptyCollection( - opCtx.get(), autoColl->getWritableCollection(opCtx.get()), spec); + auto status = writableCollection->getIndexCatalog()->createIndexOnEmptyCollection( + opCtx.get(), writableCollection, spec); if (!status.getStatus().isOK()) { return status.getStatus(); } @@ -291,11 +283,8 @@ StorageInterfaceImpl::createCollectionForBulkLoading( } // Move locks into loader, so it now controls their lifetime. - auto loader = - std::make_unique<CollectionBulkLoaderImpl>(Client::releaseCurrent(), - std::move(opCtx), - std::move(autoColl), - options.capped ? BSONObj() : idIndexSpec); + auto loader = std::make_unique<CollectionBulkLoaderImpl>( + Client::releaseCurrent(), std::move(opCtx), nss, options.capped ? BSONObj() : idIndexSpec); status = loader->init(options.capped ? std::vector<BSONObj>() : secondaryIndexSpecs); if (!status.isOK()) { |