diff options
Diffstat (limited to 'src/mongo/db/catalog/index_create.cpp')
-rw-r--r-- | src/mongo/db/catalog/index_create.cpp | 540 |
1 files changed, 262 insertions, 278 deletions
diff --git a/src/mongo/db/catalog/index_create.cpp b/src/mongo/db/catalog/index_create.cpp index b624c0151f0..c642fcb83a5 100644 --- a/src/mongo/db/catalog/index_create.cpp +++ b/src/mongo/db/catalog/index_create.cpp @@ -54,334 +54,318 @@ namespace mongo { - using std::unique_ptr; - using std::string; - using std::endl; - - /** - * On rollback sets MultiIndexBlock::_needToCleanup to true. - */ - class MultiIndexBlock::SetNeedToCleanupOnRollback : public RecoveryUnit::Change { - public: - explicit SetNeedToCleanupOnRollback(MultiIndexBlock* indexer) : _indexer(indexer) {} - - virtual void commit() {} - virtual void rollback() { _indexer->_needToCleanup = true; } - - private: - MultiIndexBlock* const _indexer; - }; - - /** - * On rollback in init(), cleans up _indexes so that ~MultiIndexBlock doesn't try to clean - * up _indexes manually (since the changes were already rolled back). - * Due to this, it is thus legal to call init() again after it fails. - */ - class MultiIndexBlock::CleanupIndexesVectorOnRollback : public RecoveryUnit::Change { - public: - explicit CleanupIndexesVectorOnRollback(MultiIndexBlock* indexer) : _indexer(indexer) {} - - virtual void commit() {} - virtual void rollback() { _indexer->_indexes.clear(); } - - private: - MultiIndexBlock* const _indexer; - }; - - MultiIndexBlock::MultiIndexBlock(OperationContext* txn, Collection* collection) - : _collection(collection), - _txn(txn), - _buildInBackground(false), - _allowInterruption(false), - _ignoreUnique(false), - _needToCleanup(true) { +using std::unique_ptr; +using std::string; +using std::endl; + +/** + * On rollback sets MultiIndexBlock::_needToCleanup to true. + */ +class MultiIndexBlock::SetNeedToCleanupOnRollback : public RecoveryUnit::Change { +public: + explicit SetNeedToCleanupOnRollback(MultiIndexBlock* indexer) : _indexer(indexer) {} + + virtual void commit() {} + virtual void rollback() { + _indexer->_needToCleanup = true; } - MultiIndexBlock::~MultiIndexBlock() { - if (!_needToCleanup || _indexes.empty()) - return; - while (true) { - try { - WriteUnitOfWork wunit(_txn); - // This cleans up all index builds. - // Because that may need to write, it is done inside - // of a WUOW. Nothing inside this block can fail, and it is made fatal if it does. - for (size_t i = 0; i < _indexes.size(); i++) { - _indexes[i].block->fail(); - } - wunit.commit(); - return; - } - catch (const WriteConflictException& e) { - continue; - } - catch (const std::exception& e) { - error() << "Caught exception while cleaning up partially built indexes: " - << e.what(); - } - catch (...) { - error() << "Caught unknown exception while cleaning up partially built indexes."; - } - fassertFailed(18644); - } +private: + MultiIndexBlock* const _indexer; +}; + +/** + * On rollback in init(), cleans up _indexes so that ~MultiIndexBlock doesn't try to clean + * up _indexes manually (since the changes were already rolled back). + * Due to this, it is thus legal to call init() again after it fails. + */ +class MultiIndexBlock::CleanupIndexesVectorOnRollback : public RecoveryUnit::Change { +public: + explicit CleanupIndexesVectorOnRollback(MultiIndexBlock* indexer) : _indexer(indexer) {} + + virtual void commit() {} + virtual void rollback() { + _indexer->_indexes.clear(); } - void MultiIndexBlock::removeExistingIndexes(std::vector<BSONObj>* specs) const { - for (size_t i = 0; i < specs->size(); i++) { - Status status = - _collection->getIndexCatalog()->prepareSpecForCreate(_txn, (*specs)[i]).getStatus(); - if (status.code() == ErrorCodes::IndexAlreadyExists) { - specs->erase(specs->begin() + i); - i--; +private: + MultiIndexBlock* const _indexer; +}; + +MultiIndexBlock::MultiIndexBlock(OperationContext* txn, Collection* collection) + : _collection(collection), + _txn(txn), + _buildInBackground(false), + _allowInterruption(false), + _ignoreUnique(false), + _needToCleanup(true) {} + +MultiIndexBlock::~MultiIndexBlock() { + if (!_needToCleanup || _indexes.empty()) + return; + while (true) { + try { + WriteUnitOfWork wunit(_txn); + // This cleans up all index builds. + // Because that may need to write, it is done inside + // of a WUOW. Nothing inside this block can fail, and it is made fatal if it does. + for (size_t i = 0; i < _indexes.size(); i++) { + _indexes[i].block->fail(); } - // intentionally ignoring other error codes + wunit.commit(); + return; + } catch (const WriteConflictException& e) { + continue; + } catch (const std::exception& e) { + error() << "Caught exception while cleaning up partially built indexes: " << e.what(); + } catch (...) { + error() << "Caught unknown exception while cleaning up partially built indexes."; } + fassertFailed(18644); } +} + +void MultiIndexBlock::removeExistingIndexes(std::vector<BSONObj>* specs) const { + for (size_t i = 0; i < specs->size(); i++) { + Status status = + _collection->getIndexCatalog()->prepareSpecForCreate(_txn, (*specs)[i]).getStatus(); + if (status.code() == ErrorCodes::IndexAlreadyExists) { + specs->erase(specs->begin() + i); + i--; + } + // intentionally ignoring other error codes + } +} - Status MultiIndexBlock::init(const std::vector<BSONObj>& indexSpecs) { - WriteUnitOfWork wunit(_txn); - - invariant(_indexes.empty()); - _txn->recoveryUnit()->registerChange(new CleanupIndexesVectorOnRollback(this)); +Status MultiIndexBlock::init(const std::vector<BSONObj>& indexSpecs) { + WriteUnitOfWork wunit(_txn); - const string& ns = _collection->ns().ns(); + invariant(_indexes.empty()); + _txn->recoveryUnit()->registerChange(new CleanupIndexesVectorOnRollback(this)); - Status status = _collection->getIndexCatalog()->checkUnfinished(); - if ( !status.isOK() ) - return status; + const string& ns = _collection->ns().ns(); - for ( size_t i = 0; i < indexSpecs.size(); i++ ) { - BSONObj info = indexSpecs[i]; + Status status = _collection->getIndexCatalog()->checkUnfinished(); + if (!status.isOK()) + return status; - string pluginName = IndexNames::findPluginName( info["key"].Obj() ); - if ( pluginName.size() ) { - Status s = - _collection->getIndexCatalog()->_upgradeDatabaseMinorVersionIfNeeded(_txn, pluginName); - if ( !s.isOK() ) - return s; - } + for (size_t i = 0; i < indexSpecs.size(); i++) { + BSONObj info = indexSpecs[i]; - // Any foreground indexes make all indexes be built in the foreground. - _buildInBackground = (_buildInBackground && info["background"].trueValue()); + string pluginName = IndexNames::findPluginName(info["key"].Obj()); + if (pluginName.size()) { + Status s = _collection->getIndexCatalog()->_upgradeDatabaseMinorVersionIfNeeded( + _txn, pluginName); + if (!s.isOK()) + return s; } - for ( size_t i = 0; i < indexSpecs.size(); i++ ) { - BSONObj info = indexSpecs[i]; - StatusWith<BSONObj> statusWithInfo = - _collection->getIndexCatalog()->prepareSpecForCreate( _txn, info ); - Status status = statusWithInfo.getStatus(); - if ( !status.isOK() ) - return status; - info = statusWithInfo.getValue(); - - IndexToBuild index; - index.block.reset(new IndexCatalog::IndexBuildBlock(_txn, _collection, info)); - status = index.block->init(); - if ( !status.isOK() ) - return status; - - index.real = index.block->getEntry()->accessMethod(); - status = index.real->initializeAsEmpty(_txn); - if ( !status.isOK() ) - return status; - - if (!_buildInBackground) { - // Bulk build process requires foreground building as it assumes nothing is changing - // under it. - index.bulk = index.real->initiateBulk(); - } + // Any foreground indexes make all indexes be built in the foreground. + _buildInBackground = (_buildInBackground && info["background"].trueValue()); + } + + for (size_t i = 0; i < indexSpecs.size(); i++) { + BSONObj info = indexSpecs[i]; + StatusWith<BSONObj> statusWithInfo = + _collection->getIndexCatalog()->prepareSpecForCreate(_txn, info); + Status status = statusWithInfo.getStatus(); + if (!status.isOK()) + return status; + info = statusWithInfo.getValue(); - const IndexDescriptor* descriptor = index.block->getEntry()->descriptor(); + IndexToBuild index; + index.block.reset(new IndexCatalog::IndexBuildBlock(_txn, _collection, info)); + status = index.block->init(); + if (!status.isOK()) + return status; - index.options.logIfError = false; // logging happens elsewhere if needed. - index.options.dupsAllowed = !descriptor->unique() - || _ignoreUnique - || repl::getGlobalReplicationCoordinator() - ->shouldIgnoreUniqueIndex(descriptor); + index.real = index.block->getEntry()->accessMethod(); + status = index.real->initializeAsEmpty(_txn); + if (!status.isOK()) + return status; - log() << "build index on: " << ns << " properties: " << descriptor->toString(); - if (index.bulk) - log() << "\t building index using bulk method"; + if (!_buildInBackground) { + // Bulk build process requires foreground building as it assumes nothing is changing + // under it. + index.bulk = index.real->initiateBulk(); + } - index.filterExpression = index.block->getEntry()->getFilterExpression(); + const IndexDescriptor* descriptor = index.block->getEntry()->descriptor(); - // TODO SERVER-14888 Suppress this in cases we don't want to audit. - audit::logCreateIndex(_txn->getClient(), &info, descriptor->indexName(), ns); + index.options.logIfError = false; // logging happens elsewhere if needed. + index.options.dupsAllowed = !descriptor->unique() || _ignoreUnique || + repl::getGlobalReplicationCoordinator()->shouldIgnoreUniqueIndex(descriptor); - _indexes.push_back(std::move(index)); - } + log() << "build index on: " << ns << " properties: " << descriptor->toString(); + if (index.bulk) + log() << "\t building index using bulk method"; - // this is so that operations examining the list of indexes know there are more keys to look - // at when doing things like in place updates, etc... - _collection->infoCache()->addedIndex(_txn); + index.filterExpression = index.block->getEntry()->getFilterExpression(); - if (_buildInBackground) - _backgroundOperation.reset(new BackgroundOperation(ns)); + // TODO SERVER-14888 Suppress this in cases we don't want to audit. + audit::logCreateIndex(_txn->getClient(), &info, descriptor->indexName(), ns); - wunit.commit(); - return Status::OK(); + _indexes.push_back(std::move(index)); } - Status MultiIndexBlock::insertAllDocumentsInCollection(std::set<RecordId>* dupsOut) { - const char* curopMessage = _buildInBackground ? "Index Build (background)" : "Index Build"; - const auto numRecords = _collection->numRecords(_txn); - stdx::unique_lock<Client> lk(*_txn->getClient()); - ProgressMeterHolder progress(*_txn->setMessage_inlock(curopMessage, - curopMessage, - numRecords)); - lk.unlock(); - - Timer t; - - unsigned long long n = 0; - - unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(_txn, - _collection->ns().ns(), - _collection)); - if (_buildInBackground) { - invariant(_allowInterruption); - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); - } - else { - exec->setYieldPolicy(PlanExecutor::WRITE_CONFLICT_RETRY_ONLY); - } + // this is so that operations examining the list of indexes know there are more keys to look + // at when doing things like in place updates, etc... + _collection->infoCache()->addedIndex(_txn); - Snapshotted<BSONObj> objToIndex; - RecordId loc; - PlanExecutor::ExecState state; - int retries = 0; // non-zero when retrying our last document. - while (retries - || (PlanExecutor::ADVANCED == (state = exec->getNextSnapshotted(&objToIndex, - &loc)))) { - try { - if (_allowInterruption) - _txn->checkForInterrupt(); - - // Make sure we are working with the latest version of the document. - if (objToIndex.snapshotId() != _txn->recoveryUnit()->getSnapshotId() - && !_collection->findDoc(_txn, loc, &objToIndex)) { - // doc was deleted so don't index it. - retries = 0; - continue; - } - - // Done before insert so we can retry document if it WCEs. - progress->setTotalWhileRunning( _collection->numRecords(_txn) ); - - WriteUnitOfWork wunit(_txn); - Status ret = insert(objToIndex.value(), loc); - if (ret.isOK()) { - wunit.commit(); - } - else if (dupsOut && ret.code() == ErrorCodes::DuplicateKey) { - // If dupsOut is non-null, we should only fail the specific insert that - // led to a DuplicateKey rather than the whole index build. - dupsOut->insert(loc); - } - else { - // Fail the index build hard. - return ret; - } - - // Go to the next document - progress->hit(); - n++; - retries = 0; - } - catch (const WriteConflictException& wce) { - CurOp::get(_txn)->debug().writeConflicts++; - retries++; // logAndBackoff expects this to be 1 on first call. - wce.logAndBackoff(retries, "index creation", _collection->ns().ns()); - - // Can't use WRITE_CONFLICT_RETRY_LOOP macros since we need to save/restore exec - // around call to abandonSnapshot. - exec->saveState(); - _txn->recoveryUnit()->abandonSnapshot(); - exec->restoreState(_txn); // Handles any WCEs internally. - } - } + if (_buildInBackground) + _backgroundOperation.reset(new BackgroundOperation(ns)); - if (state != PlanExecutor::IS_EOF) { - // If the plan executor was killed, this means the DB/collection was dropped and so it - // is not safe to cleanup the in-progress indexes. - if (state == PlanExecutor::DEAD) { - abortWithoutCleanup(); - } + wunit.commit(); + return Status::OK(); +} - uasserted(28550, - "Unable to complete index build as the collection is no longer readable"); - } - - progress->finished(); +Status MultiIndexBlock::insertAllDocumentsInCollection(std::set<RecordId>* dupsOut) { + const char* curopMessage = _buildInBackground ? "Index Build (background)" : "Index Build"; + const auto numRecords = _collection->numRecords(_txn); + stdx::unique_lock<Client> lk(*_txn->getClient()); + ProgressMeterHolder progress(*_txn->setMessage_inlock(curopMessage, curopMessage, numRecords)); + lk.unlock(); - Status ret = doneInserting(dupsOut); - if (!ret.isOK()) - return ret; + Timer t; - log() << "build index done. scanned " << n << " total records. " - << t.seconds() << " secs" << endl; + unsigned long long n = 0; - return Status::OK(); + unique_ptr<PlanExecutor> exec( + InternalPlanner::collectionScan(_txn, _collection->ns().ns(), _collection)); + if (_buildInBackground) { + invariant(_allowInterruption); + exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); + } else { + exec->setYieldPolicy(PlanExecutor::WRITE_CONFLICT_RETRY_ONLY); } - Status MultiIndexBlock::insert(const BSONObj& doc, const RecordId& loc) { - for ( size_t i = 0; i < _indexes.size(); i++ ) { - - if ( _indexes[i].filterExpression && - !_indexes[i].filterExpression->matchesBSON(doc) ) { + Snapshotted<BSONObj> objToIndex; + RecordId loc; + PlanExecutor::ExecState state; + int retries = 0; // non-zero when retrying our last document. + while (retries || + (PlanExecutor::ADVANCED == (state = exec->getNextSnapshotted(&objToIndex, &loc)))) { + try { + if (_allowInterruption) + _txn->checkForInterrupt(); + + // Make sure we are working with the latest version of the document. + if (objToIndex.snapshotId() != _txn->recoveryUnit()->getSnapshotId() && + !_collection->findDoc(_txn, loc, &objToIndex)) { + // doc was deleted so don't index it. + retries = 0; continue; } - int64_t unused; - Status idxStatus(ErrorCodes::InternalError, ""); - if (_indexes[i].bulk) { - idxStatus = _indexes[i].bulk->insert(_txn, doc, loc, _indexes[i].options, &unused); - } - else { - idxStatus = _indexes[i].real->insert(_txn, doc, loc, _indexes[i].options, &unused); + // Done before insert so we can retry document if it WCEs. + progress->setTotalWhileRunning(_collection->numRecords(_txn)); + + WriteUnitOfWork wunit(_txn); + Status ret = insert(objToIndex.value(), loc); + if (ret.isOK()) { + wunit.commit(); + } else if (dupsOut && ret.code() == ErrorCodes::DuplicateKey) { + // If dupsOut is non-null, we should only fail the specific insert that + // led to a DuplicateKey rather than the whole index build. + dupsOut->insert(loc); + } else { + // Fail the index build hard. + return ret; } - if ( !idxStatus.isOK() ) - return idxStatus; + // Go to the next document + progress->hit(); + n++; + retries = 0; + } catch (const WriteConflictException& wce) { + CurOp::get(_txn)->debug().writeConflicts++; + retries++; // logAndBackoff expects this to be 1 on first call. + wce.logAndBackoff(retries, "index creation", _collection->ns().ns()); + + // Can't use WRITE_CONFLICT_RETRY_LOOP macros since we need to save/restore exec + // around call to abandonSnapshot. + exec->saveState(); + _txn->recoveryUnit()->abandonSnapshot(); + exec->restoreState(_txn); // Handles any WCEs internally. } - return Status::OK(); } - Status MultiIndexBlock::doneInserting(std::set<RecordId>* dupsOut) { - for ( size_t i = 0; i < _indexes.size(); i++ ) { - if ( _indexes[i].bulk == NULL ) - continue; - LOG(1) << "\t bulk commit starting for index: " - << _indexes[i].block->getEntry()->descriptor()->indexName(); - Status status = _indexes[i].real->commitBulk( _txn, - std::move(_indexes[i].bulk), - _allowInterruption, - _indexes[i].options.dupsAllowed, - dupsOut ); - if ( !status.isOK() ) { - return status; - } + if (state != PlanExecutor::IS_EOF) { + // If the plan executor was killed, this means the DB/collection was dropped and so it + // is not safe to cleanup the in-progress indexes. + if (state == PlanExecutor::DEAD) { + abortWithoutCleanup(); } - return Status::OK(); + uasserted(28550, "Unable to complete index build as the collection is no longer readable"); } - void MultiIndexBlock::abortWithoutCleanup() { - _indexes.clear(); - _needToCleanup = false; - } + progress->finished(); - void MultiIndexBlock::commit() { - for ( size_t i = 0; i < _indexes.size(); i++ ) { - _indexes[i].block->success(); + Status ret = doneInserting(dupsOut); + if (!ret.isOK()) + return ret; + + log() << "build index done. scanned " << n << " total records. " << t.seconds() << " secs" + << endl; + + return Status::OK(); +} + +Status MultiIndexBlock::insert(const BSONObj& doc, const RecordId& loc) { + for (size_t i = 0; i < _indexes.size(); i++) { + if (_indexes[i].filterExpression && !_indexes[i].filterExpression->matchesBSON(doc)) { + continue; + } + + int64_t unused; + Status idxStatus(ErrorCodes::InternalError, ""); + if (_indexes[i].bulk) { + idxStatus = _indexes[i].bulk->insert(_txn, doc, loc, _indexes[i].options, &unused); + } else { + idxStatus = _indexes[i].real->insert(_txn, doc, loc, _indexes[i].options, &unused); + } + + if (!idxStatus.isOK()) + return idxStatus; + } + return Status::OK(); +} + +Status MultiIndexBlock::doneInserting(std::set<RecordId>* dupsOut) { + for (size_t i = 0; i < _indexes.size(); i++) { + if (_indexes[i].bulk == NULL) + continue; + LOG(1) << "\t bulk commit starting for index: " + << _indexes[i].block->getEntry()->descriptor()->indexName(); + Status status = _indexes[i].real->commitBulk(_txn, + std::move(_indexes[i].bulk), + _allowInterruption, + _indexes[i].options.dupsAllowed, + dupsOut); + if (!status.isOK()) { + return status; } + } + + return Status::OK(); +} - // this one is so operations examining the list of indexes know that the index is finished - _collection->infoCache()->addedIndex(_txn); +void MultiIndexBlock::abortWithoutCleanup() { + _indexes.clear(); + _needToCleanup = false; +} - _txn->recoveryUnit()->registerChange(new SetNeedToCleanupOnRollback(this)); - _needToCleanup = false; +void MultiIndexBlock::commit() { + for (size_t i = 0; i < _indexes.size(); i++) { + _indexes[i].block->success(); } -} // namespace mongo + // this one is so operations examining the list of indexes know that the index is finished + _collection->infoCache()->addedIndex(_txn); + + _txn->recoveryUnit()->registerChange(new SetNeedToCleanupOnRollback(this)); + _needToCleanup = false; +} +} // namespace mongo |