summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/collection_bulk_loader_impl.cpp
diff options
context:
space:
mode:
authorScott Hernandez <scotthernandez@gmail.com>2016-09-02 09:39:06 -0400
committerScott Hernandez <scotthernandez@gmail.com>2016-09-06 09:35:53 -0400
commit815e16eace8c40db7eed5ad3a6902027f1212e2a (patch)
tree8f47ab82a54dec10710b76fcb6989980d224dab8 /src/mongo/db/repl/collection_bulk_loader_impl.cpp
parentb3aba5d4d12a86e18c13db259f64025e74445d3c (diff)
downloadmongo-815e16eace8c40db7eed5ad3a6902027f1212e2a.tar.gz
SERVER-25131: release collection/db locks on collection clone failure.
Diffstat (limited to 'src/mongo/db/repl/collection_bulk_loader_impl.cpp')
-rw-r--r--src/mongo/db/repl/collection_bulk_loader_impl.cpp110
1 files changed, 70 insertions, 40 deletions
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
index cf31132e024..3f8e78d46ca 100644
--- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
@@ -46,6 +46,7 @@
#include "mongo/util/destructor_guard.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
namespace repl {
@@ -64,8 +65,8 @@ CollectionBulkLoaderImpl::CollectionBulkLoaderImpl(OperationContext* txn,
_txn(txn),
_coll(coll),
_nss{coll->ns()},
- _idIndexBlock{txn, coll},
- _secondaryIndexesBlock{txn, coll},
+ _idIndexBlock(stdx::make_unique<MultiIndexBlock>(txn, coll)),
+ _secondaryIndexesBlock(stdx::make_unique<MultiIndexBlock>(txn, coll)),
_idIndexSpec(idIndexSpec) {
invariant(txn);
invariant(coll);
@@ -91,18 +92,21 @@ Status CollectionBulkLoaderImpl::init(OperationContext* txn,
invariant(coll);
invariant(txn->getClient() == &cc());
if (secondaryIndexSpecs.size()) {
- _hasSecondaryIndexes = true;
- _secondaryIndexesBlock.ignoreUniqueConstraint();
- auto status = _secondaryIndexesBlock.init(secondaryIndexSpecs);
+ _secondaryIndexesBlock->ignoreUniqueConstraint();
+ auto status = _secondaryIndexesBlock->init(secondaryIndexSpecs);
if (!status.isOK()) {
return status;
}
+ } else {
+ _secondaryIndexesBlock.reset();
}
if (!_idIndexSpec.isEmpty()) {
- auto status = _idIndexBlock.init(_idIndexSpec);
+ auto status = _idIndexBlock->init(_idIndexSpec);
if (!status.isOK()) {
return status;
}
+ } else {
+ _idIndexBlock.reset();
}
return Status::OK();
@@ -111,36 +115,37 @@ Status CollectionBulkLoaderImpl::init(OperationContext* txn,
Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::const_iterator begin,
const std::vector<BSONObj>::const_iterator end) {
int count = 0;
- return _runner->runSynchronousTask([begin, end, &count, this](OperationContext* txn) -> Status {
- invariant(txn);
+ return _runTaskReleaseResourcesOnFailure(
+ [begin, end, &count, this](OperationContext* txn) -> Status {
+ invariant(txn);
- for (auto iter = begin; iter != end; ++iter) {
- std::vector<MultiIndexBlock*> indexers;
- if (!_idIndexSpec.isEmpty()) {
- indexers.push_back(&_idIndexBlock);
- }
- if (_hasSecondaryIndexes) {
- indexers.push_back(&_secondaryIndexesBlock);
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- WriteUnitOfWork wunit(txn);
- const auto status = _coll->insertDocument(txn, *iter, indexers, false);
- if (!status.isOK()) {
- return status;
+ for (auto iter = begin; iter != end; ++iter) {
+ std::vector<MultiIndexBlock*> indexers;
+ if (_idIndexBlock) {
+ indexers.push_back(_idIndexBlock.get());
}
- wunit.commit();
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- _txn, "CollectionBulkLoaderImpl::insertDocuments", _nss.ns());
+ if (_secondaryIndexesBlock) {
+ indexers.push_back(_secondaryIndexesBlock.get());
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ WriteUnitOfWork wunit(txn);
+ const auto status = _coll->insertDocument(txn, *iter, indexers, false);
+ if (!status.isOK()) {
+ return status;
+ }
+ wunit.commit();
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
+ _txn, "CollectionBulkLoaderImpl::insertDocuments", _nss.ns());
- ++count;
- }
- return Status::OK();
- });
+ ++count;
+ }
+ return Status::OK();
+ });
}
Status CollectionBulkLoaderImpl::commit() {
- return _runner->runSynchronousTask(
+ return _runTaskReleaseResourcesOnFailure(
[this](OperationContext* txn) -> Status {
_stats.startBuildingIndexes = Date_t::now();
LOG(2) << "Creating indexes for ns: " << _nss.ns();
@@ -149,9 +154,9 @@ Status CollectionBulkLoaderImpl::commit() {
// Commit before deleting dups, so the dups will be removed from secondary indexes when
// deleted.
- if (_hasSecondaryIndexes) {
+ if (_secondaryIndexesBlock) {
std::set<RecordId> secDups;
- auto status = _secondaryIndexesBlock.doneInserting(&secDups);
+ auto status = _secondaryIndexesBlock->doneInserting(&secDups);
if (!status.isOK()) {
return status;
}
@@ -163,18 +168,18 @@ Status CollectionBulkLoaderImpl::commit() {
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
WriteUnitOfWork wunit(txn);
- _secondaryIndexesBlock.commit();
+ _secondaryIndexesBlock->commit();
wunit.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
_txn, "CollectionBulkLoaderImpl::commit", _nss.ns());
}
- if (!_idIndexSpec.isEmpty()) {
+ if (_idIndexBlock) {
// Delete dups.
std::set<RecordId> dups;
// Do not do inside a WriteUnitOfWork (required by doneInserting).
- auto status = _idIndexBlock.doneInserting(&dups);
+ auto status = _idIndexBlock->doneInserting(&dups);
if (!status.isOK()) {
return status;
}
@@ -196,7 +201,7 @@ Status CollectionBulkLoaderImpl::commit() {
// Commit _id index, without dups.
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
WriteUnitOfWork wunit(txn);
- _idIndexBlock.commit();
+ _idIndexBlock->commit();
wunit.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
@@ -206,15 +211,40 @@ Status CollectionBulkLoaderImpl::commit() {
LOG(2) << "Done creating indexes for ns: " << _nss.ns()
<< ", stats: " << _stats.toString();
- // release locks.
- _autoColl.reset(nullptr);
- _autoDB.reset(nullptr);
- _coll = nullptr;
+ _releaseResources();
return Status::OK();
},
TaskRunner::NextAction::kDisposeOperationContext);
}
+void CollectionBulkLoaderImpl::_releaseResources() {
+ if (_secondaryIndexesBlock) {
+ _secondaryIndexesBlock.reset();
+ }
+
+ if (_idIndexBlock) {
+ _idIndexBlock.reset();
+ }
+
+ // release locks.
+ _coll = nullptr;
+ _autoColl.reset(nullptr);
+ _autoDB.reset(nullptr);
+}
+
+Status CollectionBulkLoaderImpl::_runTaskReleaseResourcesOnFailure(
+ TaskRunner::SynchronousTask task, TaskRunner::NextAction nextAction) {
+ auto newTask = [this, &task](OperationContext* txn) -> Status {
+ ScopeGuard guard = MakeGuard(&CollectionBulkLoaderImpl::_releaseResources, this);
+ const auto status = task(txn);
+ if (status.isOK()) {
+ guard.Dismiss();
+ }
+ return status;
+ };
+ return _runner->runSynchronousTask(newTask, nextAction);
+}
+
CollectionBulkLoaderImpl::Stats CollectionBulkLoaderImpl::getStats() const {
return _stats;
}