diff options
author | Scott Hernandez <scotthernandez@gmail.com> | 2016-09-02 09:39:06 -0400 |
---|---|---|
committer | Scott Hernandez <scotthernandez@gmail.com> | 2016-09-06 09:35:53 -0400 |
commit | 815e16eace8c40db7eed5ad3a6902027f1212e2a (patch) | |
tree | 8f47ab82a54dec10710b76fcb6989980d224dab8 /src/mongo/db/repl/collection_bulk_loader_impl.cpp | |
parent | b3aba5d4d12a86e18c13db259f64025e74445d3c (diff) | |
download | mongo-815e16eace8c40db7eed5ad3a6902027f1212e2a.tar.gz |
SERVER-25131: release collection/db locks on collection clone failure.
Diffstat (limited to 'src/mongo/db/repl/collection_bulk_loader_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/collection_bulk_loader_impl.cpp | 110 |
1 files changed, 70 insertions, 40 deletions
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp index cf31132e024..3f8e78d46ca 100644 --- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp +++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp @@ -46,6 +46,7 @@ #include "mongo/util/destructor_guard.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" +#include "mongo/util/scopeguard.h" namespace mongo { namespace repl { @@ -64,8 +65,8 @@ CollectionBulkLoaderImpl::CollectionBulkLoaderImpl(OperationContext* txn, _txn(txn), _coll(coll), _nss{coll->ns()}, - _idIndexBlock{txn, coll}, - _secondaryIndexesBlock{txn, coll}, + _idIndexBlock(stdx::make_unique<MultiIndexBlock>(txn, coll)), + _secondaryIndexesBlock(stdx::make_unique<MultiIndexBlock>(txn, coll)), _idIndexSpec(idIndexSpec) { invariant(txn); invariant(coll); @@ -91,18 +92,21 @@ Status CollectionBulkLoaderImpl::init(OperationContext* txn, invariant(coll); invariant(txn->getClient() == &cc()); if (secondaryIndexSpecs.size()) { - _hasSecondaryIndexes = true; - _secondaryIndexesBlock.ignoreUniqueConstraint(); - auto status = _secondaryIndexesBlock.init(secondaryIndexSpecs); + _secondaryIndexesBlock->ignoreUniqueConstraint(); + auto status = _secondaryIndexesBlock->init(secondaryIndexSpecs); if (!status.isOK()) { return status; } + } else { + _secondaryIndexesBlock.reset(); } if (!_idIndexSpec.isEmpty()) { - auto status = _idIndexBlock.init(_idIndexSpec); + auto status = _idIndexBlock->init(_idIndexSpec); if (!status.isOK()) { return status; } + } else { + _idIndexBlock.reset(); } return Status::OK(); @@ -111,36 +115,37 @@ Status CollectionBulkLoaderImpl::init(OperationContext* txn, Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::const_iterator begin, const std::vector<BSONObj>::const_iterator end) { int count = 0; - return _runner->runSynchronousTask([begin, end, &count, this](OperationContext* txn) -> Status { - invariant(txn); + return _runTaskReleaseResourcesOnFailure( + [begin, end, &count, this](OperationContext* txn) -> Status { + invariant(txn); - for (auto iter = begin; iter != end; ++iter) { - std::vector<MultiIndexBlock*> indexers; - if (!_idIndexSpec.isEmpty()) { - indexers.push_back(&_idIndexBlock); - } - if (_hasSecondaryIndexes) { - indexers.push_back(&_secondaryIndexesBlock); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - WriteUnitOfWork wunit(txn); - const auto status = _coll->insertDocument(txn, *iter, indexers, false); - if (!status.isOK()) { - return status; + for (auto iter = begin; iter != end; ++iter) { + std::vector<MultiIndexBlock*> indexers; + if (_idIndexBlock) { + indexers.push_back(_idIndexBlock.get()); } - wunit.commit(); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - _txn, "CollectionBulkLoaderImpl::insertDocuments", _nss.ns()); + if (_secondaryIndexesBlock) { + indexers.push_back(_secondaryIndexesBlock.get()); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + WriteUnitOfWork wunit(txn); + const auto status = _coll->insertDocument(txn, *iter, indexers, false); + if (!status.isOK()) { + return status; + } + wunit.commit(); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END( + _txn, "CollectionBulkLoaderImpl::insertDocuments", _nss.ns()); - ++count; - } - return Status::OK(); - }); + ++count; + } + return Status::OK(); + }); } Status CollectionBulkLoaderImpl::commit() { - return _runner->runSynchronousTask( + return _runTaskReleaseResourcesOnFailure( [this](OperationContext* txn) -> Status { _stats.startBuildingIndexes = Date_t::now(); LOG(2) << "Creating indexes for ns: " << _nss.ns(); @@ -149,9 +154,9 @@ Status CollectionBulkLoaderImpl::commit() { // Commit before deleting dups, so the dups will be removed from secondary indexes when // deleted. - if (_hasSecondaryIndexes) { + if (_secondaryIndexesBlock) { std::set<RecordId> secDups; - auto status = _secondaryIndexesBlock.doneInserting(&secDups); + auto status = _secondaryIndexesBlock->doneInserting(&secDups); if (!status.isOK()) { return status; } @@ -163,18 +168,18 @@ Status CollectionBulkLoaderImpl::commit() { } MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { WriteUnitOfWork wunit(txn); - _secondaryIndexesBlock.commit(); + _secondaryIndexesBlock->commit(); wunit.commit(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END( _txn, "CollectionBulkLoaderImpl::commit", _nss.ns()); } - if (!_idIndexSpec.isEmpty()) { + if (_idIndexBlock) { // Delete dups. std::set<RecordId> dups; // Do not do inside a WriteUnitOfWork (required by doneInserting). - auto status = _idIndexBlock.doneInserting(&dups); + auto status = _idIndexBlock->doneInserting(&dups); if (!status.isOK()) { return status; } @@ -196,7 +201,7 @@ Status CollectionBulkLoaderImpl::commit() { // Commit _id index, without dups. MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { WriteUnitOfWork wunit(txn); - _idIndexBlock.commit(); + _idIndexBlock->commit(); wunit.commit(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END( @@ -206,15 +211,40 @@ Status CollectionBulkLoaderImpl::commit() { LOG(2) << "Done creating indexes for ns: " << _nss.ns() << ", stats: " << _stats.toString(); - // release locks. - _autoColl.reset(nullptr); - _autoDB.reset(nullptr); - _coll = nullptr; + _releaseResources(); return Status::OK(); }, TaskRunner::NextAction::kDisposeOperationContext); } +void CollectionBulkLoaderImpl::_releaseResources() { + if (_secondaryIndexesBlock) { + _secondaryIndexesBlock.reset(); + } + + if (_idIndexBlock) { + _idIndexBlock.reset(); + } + + // release locks. + _coll = nullptr; + _autoColl.reset(nullptr); + _autoDB.reset(nullptr); +} + +Status CollectionBulkLoaderImpl::_runTaskReleaseResourcesOnFailure( + TaskRunner::SynchronousTask task, TaskRunner::NextAction nextAction) { + auto newTask = [this, &task](OperationContext* txn) -> Status { + ScopeGuard guard = MakeGuard(&CollectionBulkLoaderImpl::_releaseResources, this); + const auto status = task(txn); + if (status.isOK()) { + guard.Dismiss(); + } + return status; + }; + return _runner->runSynchronousTask(newTask, nextAction); +} + CollectionBulkLoaderImpl::Stats CollectionBulkLoaderImpl::getStats() const { return _stats; } |