Diffstat (limited to 'src/mongo/db/catalog/index_create.cpp')
-rw-r--r--  src/mongo/db/catalog/index_create.cpp | 540
1 file changed, 262 insertions(+), 278 deletions(-)
diff --git a/src/mongo/db/catalog/index_create.cpp b/src/mongo/db/catalog/index_create.cpp
index b624c0151f0..c642fcb83a5 100644
--- a/src/mongo/db/catalog/index_create.cpp
+++ b/src/mongo/db/catalog/index_create.cpp
@@ -54,334 +54,318 @@
namespace mongo {
- using std::unique_ptr;
- using std::string;
- using std::endl;
-
- /**
- * On rollback sets MultiIndexBlock::_needToCleanup to true.
- */
- class MultiIndexBlock::SetNeedToCleanupOnRollback : public RecoveryUnit::Change {
- public:
- explicit SetNeedToCleanupOnRollback(MultiIndexBlock* indexer) : _indexer(indexer) {}
-
- virtual void commit() {}
- virtual void rollback() { _indexer->_needToCleanup = true; }
-
- private:
- MultiIndexBlock* const _indexer;
- };
-
- /**
- * On rollback in init(), cleans up _indexes so that ~MultiIndexBlock doesn't try to clean
- * up _indexes manually (since the changes were already rolled back).
- * Due to this, it is thus legal to call init() again after it fails.
- */
- class MultiIndexBlock::CleanupIndexesVectorOnRollback : public RecoveryUnit::Change {
- public:
- explicit CleanupIndexesVectorOnRollback(MultiIndexBlock* indexer) : _indexer(indexer) {}
-
- virtual void commit() {}
- virtual void rollback() { _indexer->_indexes.clear(); }
-
- private:
- MultiIndexBlock* const _indexer;
- };
-
- MultiIndexBlock::MultiIndexBlock(OperationContext* txn, Collection* collection)
- : _collection(collection),
- _txn(txn),
- _buildInBackground(false),
- _allowInterruption(false),
- _ignoreUnique(false),
- _needToCleanup(true) {
+using std::unique_ptr;
+using std::string;
+using std::endl;
+
+/**
+ * On rollback sets MultiIndexBlock::_needToCleanup to true.
+ */
+class MultiIndexBlock::SetNeedToCleanupOnRollback : public RecoveryUnit::Change {
+public:
+ explicit SetNeedToCleanupOnRollback(MultiIndexBlock* indexer) : _indexer(indexer) {}
+
+ virtual void commit() {}
+ virtual void rollback() {
+ _indexer->_needToCleanup = true;
}
- MultiIndexBlock::~MultiIndexBlock() {
- if (!_needToCleanup || _indexes.empty())
- return;
- while (true) {
- try {
- WriteUnitOfWork wunit(_txn);
- // This cleans up all index builds.
- // Because that may need to write, it is done inside
- // of a WUOW. Nothing inside this block can fail, and it is made fatal if it does.
- for (size_t i = 0; i < _indexes.size(); i++) {
- _indexes[i].block->fail();
- }
- wunit.commit();
- return;
- }
- catch (const WriteConflictException& e) {
- continue;
- }
- catch (const std::exception& e) {
- error() << "Caught exception while cleaning up partially built indexes: "
- << e.what();
- }
- catch (...) {
- error() << "Caught unknown exception while cleaning up partially built indexes.";
- }
- fassertFailed(18644);
- }
+private:
+ MultiIndexBlock* const _indexer;
+};
+
+/**
+ * On rollback in init(), cleans up _indexes so that ~MultiIndexBlock doesn't try to clean
+ * up _indexes manually (since the changes were already rolled back).
+ * This makes it legal to call init() again after it fails.
+ */
+class MultiIndexBlock::CleanupIndexesVectorOnRollback : public RecoveryUnit::Change {
+public:
+ explicit CleanupIndexesVectorOnRollback(MultiIndexBlock* indexer) : _indexer(indexer) {}
+
+ virtual void commit() {}
+ virtual void rollback() {
+ _indexer->_indexes.clear();
}
- void MultiIndexBlock::removeExistingIndexes(std::vector<BSONObj>* specs) const {
- for (size_t i = 0; i < specs->size(); i++) {
- Status status =
- _collection->getIndexCatalog()->prepareSpecForCreate(_txn, (*specs)[i]).getStatus();
- if (status.code() == ErrorCodes::IndexAlreadyExists) {
- specs->erase(specs->begin() + i);
- i--;
+private:
+ MultiIndexBlock* const _indexer;
+};
+
+MultiIndexBlock::MultiIndexBlock(OperationContext* txn, Collection* collection)
+ : _collection(collection),
+ _txn(txn),
+ _buildInBackground(false),
+ _allowInterruption(false),
+ _ignoreUnique(false),
+ _needToCleanup(true) {}
+
+MultiIndexBlock::~MultiIndexBlock() {
+ if (!_needToCleanup || _indexes.empty())
+ return;
+ while (true) {
+ try {
+ WriteUnitOfWork wunit(_txn);
+ // This cleans up all index builds.
+ // Because that may need to write, it is done inside
+ // of a WUOW. Nothing inside this block can fail, and it is made fatal if it does.
+ for (size_t i = 0; i < _indexes.size(); i++) {
+ _indexes[i].block->fail();
}
- // intentionally ignoring other error codes
+ wunit.commit();
+ return;
+ } catch (const WriteConflictException& e) {
+ continue;
+ } catch (const std::exception& e) {
+ error() << "Caught exception while cleaning up partially built indexes: " << e.what();
+ } catch (...) {
+ error() << "Caught unknown exception while cleaning up partially built indexes.";
}
+ fassertFailed(18644);
}
+}
+
+void MultiIndexBlock::removeExistingIndexes(std::vector<BSONObj>* specs) const {
+ for (size_t i = 0; i < specs->size(); i++) {
+ Status status =
+ _collection->getIndexCatalog()->prepareSpecForCreate(_txn, (*specs)[i]).getStatus();
+ if (status.code() == ErrorCodes::IndexAlreadyExists) {
+ specs->erase(specs->begin() + i);
+ i--;
+ }
+ // intentionally ignoring other error codes
+ }
+}
- Status MultiIndexBlock::init(const std::vector<BSONObj>& indexSpecs) {
- WriteUnitOfWork wunit(_txn);
-
- invariant(_indexes.empty());
- _txn->recoveryUnit()->registerChange(new CleanupIndexesVectorOnRollback(this));
+Status MultiIndexBlock::init(const std::vector<BSONObj>& indexSpecs) {
+ WriteUnitOfWork wunit(_txn);
- const string& ns = _collection->ns().ns();
+ invariant(_indexes.empty());
+ _txn->recoveryUnit()->registerChange(new CleanupIndexesVectorOnRollback(this));
- Status status = _collection->getIndexCatalog()->checkUnfinished();
- if ( !status.isOK() )
- return status;
+ const string& ns = _collection->ns().ns();
- for ( size_t i = 0; i < indexSpecs.size(); i++ ) {
- BSONObj info = indexSpecs[i];
+ Status status = _collection->getIndexCatalog()->checkUnfinished();
+ if (!status.isOK())
+ return status;
- string pluginName = IndexNames::findPluginName( info["key"].Obj() );
- if ( pluginName.size() ) {
- Status s =
- _collection->getIndexCatalog()->_upgradeDatabaseMinorVersionIfNeeded(_txn, pluginName);
- if ( !s.isOK() )
- return s;
- }
+ for (size_t i = 0; i < indexSpecs.size(); i++) {
+ BSONObj info = indexSpecs[i];
- // Any foreground indexes make all indexes be built in the foreground.
- _buildInBackground = (_buildInBackground && info["background"].trueValue());
+ string pluginName = IndexNames::findPluginName(info["key"].Obj());
+ if (pluginName.size()) {
+ Status s = _collection->getIndexCatalog()->_upgradeDatabaseMinorVersionIfNeeded(
+ _txn, pluginName);
+ if (!s.isOK())
+ return s;
}
- for ( size_t i = 0; i < indexSpecs.size(); i++ ) {
- BSONObj info = indexSpecs[i];
- StatusWith<BSONObj> statusWithInfo =
- _collection->getIndexCatalog()->prepareSpecForCreate( _txn, info );
- Status status = statusWithInfo.getStatus();
- if ( !status.isOK() )
- return status;
- info = statusWithInfo.getValue();
-
- IndexToBuild index;
- index.block.reset(new IndexCatalog::IndexBuildBlock(_txn, _collection, info));
- status = index.block->init();
- if ( !status.isOK() )
- return status;
-
- index.real = index.block->getEntry()->accessMethod();
- status = index.real->initializeAsEmpty(_txn);
- if ( !status.isOK() )
- return status;
-
- if (!_buildInBackground) {
- // Bulk build process requires foreground building as it assumes nothing is changing
- // under it.
- index.bulk = index.real->initiateBulk();
- }
+ // Any foreground indexes make all indexes be built in the foreground.
+ _buildInBackground = (_buildInBackground && info["background"].trueValue());
+ }
+
+ for (size_t i = 0; i < indexSpecs.size(); i++) {
+ BSONObj info = indexSpecs[i];
+ StatusWith<BSONObj> statusWithInfo =
+ _collection->getIndexCatalog()->prepareSpecForCreate(_txn, info);
+ Status status = statusWithInfo.getStatus();
+ if (!status.isOK())
+ return status;
+ info = statusWithInfo.getValue();
- const IndexDescriptor* descriptor = index.block->getEntry()->descriptor();
+ IndexToBuild index;
+ index.block.reset(new IndexCatalog::IndexBuildBlock(_txn, _collection, info));
+ status = index.block->init();
+ if (!status.isOK())
+ return status;
- index.options.logIfError = false; // logging happens elsewhere if needed.
- index.options.dupsAllowed = !descriptor->unique()
- || _ignoreUnique
- || repl::getGlobalReplicationCoordinator()
- ->shouldIgnoreUniqueIndex(descriptor);
+ index.real = index.block->getEntry()->accessMethod();
+ status = index.real->initializeAsEmpty(_txn);
+ if (!status.isOK())
+ return status;
- log() << "build index on: " << ns << " properties: " << descriptor->toString();
- if (index.bulk)
- log() << "\t building index using bulk method";
+ if (!_buildInBackground) {
+ // Bulk build process requires foreground building as it assumes nothing is changing
+ // under it.
+ index.bulk = index.real->initiateBulk();
+ }
- index.filterExpression = index.block->getEntry()->getFilterExpression();
+ const IndexDescriptor* descriptor = index.block->getEntry()->descriptor();
- // TODO SERVER-14888 Suppress this in cases we don't want to audit.
- audit::logCreateIndex(_txn->getClient(), &info, descriptor->indexName(), ns);
+ index.options.logIfError = false; // logging happens elsewhere if needed.
+ index.options.dupsAllowed = !descriptor->unique() || _ignoreUnique ||
+ repl::getGlobalReplicationCoordinator()->shouldIgnoreUniqueIndex(descriptor);
- _indexes.push_back(std::move(index));
- }
+ log() << "build index on: " << ns << " properties: " << descriptor->toString();
+ if (index.bulk)
+ log() << "\t building index using bulk method";
- // this is so that operations examining the list of indexes know there are more keys to look
- // at when doing things like in place updates, etc...
- _collection->infoCache()->addedIndex(_txn);
+ index.filterExpression = index.block->getEntry()->getFilterExpression();
- if (_buildInBackground)
- _backgroundOperation.reset(new BackgroundOperation(ns));
+ // TODO SERVER-14888 Suppress this in cases we don't want to audit.
+ audit::logCreateIndex(_txn->getClient(), &info, descriptor->indexName(), ns);
- wunit.commit();
- return Status::OK();
+ _indexes.push_back(std::move(index));
}
- Status MultiIndexBlock::insertAllDocumentsInCollection(std::set<RecordId>* dupsOut) {
- const char* curopMessage = _buildInBackground ? "Index Build (background)" : "Index Build";
- const auto numRecords = _collection->numRecords(_txn);
- stdx::unique_lock<Client> lk(*_txn->getClient());
- ProgressMeterHolder progress(*_txn->setMessage_inlock(curopMessage,
- curopMessage,
- numRecords));
- lk.unlock();
-
- Timer t;
-
- unsigned long long n = 0;
-
- unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(_txn,
- _collection->ns().ns(),
- _collection));
- if (_buildInBackground) {
- invariant(_allowInterruption);
- exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
- }
- else {
- exec->setYieldPolicy(PlanExecutor::WRITE_CONFLICT_RETRY_ONLY);
- }
+ // this is so that operations examining the list of indexes know there are more keys to look
+ // at when doing things like in place updates, etc...
+ _collection->infoCache()->addedIndex(_txn);
- Snapshotted<BSONObj> objToIndex;
- RecordId loc;
- PlanExecutor::ExecState state;
- int retries = 0; // non-zero when retrying our last document.
- while (retries
- || (PlanExecutor::ADVANCED == (state = exec->getNextSnapshotted(&objToIndex,
- &loc)))) {
- try {
- if (_allowInterruption)
- _txn->checkForInterrupt();
-
- // Make sure we are working with the latest version of the document.
- if (objToIndex.snapshotId() != _txn->recoveryUnit()->getSnapshotId()
- && !_collection->findDoc(_txn, loc, &objToIndex)) {
- // doc was deleted so don't index it.
- retries = 0;
- continue;
- }
-
- // Done before insert so we can retry document if it WCEs.
- progress->setTotalWhileRunning( _collection->numRecords(_txn) );
-
- WriteUnitOfWork wunit(_txn);
- Status ret = insert(objToIndex.value(), loc);
- if (ret.isOK()) {
- wunit.commit();
- }
- else if (dupsOut && ret.code() == ErrorCodes::DuplicateKey) {
- // If dupsOut is non-null, we should only fail the specific insert that
- // led to a DuplicateKey rather than the whole index build.
- dupsOut->insert(loc);
- }
- else {
- // Fail the index build hard.
- return ret;
- }
-
- // Go to the next document
- progress->hit();
- n++;
- retries = 0;
- }
- catch (const WriteConflictException& wce) {
- CurOp::get(_txn)->debug().writeConflicts++;
- retries++; // logAndBackoff expects this to be 1 on first call.
- wce.logAndBackoff(retries, "index creation", _collection->ns().ns());
-
- // Can't use WRITE_CONFLICT_RETRY_LOOP macros since we need to save/restore exec
- // around call to abandonSnapshot.
- exec->saveState();
- _txn->recoveryUnit()->abandonSnapshot();
- exec->restoreState(_txn); // Handles any WCEs internally.
- }
- }
+ if (_buildInBackground)
+ _backgroundOperation.reset(new BackgroundOperation(ns));
- if (state != PlanExecutor::IS_EOF) {
- // If the plan executor was killed, this means the DB/collection was dropped and so it
- // is not safe to cleanup the in-progress indexes.
- if (state == PlanExecutor::DEAD) {
- abortWithoutCleanup();
- }
+ wunit.commit();
+ return Status::OK();
+}
- uasserted(28550,
- "Unable to complete index build as the collection is no longer readable");
- }
-
- progress->finished();
+Status MultiIndexBlock::insertAllDocumentsInCollection(std::set<RecordId>* dupsOut) {
+ const char* curopMessage = _buildInBackground ? "Index Build (background)" : "Index Build";
+ const auto numRecords = _collection->numRecords(_txn);
+ stdx::unique_lock<Client> lk(*_txn->getClient());
+ ProgressMeterHolder progress(*_txn->setMessage_inlock(curopMessage, curopMessage, numRecords));
+ lk.unlock();
- Status ret = doneInserting(dupsOut);
- if (!ret.isOK())
- return ret;
+ Timer t;
- log() << "build index done. scanned " << n << " total records. "
- << t.seconds() << " secs" << endl;
+ unsigned long long n = 0;
- return Status::OK();
+ unique_ptr<PlanExecutor> exec(
+ InternalPlanner::collectionScan(_txn, _collection->ns().ns(), _collection));
+ if (_buildInBackground) {
+ invariant(_allowInterruption);
+ exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
+ } else {
+ exec->setYieldPolicy(PlanExecutor::WRITE_CONFLICT_RETRY_ONLY);
}
- Status MultiIndexBlock::insert(const BSONObj& doc, const RecordId& loc) {
- for ( size_t i = 0; i < _indexes.size(); i++ ) {
-
- if ( _indexes[i].filterExpression &&
- !_indexes[i].filterExpression->matchesBSON(doc) ) {
+ Snapshotted<BSONObj> objToIndex;
+ RecordId loc;
+ PlanExecutor::ExecState state;
+ int retries = 0; // non-zero when retrying our last document.
+ while (retries ||
+ (PlanExecutor::ADVANCED == (state = exec->getNextSnapshotted(&objToIndex, &loc)))) {
+ try {
+ if (_allowInterruption)
+ _txn->checkForInterrupt();
+
+ // Make sure we are working with the latest version of the document.
+ if (objToIndex.snapshotId() != _txn->recoveryUnit()->getSnapshotId() &&
+ !_collection->findDoc(_txn, loc, &objToIndex)) {
+ // doc was deleted so don't index it.
+ retries = 0;
continue;
}
- int64_t unused;
- Status idxStatus(ErrorCodes::InternalError, "");
- if (_indexes[i].bulk) {
- idxStatus = _indexes[i].bulk->insert(_txn, doc, loc, _indexes[i].options, &unused);
- }
- else {
- idxStatus = _indexes[i].real->insert(_txn, doc, loc, _indexes[i].options, &unused);
+ // Done before insert so we can retry document if it WCEs.
+ progress->setTotalWhileRunning(_collection->numRecords(_txn));
+
+ WriteUnitOfWork wunit(_txn);
+ Status ret = insert(objToIndex.value(), loc);
+ if (ret.isOK()) {
+ wunit.commit();
+ } else if (dupsOut && ret.code() == ErrorCodes::DuplicateKey) {
+ // If dupsOut is non-null, we should only fail the specific insert that
+ // led to a DuplicateKey rather than the whole index build.
+ dupsOut->insert(loc);
+ } else {
+ // Fail the index build hard.
+ return ret;
}
- if ( !idxStatus.isOK() )
- return idxStatus;
+ // Go to the next document
+ progress->hit();
+ n++;
+ retries = 0;
+ } catch (const WriteConflictException& wce) {
+ CurOp::get(_txn)->debug().writeConflicts++;
+ retries++; // logAndBackoff expects this to be 1 on first call.
+ wce.logAndBackoff(retries, "index creation", _collection->ns().ns());
+
+ // Can't use WRITE_CONFLICT_RETRY_LOOP macros since we need to save/restore exec
+ // around call to abandonSnapshot.
+ exec->saveState();
+ _txn->recoveryUnit()->abandonSnapshot();
+ exec->restoreState(_txn); // Handles any WCEs internally.
}
- return Status::OK();
}
- Status MultiIndexBlock::doneInserting(std::set<RecordId>* dupsOut) {
- for ( size_t i = 0; i < _indexes.size(); i++ ) {
- if ( _indexes[i].bulk == NULL )
- continue;
- LOG(1) << "\t bulk commit starting for index: "
- << _indexes[i].block->getEntry()->descriptor()->indexName();
- Status status = _indexes[i].real->commitBulk( _txn,
- std::move(_indexes[i].bulk),
- _allowInterruption,
- _indexes[i].options.dupsAllowed,
- dupsOut );
- if ( !status.isOK() ) {
- return status;
- }
+ if (state != PlanExecutor::IS_EOF) {
+ // If the plan executor was killed, this means the DB/collection was dropped and so it
+ // is not safe to cleanup the in-progress indexes.
+ if (state == PlanExecutor::DEAD) {
+ abortWithoutCleanup();
}
- return Status::OK();
+ uasserted(28550, "Unable to complete index build as the collection is no longer readable");
}
- void MultiIndexBlock::abortWithoutCleanup() {
- _indexes.clear();
- _needToCleanup = false;
- }
+ progress->finished();
- void MultiIndexBlock::commit() {
- for ( size_t i = 0; i < _indexes.size(); i++ ) {
- _indexes[i].block->success();
+ Status ret = doneInserting(dupsOut);
+ if (!ret.isOK())
+ return ret;
+
+ log() << "build index done. scanned " << n << " total records. " << t.seconds() << " secs"
+ << endl;
+
+ return Status::OK();
+}
+
+Status MultiIndexBlock::insert(const BSONObj& doc, const RecordId& loc) {
+ for (size_t i = 0; i < _indexes.size(); i++) {
+ if (_indexes[i].filterExpression && !_indexes[i].filterExpression->matchesBSON(doc)) {
+ continue;
+ }
+
+ int64_t unused;
+ Status idxStatus(ErrorCodes::InternalError, "");
+ if (_indexes[i].bulk) {
+ idxStatus = _indexes[i].bulk->insert(_txn, doc, loc, _indexes[i].options, &unused);
+ } else {
+ idxStatus = _indexes[i].real->insert(_txn, doc, loc, _indexes[i].options, &unused);
+ }
+
+ if (!idxStatus.isOK())
+ return idxStatus;
+ }
+ return Status::OK();
+}
+
+Status MultiIndexBlock::doneInserting(std::set<RecordId>* dupsOut) {
+ for (size_t i = 0; i < _indexes.size(); i++) {
+ if (_indexes[i].bulk == NULL)
+ continue;
+ LOG(1) << "\t bulk commit starting for index: "
+ << _indexes[i].block->getEntry()->descriptor()->indexName();
+ Status status = _indexes[i].real->commitBulk(_txn,
+ std::move(_indexes[i].bulk),
+ _allowInterruption,
+ _indexes[i].options.dupsAllowed,
+ dupsOut);
+ if (!status.isOK()) {
+ return status;
}
+ }
+
+ return Status::OK();
+}
- // this one is so operations examining the list of indexes know that the index is finished
- _collection->infoCache()->addedIndex(_txn);
+void MultiIndexBlock::abortWithoutCleanup() {
+ _indexes.clear();
+ _needToCleanup = false;
+}
- _txn->recoveryUnit()->registerChange(new SetNeedToCleanupOnRollback(this));
- _needToCleanup = false;
+void MultiIndexBlock::commit() {
+ for (size_t i = 0; i < _indexes.size(); i++) {
+ _indexes[i].block->success();
}
-} // namespace mongo
+ // this one is so operations examining the list of indexes know that the index is finished
+ _collection->infoCache()->addedIndex(_txn);
+
+ _txn->recoveryUnit()->registerChange(new SetNeedToCleanupOnRollback(this));
+ _needToCleanup = false;
+}
+} // namespace mongo
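
The two RecoveryUnit::Change subclasses above (SetNeedToCleanupOnRollback, CleanupIndexesVectorOnRollback) follow a recurring storage-layer pattern: state that must be undone on transaction rollback is registered as a change hook rather than unwound by hand. Below is a minimal standalone sketch of that pattern, assuming simplified stand-in RecoveryUnit and Change types rather than the real MongoDB classes:

    #include <iostream>
    #include <memory>
    #include <vector>

    // Stand-in for mongo::RecoveryUnit::Change: a two-phase hook.
    class Change {
    public:
        virtual ~Change() {}
        virtual void commit() = 0;
        virtual void rollback() = 0;
    };

    // Stand-in for the recovery unit: owns registered hooks and fires
    // commit() or rollback() on each when the unit of work ends.
    class RecoveryUnit {
    public:
        void registerChange(Change* change) {  // takes ownership
            _changes.emplace_back(change);
        }
        void commitUnitOfWork() {
            for (auto& c : _changes)
                c->commit();
            _changes.clear();
        }
        void abortUnitOfWork() {
            // Undo in reverse registration order.
            for (auto it = _changes.rbegin(); it != _changes.rend(); ++it)
                (*it)->rollback();
            _changes.clear();
        }

    private:
        std::vector<std::unique_ptr<Change>> _changes;
    };

    struct Indexer {
        bool needToCleanup = false;
    };

    // Mirrors SetNeedToCleanupOnRollback: a no-op on commit, flips the
    // flag on rollback so the destructor knows cleanup is still required.
    class SetNeedToCleanupOnRollback : public Change {
    public:
        explicit SetNeedToCleanupOnRollback(Indexer* indexer) : _indexer(indexer) {}
        void commit() override {}
        void rollback() override {
            _indexer->needToCleanup = true;
        }

    private:
        Indexer* const _indexer;
    };

    int main() {
        Indexer indexer;
        RecoveryUnit ru;
        ru.registerChange(new SetNeedToCleanupOnRollback(&indexer));
        ru.abortUnitOfWork();  // simulate a rollback
        std::cout << std::boolalpha << indexer.needToCleanup << "\n";  // true
    }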
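
insertAllDocumentsInCollection() handles WriteConflictException by retrying the current document rather than advancing the scan: the retries counter doubles as a "redo the last document" flag in the loop condition and as the input to logAndBackoff. A compact sketch of that control flow, with a toy tryInsert standing in for the real snapshotted read and index insert:

    #include <chrono>
    #include <iostream>
    #include <stdexcept>
    #include <thread>

    struct WriteConflictException : std::runtime_error {
        WriteConflictException() : std::runtime_error("write conflict") {}
    };

    // Toy insert: conflicts twice, then succeeds, to exercise the retry path.
    void tryInsert(int doc) {
        static int conflictsLeft = 2;
        if (conflictsLeft > 0) {
            --conflictsLeft;
            throw WriteConflictException();
        }
        std::cout << "indexed doc " << doc << "\n";
    }

    int main() {
        const int docs[] = {1, 2, 3};
        const int numDocs = 3;
        int i = 0;
        int retries = 0;  // non-zero when retrying the last document
        while (retries || i < numDocs) {
            try {
                tryInsert(docs[i]);
                ++i;  // advance only on success
                retries = 0;
            } catch (const WriteConflictException&) {
                ++retries;  // 1 on the first retry, as logAndBackoff expects
                std::this_thread::sleep_for(std::chrono::milliseconds(retries));
            }
        }
    }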
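
The dupsOut parameter threaded through insertAllDocumentsInCollection(), insert(), and doneInserting() selects between two failure modes: with a set supplied, a DuplicateKey error records the offending RecordId and the build continues, while any other error still fails the build hard; with no set, the first duplicate aborts the build. A small sketch of that contract, using a toy Code enum and RecordId alias in place of Status and the real RecordId:

    #include <iostream>
    #include <set>

    enum class Code { OK, DuplicateKey, InternalError };
    using RecordId = long long;  // stand-in for mongo::RecordId

    // Toy per-document insert: odd ids collide on a unique key.
    Code insertOne(RecordId loc) {
        return (loc % 2 == 0) ? Code::OK : Code::DuplicateKey;
    }

    Code insertAll(std::set<RecordId>* dupsOut) {
        for (RecordId loc = 0; loc < 6; ++loc) {
            Code ret = insertOne(loc);
            if (ret == Code::OK) {
                continue;  // committed
            } else if (dupsOut && ret == Code::DuplicateKey) {
                dupsOut->insert(loc);  // remember the offender, keep building
            } else {
                return ret;  // fail the index build hard
            }
        }
        return Code::OK;
    }

    int main() {
        std::set<RecordId> dups;
        if (insertAll(&dups) == Code::OK)
            std::cout << "build finished, " << dups.size() << " duplicate(s) to resolve\n";
        if (insertAll(nullptr) != Code::OK)
            std::cout << "without dupsOut the first duplicate fails the build\n";
    }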