diff options
author | Gregory Noma <gregory.noma@gmail.com> | 2020-06-17 17:31:22 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-06-17 21:44:33 +0000 |
commit | cbe0625a55256a3a0023223ae3e1fc6494e721af (patch) | |
tree | 7451f0913d11eb4d827db634c9a84fef951de919 /src | |
parent | 728a6e9d5d70885314e1e54619b6caffd1498999 (diff) | |
download | mongo-cbe0625a55256a3a0023223ae3e1fc6494e721af.tar.gz |
SERVER-48416 Write index build progress to internal table on clean shutdown
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/catalog/multi_index_block.cpp | 115 | ||||
-rw-r--r-- | src/mongo/db/catalog/multi_index_block.h | 15 | ||||
-rw-r--r-- | src/mongo/db/index/index_access_method.cpp | 38 | ||||
-rw-r--r-- | src/mongo/db/index/index_access_method.h | 16 | ||||
-rw-r--r-- | src/mongo/db/sorter/sorter.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/sorter/sorter.h | 36 | ||||
-rw-r--r-- | src/mongo/db/sorter/sorter_test.cpp | 7 |
7 files changed, 174 insertions, 79 deletions
diff --git a/src/mongo/db/catalog/multi_index_block.cpp b/src/mongo/db/catalog/multi_index_block.cpp index 58a696e9f8f..d31ecdc07e6 100644 --- a/src/mongo/db/catalog/multi_index_block.cpp +++ b/src/mongo/db/catalog/multi_index_block.cpp @@ -502,6 +502,9 @@ Status MultiIndexBlock::insert(OperationContext* opCtx, const BSONObj& doc, cons if (!idxStatus.isOK()) return idxStatus; } + + _lastRecordIdInserted = loc; + return Status::OK(); } @@ -513,6 +516,10 @@ Status MultiIndexBlock::dumpInsertsFromBulk(OperationContext* opCtx, std::set<RecordId>* dupRecords) { invariant(!_buildIsCleanedUp); invariant(opCtx->lockState()->isNoop() || !opCtx->lockState()->inAWriteUnitOfWork()); + + invariant(_phase == Phase::kCollectionScan, _phaseToString(_phase)); + _phase = Phase::kBulkLoad; + for (size_t i = 0; i < _indexes.size(); i++) { // If 'dupRecords' is provided, it will be used to store all records that would result in // duplicate key errors. Only pass 'dupKeysInserted', which stores inserted duplicate keys, @@ -572,6 +579,13 @@ Status MultiIndexBlock::drainBackgroundWrites( IndexBuildInterceptor::DrainYieldPolicy drainYieldPolicy) { invariant(!_buildIsCleanedUp); invariant(!opCtx->lockState()->inAWriteUnitOfWork()); + + // Background writes are drained three times (once without blocking writes and twice blocking + // writes), so we may either be coming from the bulk load phase or be already in the drain + // writes phase. + invariant(_phase == Phase::kBulkLoad || _phase == Phase::kDrainWrites, _phaseToString(_phase)); + _phase = Phase::kDrainWrites; + const Collection* coll = CollectionCatalog::get(opCtx).lookupCollectionByUUID(opCtx, _collectionUUID.get()); @@ -724,8 +738,7 @@ void MultiIndexBlock::_abortWithoutCleanup(OperationContext* opCtx, bool shutdow auto action = TemporaryRecordStore::FinalizationAction::kDelete; - if (shutdown && _buildUUID && _method == IndexBuildMethod::kHybrid && - opCtx->getServiceContext()->getStorageEngine()->supportsResumableIndexBuilds()) { + if (_shouldWriteStateToDisk(opCtx, shutdown)) { _writeStateToDisk(opCtx); action = TemporaryRecordStore::FinalizationAction::kKeep; } @@ -737,20 +750,70 @@ void MultiIndexBlock::_abortWithoutCleanup(OperationContext* opCtx, bool shutdow _buildIsCleanedUp = true; } +bool MultiIndexBlock::_shouldWriteStateToDisk(OperationContext* opCtx, bool shutdown) const { + return shutdown && _buildUUID && !_buildIsCleanedUp && _method == IndexBuildMethod::kHybrid && + opCtx->getServiceContext()->getStorageEngine()->supportsResumableIndexBuilds(); +} + void MultiIndexBlock::_writeStateToDisk(OperationContext* opCtx) const { + auto obj = _constructStateObject(); + auto rs = opCtx->getServiceContext()->getStorageEngine()->makeTemporaryRecordStore(opCtx); + + WriteUnitOfWork wuow(opCtx); + + auto status = rs->rs()->insertRecord(opCtx, obj.objdata(), obj.objsize(), Timestamp()); + if (!status.isOK()) { + LOGV2_ERROR(4841501, + "Failed to write resumable index build state to disk", + logAttrs(*_buildUUID), + "error"_attr = status.getStatus()); + dassert(status, + str::stream() << "Failed to write resumable index build state to disk. UUID: " + << *_buildUUID); + + rs->finalizeTemporaryTable(opCtx, TemporaryRecordStore::FinalizationAction::kDelete); + return; + } + + wuow.commit(); + + LOGV2(4841502, "Wrote resumable index build state to disk", logAttrs(*_buildUUID)); + + rs->finalizeTemporaryTable(opCtx, TemporaryRecordStore::FinalizationAction::kKeep); +} + +BSONObj MultiIndexBlock::_constructStateObject() const { BSONObjBuilder builder; _buildUUID->appendToBuilder(&builder, "_id"); - builder.append("phase", "unknown"); - builder.appendNumber("collectionScanPosition", 0LL); + builder.append("phase", _phaseToString(_phase)); + + // We can be interrupted by shutdown before inserting the first document from the collection + // scan, in which case there is no _lastRecordIdInserted. + if (_phase == Phase::kCollectionScan && _lastRecordIdInserted) + builder.append("collectionScanPosition", _lastRecordIdInserted->repr()); BSONArrayBuilder indexesArray(builder.subarrayStart("indexes")); for (const auto& index : _indexes) { BSONObjBuilder indexInfo(indexesArray.subobjStart()); - indexInfo.append("tempDir", ""); - indexInfo.append("fileName", ""); - BSONArrayBuilder ranges(indexInfo.subarrayStart("ranges")); - ranges.done(); + if (_phase == Phase::kCollectionScan || _phase == Phase::kBulkLoad) { + auto state = index.bulk->getSorterState(); + + indexInfo.append("tempDir", state.tempDir); + indexInfo.append("fileName", state.fileName); + + BSONArrayBuilder ranges(indexInfo.subarrayStart("ranges")); + for (const auto& rangeInfo : state.ranges) { + BSONObjBuilder range(ranges.subobjStart()); + + range.append("startOffset", rangeInfo.startOffset); + range.append("endOffset", rangeInfo.endOffset); + range.appendNumber("checksum", static_cast<long long>(rangeInfo.checksum)); + + range.done(); + } + ranges.done(); + } auto indexBuildInterceptor = index.block->getEntry()->indexBuildInterceptor(); indexInfo.append("sideWritesTable", indexBuildInterceptor->getSideWritesTableIdent()); @@ -764,33 +827,25 @@ void MultiIndexBlock::_writeStateToDisk(OperationContext* opCtx) const { indexInfo.append("skippedRecordTrackerTable", *skippedRecordTrackerTableIdent); indexInfo.done(); + + // Ensure the data we are referencing has been persisted to disk. + index.bulk->persistDataForShutdown(); } indexesArray.done(); - auto obj = builder.done(); - auto rs = opCtx->getServiceContext()->getStorageEngine()->makeTemporaryRecordStore(opCtx); - - WriteUnitOfWork wuow(opCtx); - - auto status = rs->rs()->insertRecord(opCtx, obj.objdata(), obj.objsize(), Timestamp()); - if (!status.isOK()) { - LOGV2_ERROR(4841501, - "Failed to write resumable index build state to disk", - logAttrs(*_buildUUID), - "error"_attr = status.getStatus()); - dassert(status, - str::stream() << "Failed to write resumable index build state to disk. UUID: " - << *_buildUUID); + return builder.obj(); +} - rs->finalizeTemporaryTable(opCtx, TemporaryRecordStore::FinalizationAction::kDelete); - return; +std::string MultiIndexBlock::_phaseToString(Phase phase) const { + switch (phase) { + case Phase::kCollectionScan: + return "collection scan"; + case Phase::kBulkLoad: + return "bulk load"; + case Phase::kDrainWrites: + return "drain writes"; } - - wuow.commit(); - - LOGV2(4841502, "Wrote resumable index build state to disk", logAttrs(*_buildUUID)); - - rs->finalizeTemporaryTable(opCtx, TemporaryRecordStore::FinalizationAction::kKeep); + MONGO_UNREACHABLE; } } // namespace mongo diff --git a/src/mongo/db/catalog/multi_index_block.h b/src/mongo/db/catalog/multi_index_block.h index dd29e02f235..57e4b45a7da 100644 --- a/src/mongo/db/catalog/multi_index_block.h +++ b/src/mongo/db/catalog/multi_index_block.h @@ -295,10 +295,18 @@ private: InsertDeleteOptions options; }; + enum class Phase { kCollectionScan, kBulkLoad, kDrainWrites }; + void _abortWithoutCleanup(OperationContext* opCtx, bool shutdown); + bool _shouldWriteStateToDisk(OperationContext* opCtx, bool shutdown) const; + void _writeStateToDisk(OperationContext* opCtx) const; + BSONObj _constructStateObject() const; + + std::string _phaseToString(Phase phase) const; + // Is set during init() and ensures subsequent function calls act on the same Collection. boost::optional<UUID> _collectionUUID; @@ -315,5 +323,12 @@ private: // A unique identifier associating this index build with a two-phase index build within a // replica set. boost::optional<UUID> _buildUUID; + + // The RecordId corresponding to the object most recently inserted using this MultiIndexBlock, + // or boost::none if nothing has been inserted. + boost::optional<RecordId> _lastRecordIdInserted; + + // The current phase of the index build. + Phase _phase = Phase::kCollectionScan; }; } // namespace mongo diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp index a365dc163f8..6f450b57767 100644 --- a/src/mongo/db/index/index_access_method.cpp +++ b/src/mongo/db/index/index_access_method.cpp @@ -471,9 +471,13 @@ public: int64_t getKeysInserted() const final; - State getState() const final; + Sorter::State getSorterState() const final; + + void persistDataForShutdown() final; private: + void _addMultikeyMetadataKeysIntoSorter(); + std::unique_ptr<Sorter> _sorter; IndexCatalogEntry* _indexCatalogEntry; int64_t _keysInserted = 0; @@ -489,10 +493,6 @@ private: // These are inserted into the sorter after all normal data keys have been added, just // before the bulk build is committed. KeyStringSet _multikeyMetadataKeys; - - // The RecordId corresponding to the object most recently inserted using this BulkBuilder, or - // boost::none if nothing has been inserted into the sorter or done() has already been called. - boost::optional<RecordId> _lastRecordIdInserted; }; std::unique_ptr<IndexAccessMethod::BulkBuilder> AbstractIndexAccessMethod::initiateBulk( @@ -573,7 +573,6 @@ Status AbstractIndexAccessMethod::BulkBuilderImpl::insert(OperationContext* opCt _isMultiKey = _isMultiKey || _indexCatalogEntry->accessMethod()->shouldMarkIndexAsMultikey( keys->size(), _multikeyMetadataKeys, *multikeyPaths); - _lastRecordIdInserted = loc; return Status::OK(); } @@ -588,11 +587,7 @@ bool AbstractIndexAccessMethod::BulkBuilderImpl::isMultikey() const { IndexAccessMethod::BulkBuilder::Sorter::Iterator* AbstractIndexAccessMethod::BulkBuilderImpl::done() { - for (const auto& keyString : _multikeyMetadataKeys) { - _sorter->add(keyString, mongo::NullValue()); - ++_keysInserted; - } - _lastRecordIdInserted = boost::none; + _addMultikeyMetadataKeysIntoSorter(); return _sorter->done(); } @@ -600,12 +595,21 @@ int64_t AbstractIndexAccessMethod::BulkBuilderImpl::getKeysInserted() const { return _keysInserted; } -AbstractIndexAccessMethod::BulkBuilder::State AbstractIndexAccessMethod::BulkBuilderImpl::getState() - const { - return {_lastRecordIdInserted, - _sorter->getTempDir(), - _sorter->getFileName(), - _sorter->getRangeInfos()}; +AbstractIndexAccessMethod::BulkBuilder::Sorter::State +AbstractIndexAccessMethod::BulkBuilderImpl::getSorterState() const { + return _sorter->getState(); +} + +void AbstractIndexAccessMethod::BulkBuilderImpl::persistDataForShutdown() { + _addMultikeyMetadataKeysIntoSorter(); + _sorter->persistDataForShutdown(); +} + +void AbstractIndexAccessMethod::BulkBuilderImpl::_addMultikeyMetadataKeysIntoSorter() { + for (const auto& keyString : _multikeyMetadataKeys) { + _sorter->add(keyString, mongo::NullValue()); + ++_keysInserted; + } } Status AbstractIndexAccessMethod::commitBulk(OperationContext* opCtx, diff --git a/src/mongo/db/index/index_access_method.h b/src/mongo/db/index/index_access_method.h index deb586ddeb6..793a298066a 100644 --- a/src/mongo/db/index/index_access_method.h +++ b/src/mongo/db/index/index_access_method.h @@ -205,13 +205,6 @@ public: public: using Sorter = mongo::Sorter<KeyString::Value, mongo::NullValue>; - struct State { - boost::optional<RecordId> lastRecordIdInserted; - std::string tempDir; - std::string fileName; - std::vector<SorterRangeInfo> ranges; - }; - virtual ~BulkBuilder() = default; /** @@ -238,9 +231,14 @@ public: virtual int64_t getKeysInserted() const = 0; /** - * Returns the current state of this BulkBuilder and its underlying Sorter. + * Returns the current state of this BulkBuilder's underlying Sorter. + */ + virtual Sorter::State getSorterState() const = 0; + + /** + * Persists on disk the keys that have been inserted using this BulkBuilder. */ - virtual State getState() const = 0; + virtual void persistDataForShutdown() = 0; }; /** diff --git a/src/mongo/db/sorter/sorter.cpp b/src/mongo/db/sorter/sorter.cpp index bc427c0a71f..a82915c8302 100644 --- a/src/mongo/db/sorter/sorter.cpp +++ b/src/mongo/db/sorter/sorter.cpp @@ -537,7 +537,7 @@ public: } ~NoLimitSorter() { - if (!_done) { + if (!_done && !this->_shouldKeepFilesOnDestruction) { // If done() was never called to return a MergeIterator, then this Sorter still owns // file deletion. DESTRUCTOR_GUARD(boost::filesystem::remove(this->_fileName)); @@ -669,6 +669,10 @@ public: } private: + void spill() { + invariant(false, "LimitOneSorter does not spill to disk"); + } + const Comparator _comp; Data _best; bool _haveData; // false at start, set to true on first call to add() @@ -711,7 +715,7 @@ public: } ~TopKSorter() { - if (!_done) { + if (!_done && !this->_shouldKeepFilesOnDestruction) { // If done() was never called to return a MergeIterator, then this Sorter still owns // file deletion. DESTRUCTOR_GUARD(boost::filesystem::remove(this->_fileName)); @@ -937,6 +941,24 @@ private: } // namespace sorter +template <typename Key, typename Value> +std::vector<SorterRangeInfo> Sorter<Key, Value>::_getRangeInfos() const { + std::vector<SorterRangeInfo> ranges; + ranges.reserve(_iters.size()); + + std::transform(_iters.begin(), _iters.end(), std::back_inserter(ranges), [](const auto it) { + return it->getRangeInfo(); + }); + + return ranges; +} + +template <typename Key, typename Value> +void Sorter<Key, Value>::persistDataForShutdown() { + spill(); + _shouldKeepFilesOnDestruction = true; +} + // // SortedFileWriter // diff --git a/src/mongo/db/sorter/sorter.h b/src/mongo/db/sorter/sorter.h index 4ee8fb5dbc8..534c6cc4c02 100644 --- a/src/mongo/db/sorter/sorter.h +++ b/src/mongo/db/sorter/sorter.h @@ -153,8 +153,8 @@ public: }; struct SorterRangeInfo { - std::streampos startOffset; - std::streampos endOffset; + std::streamoff startOffset; + std::streamoff endOffset; uint32_t checksum; }; @@ -224,6 +224,12 @@ public: typename Value::SorterDeserializeSettings> Settings; + struct State { + std::string tempDir; + std::string fileName; + std::vector<SorterRangeInfo> ranges; + }; + template <typename Comparator> static Sorter* make(const SortOptions& opts, const Comparator& comp, @@ -244,30 +250,24 @@ public: return _usedDisk; } - std::string getTempDir() const { - return _tempDir; - } - - std::string getFileName() const { - return _fileName; + State getState() const { + return {_tempDir, _fileName, _getRangeInfos()}; } - std::vector<SorterRangeInfo> getRangeInfos() const { - std::vector<SorterRangeInfo> ranges; - ranges.reserve(_iters.size()); - - std::transform(_iters.begin(), _iters.end(), std::back_inserter(ranges), [](const auto it) { - return it->getRangeInfo(); - }); - - return ranges; - } + void persistDataForShutdown(); protected: Sorter() {} // can only be constructed as a base + virtual void spill() = 0; + + std::vector<SorterRangeInfo> _getRangeInfos() const; + bool _usedDisk{false}; // Keeps track of whether the sorter used disk or not + // Whether the files written by this Sorter should be kept on destruction. + bool _shouldKeepFilesOnDestruction = false; + std::string _tempDir; std::string _fileName; diff --git a/src/mongo/db/sorter/sorter_test.cpp b/src/mongo/db/sorter/sorter_test.cpp index f9b56cb88cf..9e307fe2ba1 100644 --- a/src/mongo/db/sorter/sorter_test.cpp +++ b/src/mongo/db/sorter/sorter_test.cpp @@ -480,12 +480,13 @@ private: } void assertRangeInfo(unowned_ptr<IWSorter> sorter, const SortOptions& opts) { + auto state = sorter->getState(); if (opts.extSortAllowed) { - ASSERT_EQ(sorter->getTempDir(), opts.tempDir); - ASSERT_NE(sorter->getFileName(), ""); + ASSERT_EQ(state.tempDir, opts.tempDir); + ASSERT_NE(state.fileName, ""); } if (auto numRanges = correctNumRanges()) { - ASSERT_EQ(sorter->getRangeInfos().size(), *numRanges); + ASSERT_EQ(state.ranges.size(), *numRanges); } } }; |