summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJordi Olivares Provencio <jordi.olivares-provencio@mongodb.com>2022-09-30 11:26:24 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-30 12:30:57 +0000
commit926591a3d5657eb271b275e6fd49db663b325227 (patch)
tree99f031c6b24335df51d44211509924852e2cd83d
parent459f574b8a4afd9e2e843c625f2ee4b726da12f3 (diff)
downloadmongo-926591a3d5657eb271b275e6fd49db663b325227.tar.gz
SERVER-63833 Refactor CollectionBulkLoaderImpl locks
-rw-r--r--src/mongo/db/repl/collection_bulk_loader_impl.cpp173
-rw-r--r--src/mongo/db/repl/collection_bulk_loader_impl.h13
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp31
3 files changed, 101 insertions, 116 deletions
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
index d405146eb37..7e311a0b7a4 100644
--- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
@@ -53,17 +53,16 @@ namespace repl {
CollectionBulkLoaderImpl::CollectionBulkLoaderImpl(ServiceContext::UniqueClient&& client,
ServiceContext::UniqueOperationContext&& opCtx,
- std::unique_ptr<AutoGetCollection>&& autoColl,
+ const NamespaceString& nss,
const BSONObj& idIndexSpec)
: _client{std::move(client)},
_opCtx{std::move(opCtx)},
- _collection{std::move(autoColl)},
- _nss{_collection->getCollection()->ns()},
+ _nss{nss},
_idIndexBlock(std::make_unique<MultiIndexBlock>()),
_secondaryIndexesBlock(std::make_unique<MultiIndexBlock>()),
_idIndexSpec(idIndexSpec.getOwned()) {
invariant(_opCtx);
- invariant(_collection);
+ invariant(!_nss.isEmpty());
}
CollectionBulkLoaderImpl::~CollectionBulkLoaderImpl() {
@@ -72,7 +71,8 @@ CollectionBulkLoaderImpl::~CollectionBulkLoaderImpl() {
}
Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndexSpecs) {
- return _runTaskReleaseResourcesOnFailure([&secondaryIndexSpecs, this]() -> Status {
+ return _runTaskReleaseResourcesOnFailure([&]() -> Status {
+ AutoGetCollection coll(_opCtx.get(), _nss, MODE_X);
// This method is called during initial sync of a replica set member, so we can safely tell
// the index builders to build in the foreground instead of using the hybrid approach. The
// member won't be available to be queried by anyone until it's caught up with the primary.
@@ -80,57 +80,52 @@ Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndex
// locks as yielding a MODE_X/MODE_S lock isn't allowed.
_secondaryIndexesBlock->setIndexBuildMethod(IndexBuildMethod::kForeground);
_idIndexBlock->setIndexBuildMethod(IndexBuildMethod::kForeground);
- return writeConflictRetry(
- _opCtx.get(),
- "CollectionBulkLoader::init",
- _collection->getNss().ns(),
- [&secondaryIndexSpecs, this] {
- WriteUnitOfWork wuow(_opCtx.get());
- // All writes in CollectionBulkLoaderImpl should be unreplicated.
- // The opCtx is accessed indirectly through _secondaryIndexesBlock.
- UnreplicatedWritesBlock uwb(_opCtx.get());
- // This enforces the buildIndexes setting in the replica set configuration.
- CollectionWriter collWriter(_opCtx.get(), *_collection);
- auto indexCatalog =
- collWriter.getWritableCollection(_opCtx.get())->getIndexCatalog();
- auto specs = indexCatalog->removeExistingIndexesNoChecks(
- _opCtx.get(), collWriter.get(), secondaryIndexSpecs);
- if (specs.size()) {
- _secondaryIndexesBlock->ignoreUniqueConstraint();
- auto status = _secondaryIndexesBlock
- ->init(_opCtx.get(),
- collWriter,
- specs,
- MultiIndexBlock::kNoopOnInitFn,
- /*forRecovery=*/false)
- .getStatus();
- if (!status.isOK()) {
- return status;
- }
- } else {
- _secondaryIndexesBlock.reset();
+ return writeConflictRetry(_opCtx.get(), "CollectionBulkLoader::init", _nss.ns(), [&] {
+ WriteUnitOfWork wuow(_opCtx.get());
+ // All writes in CollectionBulkLoaderImpl should be unreplicated.
+ // The opCtx is accessed indirectly through _secondaryIndexesBlock.
+ UnreplicatedWritesBlock uwb(_opCtx.get());
+ // This enforces the buildIndexes setting in the replica set configuration.
+ CollectionWriter collWriter(_opCtx.get(), coll);
+ auto indexCatalog = collWriter.getWritableCollection(_opCtx.get())->getIndexCatalog();
+ auto specs = indexCatalog->removeExistingIndexesNoChecks(
+ _opCtx.get(), collWriter.get(), secondaryIndexSpecs);
+ if (specs.size()) {
+ _secondaryIndexesBlock->ignoreUniqueConstraint();
+ auto status = _secondaryIndexesBlock
+ ->init(_opCtx.get(),
+ collWriter,
+ specs,
+ MultiIndexBlock::kNoopOnInitFn,
+ /*forRecovery=*/false)
+ .getStatus();
+ if (!status.isOK()) {
+ return status;
}
- if (!_idIndexSpec.isEmpty()) {
- auto status = _idIndexBlock
- ->init(_opCtx.get(),
- collWriter,
- _idIndexSpec,
- MultiIndexBlock::kNoopOnInitFn)
- .getStatus();
- if (!status.isOK()) {
- return status;
- }
- } else {
- _idIndexBlock.reset();
+ } else {
+ _secondaryIndexesBlock.reset();
+ }
+ if (!_idIndexSpec.isEmpty()) {
+ auto status =
+ _idIndexBlock
+ ->init(
+ _opCtx.get(), collWriter, _idIndexSpec, MultiIndexBlock::kNoopOnInitFn)
+ .getStatus();
+ if (!status.isOK()) {
+ return status;
}
+ } else {
+ _idIndexBlock.reset();
+ }
- wuow.commit();
- return Status::OK();
- });
+ wuow.commit();
+ return Status::OK();
+ });
});
}
Status CollectionBulkLoaderImpl::_insertDocumentsForUncappedCollection(
+ const CollectionPtr& coll,
const std::vector<BSONObj>::const_iterator begin,
const std::vector<BSONObj>::const_iterator end) {
auto iter = begin;
@@ -153,7 +148,7 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForUncappedCollection(
bytesInBlock += doc.objsize();
// This version of insert will not update any indexes.
const auto status = collection_internal::insertDocumentForBulkLoader(
- _opCtx.get(), **_collection, doc, onRecordInserted);
+ _opCtx.get(), coll, doc, onRecordInserted);
if (!status.isOK()) {
return status;
}
@@ -173,7 +168,7 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForUncappedCollection(
status = writeConflictRetry(_opCtx.get(), "_addDocumentToIndexBlocks", _nss.ns(), [&] {
WriteUnitOfWork wunit(_opCtx.get());
for (size_t index = 0; index < locs.size(); ++index) {
- status = _addDocumentToIndexBlocks(*iter++, locs.at(index));
+ status = _addDocumentToIndexBlocks(coll, *iter++, locs.at(index));
if (!status.isOK()) {
return status;
}
@@ -190,6 +185,7 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForUncappedCollection(
}
Status CollectionBulkLoaderImpl::_insertDocumentsForCappedCollection(
+ const CollectionPtr& coll,
const std::vector<BSONObj>::const_iterator begin,
const std::vector<BSONObj>::const_iterator end) {
for (auto iter = begin; iter != end; ++iter) {
@@ -200,7 +196,7 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForCappedCollection(
// For capped collections, we use regular insertDocument, which
// will update pre-existing indexes.
const auto status = collection_internal::insertDocument(
- _opCtx.get(), **_collection, InsertStatement(doc), nullptr);
+ _opCtx.get(), coll, InsertStatement(doc), nullptr);
if (!status.isOK()) {
return status;
}
@@ -217,17 +213,20 @@ Status CollectionBulkLoaderImpl::_insertDocumentsForCappedCollection(
Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::const_iterator begin,
const std::vector<BSONObj>::const_iterator end) {
return _runTaskReleaseResourcesOnFailure([&] {
+ AutoGetCollection coll(
+ _opCtx.get(), _nss, fixLockModeForSystemDotViewsChanges(_nss, MODE_IX));
UnreplicatedWritesBlock uwb(_opCtx.get());
if (_idIndexBlock || _secondaryIndexesBlock) {
- return _insertDocumentsForUncappedCollection(begin, end);
+ return _insertDocumentsForUncappedCollection(*coll, begin, end);
} else {
- return _insertDocumentsForCappedCollection(begin, end);
+ return _insertDocumentsForCappedCollection(*coll, begin, end);
}
});
}
Status CollectionBulkLoaderImpl::commit() {
return _runTaskReleaseResourcesOnFailure([&] {
+ AutoGetCollection coll(_opCtx.get(), _nss, MODE_X);
_stats.startBuildingIndexes = Date_t::now();
LOGV2_DEBUG(21130,
2,
@@ -239,25 +238,23 @@ Status CollectionBulkLoaderImpl::commit() {
// Commit before deleting dups, so the dups will be removed from secondary indexes when
// deleted.
if (_secondaryIndexesBlock) {
- auto status = _secondaryIndexesBlock->dumpInsertsFromBulk(_opCtx.get(),
- _collection->getCollection());
+ auto status = _secondaryIndexesBlock->dumpInsertsFromBulk(_opCtx.get(), *coll);
if (!status.isOK()) {
return status;
}
// This should always return Status::OK() as the foreground index build doesn't install
// an interceptor.
- invariant(_secondaryIndexesBlock->checkConstraints(_opCtx.get(),
- _collection->getCollection()));
+ invariant(_secondaryIndexesBlock->checkConstraints(_opCtx.get(), *coll));
status = writeConflictRetry(
- _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this] {
+ _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [&] {
WriteUnitOfWork wunit(_opCtx.get());
- auto status = _secondaryIndexesBlock->commit(
- _opCtx.get(),
- _collection->getWritableCollection(_opCtx.get()),
- MultiIndexBlock::kNoopOnCreateEachFn,
- MultiIndexBlock::kNoopOnCommitFn);
+ auto status =
+ _secondaryIndexesBlock->commit(_opCtx.get(),
+ coll.getWritableCollection(_opCtx.get()),
+ MultiIndexBlock::kNoopOnCreateEachFn,
+ MultiIndexBlock::kNoopOnCommitFn);
if (!status.isOK()) {
return status;
}
@@ -271,10 +268,10 @@ Status CollectionBulkLoaderImpl::commit() {
if (_idIndexBlock) {
// Do not do inside a WriteUnitOfWork (required by dumpInsertsFromBulk).
- auto status = _idIndexBlock->dumpInsertsFromBulk(
- _opCtx.get(), _collection->getCollection(), [&](const RecordId& rid) {
+ auto status =
+ _idIndexBlock->dumpInsertsFromBulk(_opCtx.get(), *coll, [&](const RecordId& rid) {
return writeConflictRetry(
- _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this, &rid] {
+ _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [&] {
WriteUnitOfWork wunit(_opCtx.get());
// If we were to delete the document after committing the index build,
// it's possible that the storage engine unindexes a different record
@@ -282,13 +279,12 @@ Status CollectionBulkLoaderImpl::commit() {
// before committing the index build, the index removal code uses
// 'dupsAllowed', which forces the storage engine to only unindex
// records that match the same key and RecordId.
- (*_collection)
- ->deleteDocument(_opCtx.get(),
- kUninitializedStmtId,
- rid,
- nullptr /** OpDebug **/,
- false /* fromMigrate */,
- true /* noWarn */);
+ (*coll)->deleteDocument(_opCtx.get(),
+ kUninitializedStmtId,
+ rid,
+ nullptr /** OpDebug **/,
+ false /* fromMigrate */,
+ true /* noWarn */);
wunit.commit();
return Status::OK();
});
@@ -300,13 +296,12 @@ Status CollectionBulkLoaderImpl::commit() {
// Commit the _id index, there won't be any documents with duplicate _ids as they were
// deleted prior to this.
status = writeConflictRetry(
- _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [this] {
+ _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns(), [&] {
WriteUnitOfWork wunit(_opCtx.get());
- auto status =
- _idIndexBlock->commit(_opCtx.get(),
- _collection->getWritableCollection(_opCtx.get()),
- MultiIndexBlock::kNoopOnCreateEachFn,
- MultiIndexBlock::kNoopOnCommitFn);
+ auto status = _idIndexBlock->commit(_opCtx.get(),
+ coll.getWritableCollection(_opCtx.get()),
+ MultiIndexBlock::kNoopOnCreateEachFn,
+ MultiIndexBlock::kNoopOnCommitFn);
if (!status.isOK()) {
return status;
}
@@ -330,34 +325,31 @@ Status CollectionBulkLoaderImpl::commit() {
// _releaseResources.
_idIndexBlock.reset();
_secondaryIndexesBlock.reset();
- _collection.reset();
return Status::OK();
});
}
void CollectionBulkLoaderImpl::_releaseResources() {
invariant(&cc() == _opCtx->getClient());
+ AutoGetCollection coll(_opCtx.get(), _nss, MODE_X);
if (_secondaryIndexesBlock) {
- CollectionWriter collWriter(_opCtx.get(), *_collection);
+ CollectionWriter collWriter(_opCtx.get(), coll);
_secondaryIndexesBlock->abortIndexBuild(
_opCtx.get(), collWriter, MultiIndexBlock::kNoopOnCleanUpFn);
_secondaryIndexesBlock.reset();
}
if (_idIndexBlock) {
- CollectionWriter collWriter(_opCtx.get(), *_collection);
+ CollectionWriter collWriter(_opCtx.get(), coll);
_idIndexBlock->abortIndexBuild(_opCtx.get(), collWriter, MultiIndexBlock::kNoopOnCleanUpFn);
_idIndexBlock.reset();
}
-
- // release locks.
- _collection.reset();
}
template <typename F>
Status CollectionBulkLoaderImpl::_runTaskReleaseResourcesOnFailure(const F& task) noexcept {
AlternativeClientRegion acr(_client);
- ScopeGuard guard([this] { _releaseResources(); });
+ ScopeGuard guard([&] { _releaseResources(); });
try {
const auto status = task();
if (status.isOK()) {
@@ -369,12 +361,13 @@ Status CollectionBulkLoaderImpl::_runTaskReleaseResourcesOnFailure(const F& task
}
}
-Status CollectionBulkLoaderImpl::_addDocumentToIndexBlocks(const BSONObj& doc,
+Status CollectionBulkLoaderImpl::_addDocumentToIndexBlocks(const CollectionPtr& collection,
+ const BSONObj& doc,
const RecordId& loc) {
if (_idIndexBlock) {
auto status = _idIndexBlock->insertSingleDocumentForInitialSyncOrRecovery(
_opCtx.get(),
- _collection->getCollection(),
+ collection,
doc,
loc,
// This caller / code path does not have cursors to save/restore.
@@ -388,7 +381,7 @@ Status CollectionBulkLoaderImpl::_addDocumentToIndexBlocks(const BSONObj& doc,
if (_secondaryIndexesBlock) {
auto status = _secondaryIndexesBlock->insertSingleDocumentForInitialSyncOrRecovery(
_opCtx.get(),
- _collection->getCollection(),
+ collection,
doc,
loc,
// This caller / code path does not have cursors to save/restore.
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.h b/src/mongo/db/repl/collection_bulk_loader_impl.h
index d7077376478..61c2d21d6a8 100644
--- a/src/mongo/db/repl/collection_bulk_loader_impl.h
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.h
@@ -62,7 +62,7 @@ public:
CollectionBulkLoaderImpl(ServiceContext::UniqueClient&& client,
ServiceContext::UniqueOperationContext&& opCtx,
- std::unique_ptr<AutoGetCollection>&& autoColl,
+ const NamespaceString& nss,
const BSONObj& idIndexSpec);
virtual ~CollectionBulkLoaderImpl();
@@ -86,7 +86,8 @@ private:
/**
* For capped collections, each document will be inserted in its own WriteUnitOfWork.
*/
- Status _insertDocumentsForCappedCollection(std::vector<BSONObj>::const_iterator begin,
+ Status _insertDocumentsForCappedCollection(const CollectionPtr& coll,
+ std::vector<BSONObj>::const_iterator begin,
std::vector<BSONObj>::const_iterator end);
/**
@@ -94,17 +95,19 @@ private:
* collectionBulkLoaderBatchSizeInBytes or up to one document size greater. All insertions in a
* given batch will be inserted in one WriteUnitOfWork.
*/
- Status _insertDocumentsForUncappedCollection(std::vector<BSONObj>::const_iterator begin,
+ Status _insertDocumentsForUncappedCollection(const CollectionPtr& coll,
+ std::vector<BSONObj>::const_iterator begin,
std::vector<BSONObj>::const_iterator end);
/**
* Adds document and associated RecordId to index blocks after inserting into RecordStore.
*/
- Status _addDocumentToIndexBlocks(const BSONObj& doc, const RecordId& loc);
+ Status _addDocumentToIndexBlocks(const CollectionPtr& coll,
+ const BSONObj& doc,
+ const RecordId& loc);
ServiceContext::UniqueClient _client;
ServiceContext::UniqueOperationContext _opCtx;
- std::unique_ptr<AutoGetCollection> _collection;
NamespaceString _nss;
std::unique_ptr<MultiIndexBlock> _idIndexBlock;
std::unique_ptr<MultiIndexBlock> _secondaryIndexesBlock;
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index dad1cbc45df..c357a1b411e 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -232,13 +232,11 @@ StorageInterfaceImpl::createCollectionForBulkLoading(
.setFlags(DocumentValidationSettings::kDisableSchemaValidation |
DocumentValidationSettings::kDisableInternalValidation);
- std::unique_ptr<AutoGetCollection> autoColl;
// Retry if WCE.
Status status = writeConflictRetry(opCtx.get(), "beginCollectionClone", nss.ns(), [&] {
UnreplicatedWritesBlock uwb(opCtx.get());
// Get locks and create the collection.
- AutoGetDb autoDb(opCtx.get(), nss.dbName(), MODE_IX);
AutoGetCollection coll(opCtx.get(), nss, fixLockModeForSystemDotViewsChanges(nss, MODE_X));
if (coll) {
return Status(ErrorCodes::NamespaceExists,
@@ -247,35 +245,29 @@ StorageInterfaceImpl::createCollectionForBulkLoading(
{
// Create the collection.
WriteUnitOfWork wunit(opCtx.get());
- auto db = autoDb.ensureDbExists(opCtx.get());
+ auto db = coll.ensureDbExists(opCtx.get());
fassert(40332, db->createCollection(opCtx.get(), nss, options, false));
wunit.commit();
}
- autoColl = std::make_unique<AutoGetCollection>(
- opCtx.get(), nss, fixLockModeForSystemDotViewsChanges(nss, MODE_IX));
-
// Build empty capped indexes. Capped indexes cannot be built by the MultiIndexBlock
// because the cap might delete documents off the back while we are inserting them into
// the front.
if (options.capped) {
WriteUnitOfWork wunit(opCtx.get());
+ // `getWritableCollection` will return the newly created collection even if it didn't
+ // exist when the AutoGet was created.
+ auto writableCollection = coll.getWritableCollection(opCtx.get());
if (!idIndexSpec.isEmpty()) {
- auto status =
- autoColl->getWritableCollection(opCtx.get())
- ->getIndexCatalog()
- ->createIndexOnEmptyCollection(
- opCtx.get(), autoColl->getWritableCollection(opCtx.get()), idIndexSpec);
+ auto status = writableCollection->getIndexCatalog()->createIndexOnEmptyCollection(
+ opCtx.get(), writableCollection, idIndexSpec);
if (!status.getStatus().isOK()) {
return status.getStatus();
}
}
for (auto&& spec : secondaryIndexSpecs) {
- auto status =
- autoColl->getWritableCollection(opCtx.get())
- ->getIndexCatalog()
- ->createIndexOnEmptyCollection(
- opCtx.get(), autoColl->getWritableCollection(opCtx.get()), spec);
+ auto status = writableCollection->getIndexCatalog()->createIndexOnEmptyCollection(
+ opCtx.get(), writableCollection, spec);
if (!status.getStatus().isOK()) {
return status.getStatus();
}
@@ -291,11 +283,8 @@ StorageInterfaceImpl::createCollectionForBulkLoading(
}
// Move locks into loader, so it now controls their lifetime.
- auto loader =
- std::make_unique<CollectionBulkLoaderImpl>(Client::releaseCurrent(),
- std::move(opCtx),
- std::move(autoColl),
- options.capped ? BSONObj() : idIndexSpec);
+ auto loader = std::make_unique<CollectionBulkLoaderImpl>(
+ Client::releaseCurrent(), std::move(opCtx), nss, options.capped ? BSONObj() : idIndexSpec);
status = loader->init(options.capped ? std::vector<BSONObj>() : secondaryIndexSpecs);
if (!status.isOK()) {