diff options
author | Louis Williams <louis.williams@mongodb.com> | 2018-11-12 13:35:36 -0500 |
---|---|---|
committer | Louis Williams <louis.williams@mongodb.com> | 2018-11-29 13:09:15 -0500 |
commit | ca1cccb8a18be76c584f587e04b14512e59d8424 (patch) | |
tree | 455c08f9ad231f45fc64e8a3b2a5ec2d4048cc38 /src | |
parent | b5308fc30a1ec7405ccec6dcc4213cf5fb167a4e (diff) | |
download | mongo-ca1cccb8a18be76c584f587e04b14512e59d8424.tar.gz |
SERVER-38027 SERVER-37268 Partially enable hybrid index builds for background, non-unique indexes. Change background index builds to use the bulk builder and external sorter
Diffstat (limited to 'src')
40 files changed, 916 insertions, 226 deletions
diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript index 2e3fb91c150..038ea188797 100644 --- a/src/mongo/db/catalog/SConscript +++ b/src/mongo/db/catalog/SConscript @@ -246,6 +246,9 @@ env.Library( '$BUILD_DIR/mongo/util/fail_point', '$BUILD_DIR/mongo/util/progress_meter', ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/index/index_build_interceptor', + ] ) env.CppUnitTest( diff --git a/src/mongo/db/catalog/collection_compact.cpp b/src/mongo/db/catalog/collection_compact.cpp index 96075e81a6c..7971f96c29a 100644 --- a/src/mongo/db/catalog/collection_compact.cpp +++ b/src/mongo/db/catalog/collection_compact.cpp @@ -137,14 +137,12 @@ StatusWith<CompactStats> compactCollection(OperationContext* opCtx, if (!status.isOK()) return StatusWith<CompactStats>(status); - // The MMAPv1 storage engine used to add documents to indexer through the - // RecordStoreCompactAdaptor interface. status = recordStore->compact(opCtx); if (!status.isOK()) return StatusWith<CompactStats>(status); log() << "starting index commits"; - status = indexer.doneInserting(); + status = indexer.dumpInsertsFromBulk(); if (!status.isOK()) return StatusWith<CompactStats>(status); diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index ec0be702ade..8e6c517c4c0 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -656,60 +656,20 @@ RecordId CollectionImpl::updateDocument(OperationContext* opCtx, << " != " << newDoc.objsize()); - // At the end of this step, we will have a map of UpdateTickets, one per index, which - // represent the index updates needed to be done, based on the changes between oldDoc and - // newDoc. - OwnedPointerMap<IndexDescriptor*, UpdateTicket> updateTickets; - if (indexesAffected) { - std::unique_ptr<IndexCatalog::IndexIterator> ii = - _indexCatalog->getIndexIterator(opCtx, true); - while (ii->more()) { - IndexCatalogEntry* entry = ii->next(); - IndexDescriptor* descriptor = entry->descriptor(); - IndexAccessMethod* iam = entry->accessMethod(); - - InsertDeleteOptions options; - _indexCatalog->prepareInsertDeleteOptions(opCtx, descriptor, &options); - UpdateTicket* updateTicket = new UpdateTicket(); - updateTickets.mutableMap()[descriptor] = updateTicket; - uassertStatusOK(iam->validateUpdate(opCtx, - oldDoc.value(), - newDoc, - oldLocation, - options, - updateTicket, - entry->getFilterExpression())); - } - } - args->preImageDoc = oldDoc.value().getOwned(); Status updateStatus = _recordStore->updateRecord(opCtx, oldLocation, newDoc.objdata(), newDoc.objsize()); - // Update each index with each respective UpdateTicket. if (indexesAffected) { - int64_t keysInsertedTotal = 0; - int64_t keysDeletedTotal = 0; + int64_t keysInserted, keysDeleted; - std::unique_ptr<IndexCatalog::IndexIterator> ii = - _indexCatalog->getIndexIterator(opCtx, true); - while (ii->more()) { - IndexCatalogEntry* entry = ii->next(); - IndexDescriptor* descriptor = entry->descriptor(); - IndexAccessMethod* iam = entry->accessMethod(); - - int64_t keysInserted; - int64_t keysDeleted; - uassertStatusOK(iam->update( - opCtx, *updateTickets.mutableMap()[descriptor], &keysInserted, &keysDeleted)); - keysInsertedTotal += keysInserted; - keysDeletedTotal += keysDeleted; - } + uassertStatusOK(_indexCatalog->updateRecord( + opCtx, args->preImageDoc.get(), newDoc, oldLocation, &keysInserted, &keysDeleted)); if (opDebug) { - opDebug->additiveMetrics.incrementKeysInserted(keysInsertedTotal); - opDebug->additiveMetrics.incrementKeysDeleted(keysDeletedTotal); + opDebug->additiveMetrics.incrementKeysInserted(keysInserted); + opDebug->additiveMetrics.incrementKeysDeleted(keysDeleted); } } diff --git a/src/mongo/db/catalog/index_build_block.cpp b/src/mongo/db/catalog/index_build_block.cpp index 016890de388..5f8ef4e21d3 100644 --- a/src/mongo/db/catalog/index_build_block.cpp +++ b/src/mongo/db/catalog/index_build_block.cpp @@ -90,9 +90,11 @@ Status IndexCatalogImpl::IndexBuildBlock::init() { _entry = _catalog->_setupInMemoryStructures( _opCtx, std::move(descriptor), initFromDisk, isReadyIndex); - if (isBackgroundIndex) { - _indexBuildInterceptor = stdx::make_unique<IndexBuildInterceptor>(); - _indexBuildInterceptor->ensureSideWritesCollectionExists(_opCtx); + // Hybrid indexes are only enabled for background, non-unique indexes. + // TODO: Remove when SERVER-38036 and SERVER-37270 are complete. + const bool useHybrid = isBackgroundIndex && !descriptorPtr->unique(); + if (useHybrid) { + _indexBuildInterceptor = stdx::make_unique<IndexBuildInterceptor>(_opCtx); _entry->setIndexBuildInterceptor(_indexBuildInterceptor.get()); _opCtx->recoveryUnit()->onCommit( @@ -129,7 +131,6 @@ void IndexCatalogImpl::IndexBuildBlock::fail() { if (_entry) { invariant(_catalog->_dropIndex(_opCtx, _entry).isOK()); if (_indexBuildInterceptor) { - _indexBuildInterceptor->removeSideWritesCollection(_opCtx); _entry->setIndexBuildInterceptor(nullptr); } } else { @@ -145,11 +146,14 @@ void IndexCatalogImpl::IndexBuildBlock::success() { NamespaceString ns(_indexNamespace); invariant(_opCtx->lockState()->isDbLockedForMode(ns.db(), MODE_X)); + // An index build should never be completed with writes remaining in the interceptor. + invariant(!_indexBuildInterceptor || _indexBuildInterceptor->areAllWritesApplied(_opCtx)); + + LOG(2) << "marking index " << _indexName << " as ready in snapshot id " + << _opCtx->recoveryUnit()->getSnapshotId(); _collection->indexBuildSuccess(_opCtx, _entry); OperationContext* opCtx = _opCtx; - LOG(2) << "marking index " << _indexName << " as ready in snapshot id " - << opCtx->recoveryUnit()->getSnapshotId(); _opCtx->recoveryUnit()->onCommit( [ opCtx, entry = _entry, collection = _collection ](boost::optional<Timestamp> commitTime) { // Note: this runs after the WUOW commits but before we release our X lock on the @@ -168,11 +172,5 @@ void IndexCatalogImpl::IndexBuildBlock::success() { // able to remove this when the catalog is versioned. collection->setMinimumVisibleSnapshot(commitTime.get()); }); - - _entry->setIsReady(true); - if (_indexBuildInterceptor) { - _indexBuildInterceptor->removeSideWritesCollection(_opCtx); - _entry->setIndexBuildInterceptor(nullptr); - } } } // namespace mongo diff --git a/src/mongo/db/catalog/index_catalog.h b/src/mongo/db/catalog/index_catalog.h index 386a344d65c..4e8f70d6d66 100644 --- a/src/mongo/db/catalog/index_catalog.h +++ b/src/mongo/db/catalog/index_catalog.h @@ -370,6 +370,19 @@ public: int64_t* const keysInsertedOut) = 0; /** + * Both 'keysInsertedOut' and 'keysDeletedOut' are required and will be set to the number of + * index keys inserted and deleted by this operation, respectively. + * + * This method may throw. + */ + virtual Status updateRecord(OperationContext* const opCtx, + const BSONObj& oldDoc, + const BSONObj& newDoc, + const RecordId& recordId, + int64_t* const keysInsertedOut, + int64_t* const keysDeletedOut) = 0; + + /** * When 'keysDeletedOut' is not null, it will be set to the number of index keys removed by * this operation. */ diff --git a/src/mongo/db/catalog/index_catalog_impl.cpp b/src/mongo/db/catalog/index_catalog_impl.cpp index 9a4aa990ab0..5d0c9542dd7 100644 --- a/src/mongo/db/catalog/index_catalog_impl.cpp +++ b/src/mongo/db/catalog/index_catalog_impl.cpp @@ -1168,8 +1168,7 @@ Status IndexCatalogImpl::_indexFilteredRecords(OperationContext* opCtx, } Status status = Status::OK(); - const bool hybridBuildsEnabled = false; - if (hybridBuildsEnabled && index->isBuilding()) { + if (index->isBuilding()) { int64_t inserted; status = index->indexBuildInterceptor()->sideWrite(opCtx, index->accessMethod(), @@ -1219,8 +1218,7 @@ Status IndexCatalogImpl::_unindexRecord(OperationContext* opCtx, const RecordId& loc, bool logIfError, int64_t* keysDeletedOut) { - const bool hybridBuildsEnabled = false; - if (hybridBuildsEnabled && index->isBuilding()) { + if (index->isBuilding()) { int64_t removed; auto status = index->indexBuildInterceptor()->sideWrite( opCtx, index->accessMethod(), &obj, loc, IndexBuildInterceptor::Op::kDelete, &removed); @@ -1281,6 +1279,61 @@ Status IndexCatalogImpl::indexRecords(OperationContext* opCtx, return Status::OK(); } +Status IndexCatalogImpl::updateRecord(OperationContext* const opCtx, + const BSONObj& oldDoc, + const BSONObj& newDoc, + const RecordId& recordId, + int64_t* const keysInsertedOut, + int64_t* const keysDeletedOut) { + *keysInsertedOut = 0; + *keysDeletedOut = 0; + + // Ready indexes go directly through the IndexAccessMethod. + for (IndexCatalogEntryContainer::const_iterator it = _readyIndexes.begin(); + it != _readyIndexes.end(); + ++it) { + IndexCatalogEntry* entry = it->get(); + + IndexDescriptor* descriptor = entry->descriptor(); + IndexAccessMethod* iam = entry->accessMethod(); + + InsertDeleteOptions options; + prepareInsertDeleteOptions(opCtx, descriptor, &options); + + UpdateTicket updateTicket; + + auto status = iam->validateUpdate( + opCtx, oldDoc, newDoc, recordId, options, &updateTicket, entry->getFilterExpression()); + if (!status.isOK()) + return status; + + int64_t keysInserted; + int64_t keysDeleted; + status = iam->update(opCtx, updateTicket, &keysInserted, &keysDeleted); + if (!status.isOK()) + return status; + + *keysInsertedOut += keysInserted; + *keysDeletedOut += keysDeleted; + } + + // Building indexes go through the interceptor. + BsonRecord record{recordId, Timestamp(), &newDoc}; + for (IndexCatalogEntryContainer::const_iterator it = _buildingIndexes.begin(); + it != _buildingIndexes.end(); + ++it) { + IndexCatalogEntry* entry = it->get(); + + bool logIfError = false; + invariant(_unindexRecord(opCtx, entry, oldDoc, recordId, logIfError, keysDeletedOut)); + + auto status = _indexRecords(opCtx, entry, {record}, keysInsertedOut); + if (!status.isOK()) + return status; + } + return Status::OK(); +} + void IndexCatalogImpl::unindexRecord(OperationContext* opCtx, const BSONObj& obj, const RecordId& loc, @@ -1359,10 +1412,18 @@ void IndexCatalogImpl::indexBuildSuccess(OperationContext* opCtx, IndexCatalogEn auto releasedEntry = _buildingIndexes.release(index->descriptor()); invariant(releasedEntry.get() == index); _readyIndexes.add(std::move(releasedEntry)); - opCtx->recoveryUnit()->onRollback([this, index]() { + + auto interceptor = index->indexBuildInterceptor(); + index->setIndexBuildInterceptor(nullptr); + index->setIsReady(true); + + opCtx->recoveryUnit()->onRollback([this, index, interceptor]() { auto releasedEntry = _readyIndexes.release(index->descriptor()); invariant(releasedEntry.get() == index); _buildingIndexes.add(std::move(releasedEntry)); + + index->setIndexBuildInterceptor(interceptor); + index->setIsReady(false); }); } diff --git a/src/mongo/db/catalog/index_catalog_impl.h b/src/mongo/db/catalog/index_catalog_impl.h index 7c6c415241f..121bcf69bc5 100644 --- a/src/mongo/db/catalog/index_catalog_impl.h +++ b/src/mongo/db/catalog/index_catalog_impl.h @@ -306,6 +306,15 @@ public: int64_t* keysInsertedOut) override; /** + * See IndexCatalog::updateRecord + */ + Status updateRecord(OperationContext* const opCtx, + const BSONObj& oldDoc, + const BSONObj& newDoc, + const RecordId& recordId, + int64_t* const keysInsertedOut, + int64_t* const keysDeletedOut) override; + /** * When 'keysDeletedOut' is not null, it will be set to the number of index keys removed by * this operation. */ diff --git a/src/mongo/db/catalog/index_catalog_noop.h b/src/mongo/db/catalog/index_catalog_noop.h index e645831063a..34ea94afee2 100644 --- a/src/mongo/db/catalog/index_catalog_noop.h +++ b/src/mongo/db/catalog/index_catalog_noop.h @@ -175,6 +175,15 @@ public: return Status::OK(); } + Status updateRecord(OperationContext* const opCtx, + const BSONObj& oldDoc, + const BSONObj& newDoc, + const RecordId& recordId, + int64_t* const keysInsertedOut, + int64_t* const keysDeletedOut) override { + return Status::OK(); + }; + void unindexRecord(OperationContext* const opCtx, const BSONObj& obj, const RecordId& loc, diff --git a/src/mongo/db/catalog/multi_index_block.h b/src/mongo/db/catalog/multi_index_block.h index b2468a15c9b..70283a83fa5 100644 --- a/src/mongo/db/catalog/multi_index_block.h +++ b/src/mongo/db/catalog/multi_index_block.h @@ -151,9 +151,20 @@ public: * * Should not be called inside of a WriteUnitOfWork. */ - virtual Status doneInserting() = 0; - virtual Status doneInserting(std::set<RecordId>* const dupRecords) = 0; - virtual Status doneInserting(std::vector<BSONObj>* const dupKeysInserted) = 0; + virtual Status dumpInsertsFromBulk() = 0; + virtual Status dumpInsertsFromBulk(std::set<RecordId>* const dupRecords) = 0; + virtual Status dumpInsertsFromBulk(std::vector<BSONObj>* const dupKeysInserted) = 0; + + /** + * For background indexes using an IndexBuildInterceptor to capture inserts during a build, + * drain these writes into the index. If intent locks are held on the collection, more writes + * may come in after this drain completes. To ensure that all writes are completely drained + * before calling commit(), stop writes on the collection by holding a S or X while calling this + * method. + * + * Must not be in a WriteUnitOfWork. + */ + virtual Status drainBackgroundWritesIfNeeded() = 0; /** * Marks the index ready for use. Should only be called as the last method after diff --git a/src/mongo/db/catalog/multi_index_block_impl.cpp b/src/mongo/db/catalog/multi_index_block_impl.cpp index 28f90ae1249..5ce1badd0e4 100644 --- a/src/mongo/db/catalog/multi_index_block_impl.cpp +++ b/src/mongo/db/catalog/multi_index_block_impl.cpp @@ -276,7 +276,10 @@ StatusWith<std::vector<BSONObj>> MultiIndexBlockImpl::init(const std::vector<BSO if (!status.isOK()) return status; - if (!_buildInBackground) { + // Foreground builds and background builds using an interceptor can use the bulk builder. + const bool useBulk = + !_buildInBackground || index.block->getEntry()->indexBuildInterceptor(); + if (useBulk) { // Bulk build process requires foreground building as it assumes nothing is changing // under it. index.bulk = index.real->initiateBulk(eachIndexBuildMaxMemoryUsageBytes); @@ -287,6 +290,7 @@ StatusWith<std::vector<BSONObj>> MultiIndexBlockImpl::init(const std::vector<BSO _collection->getIndexCatalog()->prepareInsertDeleteOptions( _opCtx, descriptor, &index.options); index.options.dupsAllowed = index.options.dupsAllowed || _ignoreUnique; + index.options.fromIndexBuilder = true; if (_ignoreUnique) { index.options.getKeysMode = IndexAccessMethod::GetKeysMode::kRelaxConstraints; } @@ -487,11 +491,12 @@ Status MultiIndexBlockImpl::insertAllDocumentsInCollection() { progress->finished(); - Status ret = doneInserting(); + Status ret = dumpInsertsFromBulk(); if (!ret.isOK()) return ret; - log() << "build index done. scanned " << n << " total records. " << t.seconds() << " secs"; + log() << "build index collection scan done. scanned " << n << " total records. " << t.seconds() + << " secs"; return Status::OK(); } @@ -534,19 +539,20 @@ Status MultiIndexBlockImpl::insert(const BSONObj& doc, return Status::OK(); } -Status MultiIndexBlockImpl::doneInserting() { - return _doneInserting(nullptr, nullptr); +Status MultiIndexBlockImpl::dumpInsertsFromBulk() { + return _dumpInsertsFromBulk(nullptr, nullptr); } -Status MultiIndexBlockImpl::doneInserting(std::set<RecordId>* dupRecords) { - return _doneInserting(dupRecords, nullptr); +Status MultiIndexBlockImpl::dumpInsertsFromBulk(std::set<RecordId>* dupRecords) { + return _dumpInsertsFromBulk(dupRecords, nullptr); } -Status MultiIndexBlockImpl::doneInserting(std::vector<BSONObj>* dupKeysInserted) { - return _doneInserting(nullptr, dupKeysInserted); +Status MultiIndexBlockImpl::dumpInsertsFromBulk(std::vector<BSONObj>* dupKeysInserted) { + return _dumpInsertsFromBulk(nullptr, dupKeysInserted); } -Status MultiIndexBlockImpl::_doneInserting(std::set<RecordId>* dupRecords, - std::vector<BSONObj>* dupKeysInserted) { + +Status MultiIndexBlockImpl::_dumpInsertsFromBulk(std::set<RecordId>* dupRecords, + std::vector<BSONObj>* dupKeysInserted) { if (State::kAborted == _getState()) { return {ErrorCodes::IndexBuildAborted, str::stream() << "Index build aborted: " << _abortReason @@ -561,7 +567,7 @@ Status MultiIndexBlockImpl::_doneInserting(std::set<RecordId>* dupRecords, for (size_t i = 0; i < _indexes.size(); i++) { if (_indexes[i].bulk == NULL) continue; - LOG(1) << "\t bulk commit starting for index: " + LOG(1) << "\t dumping from external sorter into index: " << _indexes[i].block->getEntry()->descriptor()->indexName(); Status status = _indexes[i].real->commitBulk(_opCtx, _indexes[i].bulk.get(), @@ -579,6 +585,43 @@ Status MultiIndexBlockImpl::_doneInserting(std::set<RecordId>* dupRecords, return Status::OK(); } +Status MultiIndexBlockImpl::drainBackgroundWritesIfNeeded() { + if (State::kAborted == _getState()) { + return {ErrorCodes::IndexBuildAborted, + str::stream() << "Index build aborted: " << _abortReason + << ". Cannot complete drain phase: " + << _collection->ns().ns() + << "(" + << *_collection->uuid() + << ")"}; + } + + invariant(!_opCtx->lockState()->inAWriteUnitOfWork()); + + // Drain side-writes table for each index. This only drains what is visible. Assuming intent + // locks are held on the user collection, more writes can come in after this drain completes. + // Callers are responsible for stopping writes by holding an S or X lock while draining before + // completing the index build. + for (size_t i = 0; i < _indexes.size(); i++) { + auto interceptor = _indexes[i].block->getEntry()->indexBuildInterceptor(); + if (!interceptor) + continue; + + LOG(1) << "draining background writes on collection " << _collection->ns() + << " into index: " << _indexes[i].block->getEntry()->descriptor()->indexName(); + + auto status = interceptor->drainWritesIntoIndex(_opCtx, + _indexes[i].real, + _indexes[i].block->getEntry()->descriptor(), + _indexes[i].options); + if (!status.isOK()) { + return status; + } + } + return Status::OK(); +} + + void MultiIndexBlockImpl::abortWithoutCleanup() { _setStateToAbortedIfNotCommitted("aborted without cleanup"_sd); _indexes.clear(); @@ -613,6 +656,16 @@ Status MultiIndexBlockImpl::commit(stdx::function<void(const BSONObj& spec)> onC onCreateFn(_indexes[i].block->getSpec()); } + // Do this before calling success(), which unsets the interceptor pointer on the index + // catalog entry. + auto interceptor = _indexes[i].block->getEntry()->indexBuildInterceptor(); + if (interceptor) { + auto multikeyPaths = interceptor->getMultikeyPaths(); + if (multikeyPaths) { + _indexes[i].block->getEntry()->setMultikey(_opCtx, multikeyPaths.get()); + } + } + _indexes[i].block->success(); // The bulk builder will track multikey information itself. Non-bulk builders re-use the diff --git a/src/mongo/db/catalog/multi_index_block_impl.h b/src/mongo/db/catalog/multi_index_block_impl.h index ab008f9692c..c842298289c 100644 --- a/src/mongo/db/catalog/multi_index_block_impl.h +++ b/src/mongo/db/catalog/multi_index_block_impl.h @@ -82,9 +82,14 @@ public: const RecordId& loc, std::vector<BSONObj>* const dupKeysInserted = nullptr) override; - Status doneInserting() override; - Status doneInserting(std::set<RecordId>* dupRecords) override; - Status doneInserting(std::vector<BSONObj>* dupKeysInserted) override; + Status dumpInsertsFromBulk() override; + Status dumpInsertsFromBulk(std::set<RecordId>* dupRecords) override; + Status dumpInsertsFromBulk(std::vector<BSONObj>* dupKeysInserted) override; + + /** + * See MultiIndexBlock::drainBackgroundWritesIfNeeded() + */ + Status drainBackgroundWritesIfNeeded() override; Status commit() override; Status commit(stdx::function<void(const BSONObj& spec)> onCreateFn) override; @@ -134,7 +139,8 @@ private: InsertDeleteOptions options; }; - Status _doneInserting(std::set<RecordId>* dupRecords, std::vector<BSONObj>* dupKeysInserted); + Status _dumpInsertsFromBulk(std::set<RecordId>* dupRecords, + std::vector<BSONObj>* dupKeysInserted); /** * Returns the current state. diff --git a/src/mongo/db/catalog/multi_index_block_test.cpp b/src/mongo/db/catalog/multi_index_block_test.cpp index 11dcd74502c..562bdd518fb 100644 --- a/src/mongo/db/catalog/multi_index_block_test.cpp +++ b/src/mongo/db/catalog/multi_index_block_test.cpp @@ -112,7 +112,7 @@ TEST_F(MultiIndexBlockTest, CommitWithoutInsertingDocuments) { ASSERT_EQUALS(0U, specs.size()); ASSERT_EQUALS(MultiIndexBlockImpl::State::kRunning, indexer->getState_forTest()); - ASSERT_OK(indexer->doneInserting()); + ASSERT_OK(indexer->dumpInsertsFromBulk()); ASSERT_EQUALS(MultiIndexBlockImpl::State::kPreCommit, indexer->getState_forTest()); ASSERT_FALSE(indexer->isCommitted()); @@ -134,7 +134,7 @@ TEST_F(MultiIndexBlockTest, CommitAfterInsertingSingleDocument) { ASSERT_EQUALS(MultiIndexBlockImpl::State::kRunning, indexer->getState_forTest()); ASSERT_OK(indexer->insert({}, {}, nullptr)); - ASSERT_OK(indexer->doneInserting()); + ASSERT_OK(indexer->dumpInsertsFromBulk()); ASSERT_EQUALS(MultiIndexBlockImpl::State::kPreCommit, indexer->getState_forTest()); ASSERT_FALSE(indexer->isCommitted()); @@ -192,7 +192,7 @@ TEST_F(MultiIndexBlockTest, InsertingSingleDocumentFailsAfterAbort) { ASSERT_FALSE(indexer->isCommitted()); } -TEST_F(MultiIndexBlockTest, DoneInsertingFailsAfterAbort) { +TEST_F(MultiIndexBlockTest, dumpInsertsFromBulkFailsAfterAbort) { auto indexer = getIndexer(); ASSERT_EQUALS(MultiIndexBlockImpl::State::kUninitialized, indexer->getState_forTest()); @@ -206,7 +206,7 @@ TEST_F(MultiIndexBlockTest, DoneInsertingFailsAfterAbort) { indexer->abort("test"_sd); ASSERT_EQUALS(MultiIndexBlockImpl::State::kAborted, indexer->getState_forTest()); - ASSERT_EQUALS(ErrorCodes::IndexBuildAborted, indexer->doneInserting()); + ASSERT_EQUALS(ErrorCodes::IndexBuildAborted, indexer->dumpInsertsFromBulk()); ASSERT_EQUALS(MultiIndexBlockImpl::State::kAborted, indexer->getState_forTest()); ASSERT_FALSE(indexer->isCommitted()); @@ -223,7 +223,7 @@ TEST_F(MultiIndexBlockTest, CommitFailsAfterAbort) { ASSERT_OK(indexer->insert(BSON("_id" << 123 << "a" << 456), {}, nullptr)); ASSERT_EQUALS(MultiIndexBlockImpl::State::kRunning, indexer->getState_forTest()); - ASSERT_OK(indexer->doneInserting()); + ASSERT_OK(indexer->dumpInsertsFromBulk()); ASSERT_EQUALS(MultiIndexBlockImpl::State::kPreCommit, indexer->getState_forTest()); indexer->abort("test"_sd); diff --git a/src/mongo/db/catalog/rename_collection.cpp b/src/mongo/db/catalog/rename_collection.cpp index c3e71007b5e..c6c8c128e4c 100644 --- a/src/mongo/db/catalog/rename_collection.cpp +++ b/src/mongo/db/catalog/rename_collection.cpp @@ -462,7 +462,7 @@ Status renameCollectionCommon(OperationContext* opCtx, return status; } - status = indexer.doneInserting(); + status = indexer.dumpInsertsFromBulk(); if (!status.isOK()) { return status; } diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp index 15add881ec2..ea461d2a5c3 100644 --- a/src/mongo/db/commands/create_indexes.cpp +++ b/src/mongo/db/commands/create_indexes.cpp @@ -28,6 +28,8 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kIndex + #include "mongo/platform/basic.h" #include <string> @@ -60,6 +62,8 @@ #include "mongo/db/server_options.h" #include "mongo/db/views/view_catalog.h" #include "mongo/s/shard_key_pattern.h" +#include "mongo/util/fail_point.h" +#include "mongo/util/log.h" #include "mongo/util/scopeguard.h" namespace mongo { @@ -68,6 +72,10 @@ using std::string; using IndexVersion = IndexDescriptor::IndexVersion; +MONGO_FAIL_POINT_DEFINE(hangAfterIndexBuildFirstDrain); +MONGO_FAIL_POINT_DEFINE(hangAfterIndexBuildSecondDrain); +MONGO_FAIL_POINT_DEFINE(hangAfterIndexBuildDumpsInsertsFromBulk); + namespace { const StringData kIndexesFieldName = "indexes"_sd; @@ -389,11 +397,7 @@ public: dbLock.relockWithMode(MODE_IX); } - try { - Lock::CollectionLock colLock(opCtx->lockState(), ns.ns(), MODE_IX); - uassertStatusOK(indexer.insertAllDocumentsInCollection()); - } catch (const DBException& e) { - invariant(e.code() != ErrorCodes::WriteConflict); + auto relockOnErrorGuard = MakeGuard([&] { // Must have exclusive DB lock before we clean up the index build via the // destructor of 'indexer'. if (indexer.getBuildInBackground()) { @@ -406,8 +410,50 @@ public: std::terminate(); } } - throw; + }); + + // Collection scan and insert into index, followed by a drain of writes received in the + // background. + { + Lock::CollectionLock colLock(opCtx->lockState(), ns.ns(), MODE_IX); + uassertStatusOK(indexer.insertAllDocumentsInCollection()); + } + + if (MONGO_FAIL_POINT(hangAfterIndexBuildDumpsInsertsFromBulk)) { + log() << "Hanging after dumping inserts from bulk builder"; + MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangAfterIndexBuildDumpsInsertsFromBulk); + } + + // Perform the first drain while holding an intent lock. + { + opCtx->recoveryUnit()->abandonSnapshot(); + Lock::CollectionLock colLock(opCtx->lockState(), ns.ns(), MODE_IS); + + LOG(1) << "performing first index build drain"; + uassertStatusOK(indexer.drainBackgroundWritesIfNeeded()); + } + + if (MONGO_FAIL_POINT(hangAfterIndexBuildFirstDrain)) { + log() << "Hanging after index build first drain"; + MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangAfterIndexBuildFirstDrain); + } + + // Perform the second drain while stopping writes on the collection. + { + opCtx->recoveryUnit()->abandonSnapshot(); + Lock::CollectionLock colLock(opCtx->lockState(), ns.ns(), MODE_S); + + LOG(1) << "performing second index build drain"; + uassertStatusOK(indexer.drainBackgroundWritesIfNeeded()); + } + + if (MONGO_FAIL_POINT(hangAfterIndexBuildSecondDrain)) { + log() << "Hanging after index build second drain"; + MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangAfterIndexBuildSecondDrain); } + + relockOnErrorGuard.Dismiss(); + // Need to return db lock back to exclusive, to complete the index build. if (indexer.getBuildInBackground()) { opCtx->recoveryUnit()->abandonSnapshot(); @@ -418,10 +464,15 @@ public: DatabaseShardingState::get(db).checkDbVersion(opCtx); } - uassert(28551, "database dropped during index build", db); - uassert(28552, "collection dropped during index build", db->getCollection(opCtx, ns)); + invariant(db); + invariant(db->getCollection(opCtx, ns)); } + // Perform the third and final drain after releasing a shared lock and reacquiring an + // exclusive lock on the database. + LOG(1) << "performing final index build drain"; + uassertStatusOK(indexer.drainBackgroundWritesIfNeeded()); + writeConflictRetry(opCtx, kCommandName, ns.ns(), [&] { WriteUnitOfWork wunit(opCtx); diff --git a/src/mongo/db/concurrency/d_concurrency.h b/src/mongo/db/concurrency/d_concurrency.h index 4ba068ac445..7f6f52e9c99 100644 --- a/src/mongo/db/concurrency/d_concurrency.h +++ b/src/mongo/db/concurrency/d_concurrency.h @@ -319,6 +319,10 @@ public: return _result == LOCK_OK; } + LockMode mode() const { + return _mode; + } + private: const ResourceId _id; OperationContext* const _opCtx; diff --git a/src/mongo/db/index/SConscript b/src/mongo/db/index/SConscript index 1f37291f34b..87f54e29f47 100644 --- a/src/mongo/db/index/SConscript +++ b/src/mongo/db/index/SConscript @@ -157,7 +157,6 @@ env.Library( ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/multi_key_path_tracker', - '$BUILD_DIR/mongo/db/s/sharding_api_d', 'index_access_methods', ], ) diff --git a/src/mongo/db/index/duplicate_key_tracker_test.cpp b/src/mongo/db/index/duplicate_key_tracker_test.cpp index abb373e94e0..b1abac80d95 100644 --- a/src/mongo/db/index/duplicate_key_tracker_test.cpp +++ b/src/mongo/db/index/duplicate_key_tracker_test.cpp @@ -280,7 +280,7 @@ TEST_F(DuplicateKeyTrackerTest, BulkIndexBuild) { std::vector<BSONObj> dupsInserted; // Neither of these inserts will recognize duplicates because the bulk inserter does not - // detect them until doneInserting() is called. + // detect them until dumpInsertsFromBulk() is called. ASSERT_OK(indexer.insert(record1->data.releaseToBson(), record1->id, &dupsInserted)); ASSERT_EQ(0u, dupsInserted.size()); @@ -289,7 +289,7 @@ TEST_F(DuplicateKeyTrackerTest, BulkIndexBuild) { ASSERT_OK(indexer.insert(record2->data.releaseToBson(), record2->id, &dupsInserted)); ASSERT_EQ(0u, dupsInserted.size()); - ASSERT_OK(indexer.doneInserting(&dupsInserted)); + ASSERT_OK(indexer.dumpInsertsFromBulk(&dupsInserted)); ASSERT_EQ(1u, dupsInserted.size()); // Record that duplicates were inserted. diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp index 86a936c3bc6..250223530a3 100644 --- a/src/mongo/db/index/index_access_method.cpp +++ b/src/mongo/db/index/index_access_method.cpp @@ -184,13 +184,27 @@ Status AbstractIndexAccessMethod::insert(OperationContext* opCtx, const RecordId& loc, const InsertDeleteOptions& options, InsertResult* result) { - bool checkIndexKeySize = shouldCheckIndexKeySize(opCtx); + invariant(options.fromIndexBuilder || !_btreeState->isBuilding()); + BSONObjSet multikeyMetadataKeys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); BSONObjSet keys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); MultikeyPaths multikeyPaths; + // Delegate to the subclass. getKeys(obj, options.getKeysMode, &keys, &multikeyMetadataKeys, &multikeyPaths); + return insertKeys(opCtx, keys, multikeyMetadataKeys, multikeyPaths, loc, options, result); +} + +Status AbstractIndexAccessMethod::insertKeys(OperationContext* opCtx, + const BSONObjSet& keys, + const BSONObjSet& multikeyMetadataKeys, + const MultikeyPaths& multikeyPaths, + const RecordId& loc, + const InsertDeleteOptions& options, + InsertResult* result) { + bool checkIndexKeySize = shouldCheckIndexKeySize(opCtx); + // Add all new data keys, and all new multikey metadata keys, into the index. When iterating // over the data keys, each of them should point to the doc's RecordId. When iterating over // the multikey metadata keys, they should point to the reserved 'kMultikeyMetadataKeyId'. @@ -236,7 +250,6 @@ Status AbstractIndexAccessMethod::insert(OperationContext* opCtx, if (shouldMarkIndexAsMultikey(keys, multikeyMetadataKeys, multikeyPaths)) { _btreeState->setMultikey(opCtx, multikeyPaths); } - return Status::OK(); } @@ -271,7 +284,9 @@ Status AbstractIndexAccessMethod::remove(OperationContext* opCtx, const RecordId& loc, const InsertDeleteOptions& options, int64_t* numDeleted) { + invariant(!_btreeState->isBuilding()); invariant(numDeleted); + *numDeleted = 0; BSONObjSet keys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); // There's no need to compute the prefixes of the indexed fields that cause the index to be @@ -285,12 +300,20 @@ Status AbstractIndexAccessMethod::remove(OperationContext* opCtx, getKeys( obj, GetKeysMode::kRelaxConstraintsUnfiltered, &keys, multikeyMetadataKeys, multikeyPaths); + return removeKeys(opCtx, keys, loc, options, numDeleted); +} + +Status AbstractIndexAccessMethod::removeKeys(OperationContext* opCtx, + const BSONObjSet& keys, + const RecordId& loc, + const InsertDeleteOptions& options, + int64_t* numDeleted) { + for (const auto& key : keys) { removeOneKey(opCtx, key, loc, options.dupsAllowed); } *numDeleted = keys.size(); - return Status::OK(); } @@ -446,6 +469,7 @@ Status AbstractIndexAccessMethod::update(OperationContext* opCtx, const UpdateTicket& ticket, int64_t* numInserted, int64_t* numDeleted) { + invariant(!_btreeState->isBuilding()); invariant(ticket.newKeys.size() == ticket.oldKeys.size() + ticket.added.size() - ticket.removed.size()); invariant(numInserted); diff --git a/src/mongo/db/index/index_access_method.h b/src/mongo/db/index/index_access_method.h index 54f2e351d26..3c466486410 100644 --- a/src/mongo/db/index/index_access_method.h +++ b/src/mongo/db/index/index_access_method.h @@ -93,6 +93,14 @@ public: const InsertDeleteOptions& options, InsertResult* result) = 0; + virtual Status insertKeys(OperationContext* opCtx, + const BSONObjSet& keys, + const BSONObjSet& multikeyMetadataKeys, + const MultikeyPaths& multikeyPaths, + const RecordId& loc, + const InsertDeleteOptions& options, + InsertResult* result) = 0; + /** * Analogous to above, but remove the records instead of inserting them. * 'numDeleted' will be set to the number of keys removed from the index for the document. @@ -103,6 +111,12 @@ public: const InsertDeleteOptions& options, int64_t* numDeleted) = 0; + virtual Status removeKeys(OperationContext* opCtx, + const BSONObjSet& keys, + const RecordId& loc, + const InsertDeleteOptions& options, + int64_t* numDeleted) = 0; + /** * Checks whether the index entries for the document 'from', which is placed at location * 'loc' on disk, can be changed to the index entries for the doc 'to'. Provides a ticket @@ -403,6 +417,10 @@ struct InsertDeleteOptions { // Are duplicate keys allowed in the index? bool dupsAllowed = false; + // Only an index builder is allowed to insert into the index while it is building, so only the + // index builder should set this to 'true'. + bool fromIndexBuilder = false; + // Should we relax the index constraints? IndexAccessMethod::GetKeysMode getKeysMode = IndexAccessMethod::GetKeysMode::kEnforceConstraints; @@ -439,12 +457,26 @@ public: const InsertDeleteOptions& options, InsertResult* result) final; + Status insertKeys(OperationContext* opCtx, + const BSONObjSet& keys, + const BSONObjSet& multikeyMetadataKeys, + const MultikeyPaths& multikeyPaths, + const RecordId& loc, + const InsertDeleteOptions& options, + InsertResult* result) final; + Status remove(OperationContext* opCtx, const BSONObj& obj, const RecordId& loc, const InsertDeleteOptions& options, int64_t* numDeleted) final; + Status removeKeys(OperationContext* opCtx, + const BSONObjSet& keys, + const RecordId& loc, + const InsertDeleteOptions& options, + int64_t* numDeleted) final; + Status validateUpdate(OperationContext* opCtx, const BSONObj& from, const BSONObj& to, diff --git a/src/mongo/db/index/index_build_interceptor.cpp b/src/mongo/db/index/index_build_interceptor.cpp index 6af48b542c1..c82baee0b0c 100644 --- a/src/mongo/db/index/index_build_interceptor.cpp +++ b/src/mongo/db/index/index_build_interceptor.cpp @@ -35,47 +35,195 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/multi_key_path_tracker.h" #include "mongo/db/operation_context.h" -#include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/service_context.h" #include "mongo/util/log.h" +#include "mongo/util/progress_meter.h" #include "mongo/util/uuid.h" namespace mongo { -namespace { -const bool makeCollections = false; -} +IndexBuildInterceptor::IndexBuildInterceptor(OperationContext* opCtx) + : _sideWritesTable( + opCtx->getServiceContext()->getStorageEngine()->makeTemporaryRecordStore(opCtx)){}; -NamespaceString IndexBuildInterceptor::makeTempSideWritesNs() { - return NamespaceString("local.system.sideWrites-" + UUID::gen().toString()); -} +Status IndexBuildInterceptor::drainWritesIntoIndex(OperationContext* opCtx, + IndexAccessMethod* indexAccessMethod, + const IndexDescriptor* indexDescriptor, + const InsertDeleteOptions& options) { + invariant(!opCtx->lockState()->inAWriteUnitOfWork()); + + // These are used for logging only. + int64_t totalDeleted = 0; + int64_t totalInserted = 0; + + const int64_t appliedAtStart = _numApplied; + + // Set up the progress meter. This will never be completely accurate, because more writes can be + // read from the side writes table than are observed before draining. + static const char* curopMessage = "Index build draining writes"; + stdx::unique_lock<Client> lk(*opCtx->getClient()); + ProgressMeterHolder progress(CurOp::get(opCtx)->setMessage_inlock( + curopMessage, curopMessage, _sideWritesCounter.load() - appliedAtStart, 1)); + lk.unlock(); + + // Buffer operations into batches to insert per WriteUnitOfWork. Impose an upper limit on the + // number of documents and the total size of the batch. + const int32_t kBatchMaxSize = 1000; + const int64_t kBatchMaxBytes = BSONObjMaxInternalSize; + + int64_t batchSizeBytes = 0; + + std::vector<SideWriteRecord> batch; + batch.reserve(kBatchMaxSize); + + // Hold on to documents that would exceed the per-batch memory limit. Always insert this first + // into the next batch. + boost::optional<SideWriteRecord> stashed; + + auto cursor = _sideWritesTable->rs()->getCursor(opCtx); + + bool atEof = false; + while (!atEof) { + + // Stashed records should be inserted into a batch first. + if (stashed) { + invariant(batch.empty()); + batch.push_back(std::move(stashed.get())); + stashed.reset(); + } + + auto record = cursor->next(); + + if (record) { + RecordId currentRecordId = record->id; + BSONObj docOut = record->data.toBson().getOwned(); + + // If the total batch size in bytes would be too large, stash this document and let the + // current batch insert. + int objSize = docOut.objsize(); + if (batchSizeBytes + objSize > kBatchMaxBytes) { + invariant(!stashed); + + // Stash this document to be inserted in the next batch. + stashed.emplace(currentRecordId, std::move(docOut)); + } else { + batchSizeBytes += objSize; + batch.emplace_back(currentRecordId, std::move(docOut)); + + // Continue if there is more room in the batch. + if (batch.size() < kBatchMaxSize) { + continue; + } + } + } else { + atEof = true; + if (batch.empty()) + break; + } + + // Account for more writes coming in after the drain starts. + progress->setTotalWhileRunning(_sideWritesCounter.loadRelaxed() - appliedAtStart); + + invariant(!batch.empty()); + + // If we are here, either we have reached the end of the table or the batch is full, so + // insert everything in one WriteUnitOfWork, and delete each inserted document from the side + // writes table. + WriteUnitOfWork wuow(opCtx); + for (auto& operation : batch) { + auto status = _applyWrite( + opCtx, indexAccessMethod, operation.second, options, &totalInserted, &totalDeleted); + if (!status.isOK()) { + return status; + } -void IndexBuildInterceptor::ensureSideWritesCollectionExists(OperationContext* opCtx) { - if (!makeCollections) { - return; + // Delete the document from the table as soon as it has been inserted into the index. + // This ensures that no key is ever inserted twice and no keys are skipped. + _sideWritesTable->rs()->deleteRecord(opCtx, operation.first); + } + cursor->save(); + wuow.commit(); + + cursor->restore(); + + progress->hit(batch.size()); + _numApplied += batch.size(); + batch.clear(); + batchSizeBytes = 0; } - // TODO SERVER-38027 Consider pushing this higher into the createIndexes command logic. - OperationShardingState::get(opCtx).setAllowImplicitCollectionCreation(BSONElement()); + progress->finished(); - AutoGetOrCreateDb local(opCtx, "local", LockMode::MODE_X); - CollectionOptions options; - options.setNoIdIndex(); - options.temp = true; + log() << "index build for " << indexDescriptor->indexName() << ": drain applied " + << (_numApplied - appliedAtStart) << " side writes. i: " << totalInserted + << ", d: " << totalDeleted << ", total: " << _numApplied; - local.getDb()->createCollection(opCtx, _sideWritesNs.ns(), options); + return Status::OK(); } -void IndexBuildInterceptor::removeSideWritesCollection(OperationContext* opCtx) { - if (!makeCollections) { - return; +Status IndexBuildInterceptor::_applyWrite(OperationContext* opCtx, + IndexAccessMethod* indexAccessMethod, + const BSONObj& operation, + const InsertDeleteOptions& options, + int64_t* const keysInserted, + int64_t* const keysDeleted) { + const BSONObj key = operation["key"].Obj(); + const RecordId opRecordId = RecordId(operation["recordId"].Long()); + const Op opType = + (strcmp(operation.getStringField("op"), "i") == 0) ? Op::kInsert : Op::kDelete; + const BSONObjSet keySet = SimpleBSONObjComparator::kInstance.makeBSONObjSet({key}); + + if (opType == Op::kInsert) { + InsertResult result; + Status s = + indexAccessMethod->insertKeys(opCtx, + keySet, + SimpleBSONObjComparator::kInstance.makeBSONObjSet(), + MultikeyPaths{}, + opRecordId, + options, + &result); + if (!s.isOK()) { + return s; + } + + invariant(!result.dupsInserted.size()); + *keysInserted += result.numInserted; + } else { + invariant(opType == Op::kDelete); + DEV invariant(strcmp(operation.getStringField("op"), "d") == 0); + + int64_t numDeleted; + Status s = indexAccessMethod->removeKeys(opCtx, keySet, opRecordId, options, &numDeleted); + if (!s.isOK()) { + return s; + } + + *keysDeleted += numDeleted; } + return Status::OK(); +} - AutoGetDb local(opCtx, "local", LockMode::MODE_X); - fassert(50994, local.getDb()->dropCollectionEvenIfSystem(opCtx, _sideWritesNs, repl::OpTime())); +bool IndexBuildInterceptor::areAllWritesApplied(OperationContext* opCtx) const { + invariant(_sideWritesTable); + auto cursor = _sideWritesTable->rs()->getCursor(opCtx, false /* forward */); + auto record = cursor->next(); + + // The table is empty only when all writes are applied. + if (!record) + return true; + + return false; +} + +boost::optional<MultikeyPaths> IndexBuildInterceptor::getMultikeyPaths() const { + stdx::unique_lock<stdx::mutex> lk(_multikeyPathMutex); + return _multikeyPaths; } Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx, @@ -83,7 +231,9 @@ Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx, const BSONObj* obj, RecordId loc, Op op, - int64_t* numKeysOut) { + int64_t* const numKeysOut) { + invariant(opCtx->lockState()->inAWriteUnitOfWork()); + *numKeysOut = 0; BSONObjSet keys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); BSONObjSet multikeyMetadataKeys = SimpleBSONObjComparator::kInstance.makeBSONObjSet(); @@ -98,23 +248,24 @@ Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx, // `multikeyMetadataKeys` when inserting. *numKeysOut = keys.size() + (op == Op::kInsert ? multikeyMetadataKeys.size() : 0); - if (_multikeyPaths) { - MultikeyPathTracker::mergeMultikeyPaths(&_multikeyPaths.get(), multikeyPaths); - } else { - // `mergeMultikeyPaths` is sensitive to the two inputs having the same multikey - // "shape". Initialize `_multikeyPaths` with the right shape from the first result. - _multikeyPaths = multikeyPaths; + if (*numKeysOut == 0) { + return Status::OK(); } - AutoGetCollection coll(opCtx, _sideWritesNs, LockMode::MODE_IX); - invariant(coll.getCollection()); + { + stdx::unique_lock<stdx::mutex> lk(_multikeyPathMutex); + if (_multikeyPaths) { + MultikeyPathTracker::mergeMultikeyPaths(&_multikeyPaths.get(), multikeyPaths); + } else { + // `mergeMultikeyPaths` is sensitive to the two inputs having the same multikey + // "shape". Initialize `_multikeyPaths` with the right shape from the first result. + _multikeyPaths = multikeyPaths; + } + } - std::vector<InsertStatement> toInsert; + std::vector<BSONObj> toInsert; for (const auto& key : keys) { - // Documents inserted into this table must be consumed in insert-order. Today, we can rely - // on storage engines to return documents in insert-order, but with clustered indexes, - // that may no longer be true. - // + // Documents inserted into this table must be consumed in insert-order. // Additionally, these writes should be timestamped with the same timestamps that the // other writes making up this operation are given. When index builds can cope with // replication rollbacks, side table writes associated with a CUD operation should @@ -138,9 +289,21 @@ Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx, } } - OpDebug* const opDebug = nullptr; - const bool fromMigrate = false; - return coll.getCollection()->insertDocuments( - opCtx, toInsert.begin(), toInsert.end(), opDebug, fromMigrate); + _sideWritesCounter.fetchAndAdd(toInsert.size()); + // This insert may roll back, but not necessarily from inserting into this table. If other write + // operations outside this table and in the same transaction are rolled back, this counter also + // needs to be rolled back. + opCtx->recoveryUnit()->onRollback( + [ this, size = toInsert.size() ] { _sideWritesCounter.fetchAndSubtract(size); }); + + std::vector<Record> records; + for (auto& obj : toInsert) { + records.emplace_back(Record{RecordId(), RecordData(obj.objdata(), obj.objsize())}); + } + + // By passing a vector of null timestamps, these inserts are not timestamped individually, but + // rather with the timestamp of the owning operation. + std::vector<Timestamp> timestamps(records.size()); + return _sideWritesTable->rs()->insertRecords(opCtx, &records, timestamps); } } // namespace mongo diff --git a/src/mongo/db/index/index_build_interceptor.h b/src/mongo/db/index/index_build_interceptor.h index 13ae79d10d2..b25fbf53c71 100644 --- a/src/mongo/db/index/index_build_interceptor.h +++ b/src/mongo/db/index/index_build_interceptor.h @@ -29,9 +29,10 @@ #pragma once +#include "mongo/db/index/index_access_method.h" #include "mongo/db/index/multikey_paths.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/record_id.h" +#include "mongo/db/storage/temporary_record_store.h" #include "mongo/platform/atomic_word.h" namespace mongo { @@ -44,13 +45,12 @@ class IndexBuildInterceptor { public: enum class Op { kInsert, kDelete }; - IndexBuildInterceptor() : _sideWritesNs(makeTempSideWritesNs()) {} - IndexBuildInterceptor(NamespaceString sideWritesNs) : _sideWritesNs(sideWritesNs) {} - - static NamespaceString makeTempSideWritesNs(); - - void ensureSideWritesCollectionExists(OperationContext* opCtx); - void removeSideWritesCollection(OperationContext* opCtx); + /** + * The OperationContext is used to construct a temporary table in the storage engine to + * intercept side writes. This interceptor must not exist longer than the operation context used + * to construct it, as the underlying TemporaryRecordStore needs it to destroy itself. + */ + IndexBuildInterceptor(OperationContext* opCtx); /** * Client writes that are concurrent with an index build will have their index updates written @@ -64,10 +64,51 @@ public: const BSONObj* obj, RecordId loc, Op op, - int64_t* numKeysOut); + int64_t* const numKeysOut); + + /** + * Performs a resumable scan on the side writes table, and either inserts or removes each key + * from the underlying IndexAccessMethod. This will only insert as many records as are visible + * in the current snapshot. + * + * This is resumable, so subsequent calls will start the scan at the record immediately + * following the last inserted record from a previous call to drainWritesIntoIndex. + */ + Status drainWritesIntoIndex(OperationContext* opCtx, + IndexAccessMethod* indexAccessMethod, + const IndexDescriptor* indexDescriptor, + const InsertDeleteOptions& options); + + /** + * Returns 'true' if there are no visible records remaining to be applied from the side writes + * table. Ensure that this returns 'true' when an index build is completed. + */ + bool areAllWritesApplied(OperationContext* opCtx) const; + + /** + * When an index builder wants to commit, use this to retrieve any recorded multikey paths + * that were tracked during the build. + */ + boost::optional<MultikeyPaths> getMultikeyPaths() const; private: - NamespaceString _sideWritesNs; + using SideWriteRecord = std::pair<RecordId, BSONObj>; + + Status _applyWrite(OperationContext* opCtx, + IndexAccessMethod* indexAccessMethod, + const BSONObj& doc, + const InsertDeleteOptions& options, + int64_t* const keysInserted, + int64_t* const keysDeleted); + + // This temporary record store is owned by the interceptor and dropped along with it. + std::unique_ptr<TemporaryRecordStore> _sideWritesTable; + + int64_t _numApplied{0}; + + AtomicInt64 _sideWritesCounter{0}; + + mutable stdx::mutex _multikeyPathMutex; boost::optional<MultikeyPaths> _multikeyPaths; }; diff --git a/src/mongo/db/index_builder.cpp b/src/mongo/db/index_builder.cpp index c4977673c37..93691e3139f 100644 --- a/src/mongo/db/index_builder.cpp +++ b/src/mongo/db/index_builder.cpp @@ -170,25 +170,30 @@ void IndexBuilder::waitForBgIndexStarting() { } namespace { -/** - * @param status shalt not be of code `WriteConflict`. - */ -Status _failIndexBuild(MultiIndexBlock& indexer, Status status, bool allowBackgroundBuilding) { +Status _failIndexBuild(OperationContext* opCtx, + MultiIndexBlock& indexer, + Lock::DBLock* dbLock, + Status status, + bool allowBackgroundBuilding) { invariant(status.code() != ErrorCodes::WriteConflict); + if (!allowBackgroundBuilding) { + return status; + } + + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + if (dbLock->mode() != MODE_X) { + dbLock->relockWithMode(MODE_X); + } + if (status.code() == ErrorCodes::InterruptedAtShutdown) { // leave it as-if kill -9 happened. This will be handled on restart. - invariant(allowBackgroundBuilding); // Foreground builds aren't interrupted. indexer.abortWithoutCleanup(); return status; } - if (allowBackgroundBuilding) { - error() << "Background index build failed. Status: " << redact(status); - fassertFailed(50769); - } else { - return status; - } + error() << "Background index build failed. Status: " << redact(status); + fassertFailed(50769); } } // namespace @@ -242,12 +247,13 @@ Status IndexBuilder::_build(OperationContext* opCtx, return Status::OK(); } if (!status.isOK()) { - return _failIndexBuild(indexer, status, allowBackgroundBuilding); + return _failIndexBuild(opCtx, indexer, dbLock, status, allowBackgroundBuilding); } if (allowBackgroundBuilding) { _setBgIndexStarting(); invariant(dbLock); + opCtx->recoveryUnit()->abandonSnapshot(); dbLock->relockWithMode(MODE_IX); } @@ -255,21 +261,38 @@ Status IndexBuilder::_build(OperationContext* opCtx, Lock::CollectionLock collLock(opCtx->lockState(), ns.ns(), MODE_IX); // WriteConflict exceptions and statuses are not expected to escape this method. status = indexer.insertAllDocumentsInCollection(); + if (!status.isOK()) { + return _failIndexBuild(opCtx, indexer, dbLock, status, allowBackgroundBuilding); + } + + // Perform the first drain while holding an intent lock. + status = indexer.drainBackgroundWritesIfNeeded(); } if (!status.isOK()) { - if (allowBackgroundBuilding) { - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - dbLock->relockWithMode(MODE_X); - if (status == ErrorCodes::InterruptedAtShutdown) - return _failIndexBuild(indexer, status, allowBackgroundBuilding); - opCtx->checkForInterrupt(); - } - return _failIndexBuild(indexer, status, allowBackgroundBuilding); + return _failIndexBuild(opCtx, indexer, dbLock, status, allowBackgroundBuilding); + } + + // Perform the second drain while stopping inserts into the collection. + { + Lock::CollectionLock colLock(opCtx->lockState(), ns.ns(), MODE_S); + status = indexer.drainBackgroundWritesIfNeeded(); + } + if (!status.isOK()) { + return _failIndexBuild(opCtx, indexer, dbLock, status, allowBackgroundBuilding); } if (allowBackgroundBuilding) { + opCtx->recoveryUnit()->abandonSnapshot(); dbLock->relockWithMode(MODE_X); } + + // Perform the third and final drain after releasing a shared lock and reacquiring an + // exclusive lock on the database. + status = indexer.drainBackgroundWritesIfNeeded(); + if (!status.isOK()) { + return _failIndexBuild(opCtx, indexer, dbLock, status, allowBackgroundBuilding); + } + status = writeConflictRetry(opCtx, "Commit index build", ns.ns(), [this, opCtx, &indexer, &ns] { WriteUnitOfWork wunit(opCtx); auto status = indexer.commit(); @@ -301,7 +324,7 @@ Status IndexBuilder::_build(OperationContext* opCtx, return Status::OK(); }); if (!status.isOK()) { - return _failIndexBuild(indexer, status, allowBackgroundBuilding); + return _failIndexBuild(opCtx, indexer, dbLock, status, allowBackgroundBuilding); } if (allowBackgroundBuilding) { diff --git a/src/mongo/db/repair_database.cpp b/src/mongo/db/repair_database.cpp index da970fb3a8b..6a0ededa2c0 100644 --- a/src/mongo/db/repair_database.cpp +++ b/src/mongo/db/repair_database.cpp @@ -213,7 +213,7 @@ Status rebuildIndexesOnCollection(OperationContext* opCtx, } } - Status status = indexer->doneInserting(); + Status status = indexer->dumpInsertsFromBulk(); if (!status.isOK()) return status; diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp index 29c1a7ea0e5..4dd31ed31a9 100644 --- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp +++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp @@ -165,7 +165,7 @@ Status CollectionBulkLoaderImpl::commit() { // deleted. if (_secondaryIndexesBlock) { std::set<RecordId> secDups; - auto status = _secondaryIndexesBlock->doneInserting(&secDups); + auto status = _secondaryIndexesBlock->dumpInsertsFromBulk(&secDups); if (!status.isOK()) { return status; } @@ -193,8 +193,8 @@ Status CollectionBulkLoaderImpl::commit() { if (_idIndexBlock) { // Delete dups. std::set<RecordId> dups; - // Do not do inside a WriteUnitOfWork (required by doneInserting). - auto status = _idIndexBlock->doneInserting(&dups); + // Do not do inside a WriteUnitOfWork (required by dumpInsertsFromBulk). + auto status = _idIndexBlock->dumpInsertsFromBulk(&dups); if (!status.isOK()) { return status; } diff --git a/src/mongo/db/storage/kv/SConscript b/src/mongo/db/storage/kv/SConscript index 9400873d7cf..f9a6650e90d 100644 --- a/src/mongo/db/storage/kv/SConscript +++ b/src/mongo/db/storage/kv/SConscript @@ -45,7 +45,10 @@ env.Library( # Should not be referenced outside this SConscript file. env.Library( target='kv_storage_engine', - source=['kv_storage_engine.cpp'], + source=[ + 'kv_storage_engine.cpp', + 'temporary_kv_record_store.cpp' + ], LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/catalog/catalog_impl', diff --git a/src/mongo/db/storage/kv/kv_storage_engine.cpp b/src/mongo/db/storage/kv/kv_storage_engine.cpp index 7dc534c2942..2ae61716062 100644 --- a/src/mongo/db/storage/kv/kv_storage_engine.cpp +++ b/src/mongo/db/storage/kv/kv_storage_engine.cpp @@ -42,6 +42,7 @@ #include "mongo/db/storage/kv/kv_catalog_feature_tracker.h" #include "mongo/db/storage/kv/kv_database_catalog_entry.h" #include "mongo/db/storage/kv/kv_engine.h" +#include "mongo/db/storage/kv/temporary_kv_record_store.h" #include "mongo/db/storage/storage_repair_observer.h" #include "mongo/db/unclean_shutdown.h" #include "mongo/util/assert_util.h" @@ -627,8 +628,12 @@ Status KVStorageEngine::repairRecordStore(OperationContext* opCtx, const std::st return Status::OK(); } -std::unique_ptr<RecordStore> KVStorageEngine::makeTemporaryRecordStore(OperationContext* opCtx) { - return _engine->makeTemporaryRecordStore(opCtx, _catalog->newTempIdent()); +std::unique_ptr<TemporaryRecordStore> KVStorageEngine::makeTemporaryRecordStore( + OperationContext* opCtx) { + std::unique_ptr<RecordStore> rs = + _engine->makeTemporaryRecordStore(opCtx, _catalog->newTempIdent()); + LOG(1) << "created temporary record store: " << rs->getIdent(); + return std::make_unique<TemporaryKVRecordStore>(opCtx, getEngine(), std::move(rs)); } void KVStorageEngine::setJournalListener(JournalListener* jl) { @@ -734,4 +739,5 @@ void KVStorageEngine::_dumpCatalog(OperationContext* opCtx) { opCtx->recoveryUnit()->abandonSnapshot(); } + } // namespace mongo diff --git a/src/mongo/db/storage/kv/kv_storage_engine.h b/src/mongo/db/storage/kv/kv_storage_engine.h index b148f29b105..eac5e5a1896 100644 --- a/src/mongo/db/storage/kv/kv_storage_engine.h +++ b/src/mongo/db/storage/kv/kv_storage_engine.h @@ -42,6 +42,7 @@ #include "mongo/db/storage/kv/kv_database_catalog_entry_base.h" #include "mongo/db/storage/record_store.h" #include "mongo/db/storage/storage_engine.h" +#include "mongo/db/storage/temporary_record_store.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" @@ -122,7 +123,8 @@ public: virtual Status repairRecordStore(OperationContext* opCtx, const std::string& ns); - virtual std::unique_ptr<RecordStore> makeTemporaryRecordStore(OperationContext* opCtx) override; + virtual std::unique_ptr<TemporaryRecordStore> makeTemporaryRecordStore( + OperationContext* opCtx) override; virtual void cleanShutdown(); diff --git a/src/mongo/db/storage/kv/kv_storage_engine_test.cpp b/src/mongo/db/storage/kv/kv_storage_engine_test.cpp index 68c7098e689..bd58e5adc0a 100644 --- a/src/mongo/db/storage/kv/kv_storage_engine_test.cpp +++ b/src/mongo/db/storage/kv/kv_storage_engine_test.cpp @@ -76,7 +76,7 @@ public: return _storageEngine->getCatalog()->getCollectionIdent(ns.ns()); } - std::unique_ptr<RecordStore> makeTemporary(OperationContext* opCtx) { + std::unique_ptr<TemporaryRecordStore> makeTemporary(OperationContext* opCtx) { return _storageEngine->makeTemporaryRecordStore(opCtx); } @@ -230,8 +230,8 @@ TEST_F(KVStorageEngineTest, ReconcileDropsTemporary) { auto opCtx = cc().makeOperationContext(); auto rs = makeTemporary(opCtx.get()); - ASSERT(rs); - const std::string ident = rs->getIdent(); + ASSERT(rs.get()); + const std::string ident = rs->rs()->getIdent(); ASSERT(identExists(opCtx.get(), ident)); @@ -241,6 +241,22 @@ TEST_F(KVStorageEngineTest, ReconcileDropsTemporary) { ASSERT(!identExists(opCtx.get(), ident)); } +TEST_F(KVStorageEngineTest, TemporaryDropsItself) { + auto opCtx = cc().makeOperationContext(); + + std::string ident; + { + auto rs = makeTemporary(opCtx.get()); + ASSERT(rs.get()); + ident = rs->rs()->getIdent(); + + ASSERT(identExists(opCtx.get(), ident)); + } + + // The temporary record store RAII class should drop itself. + ASSERT(!identExists(opCtx.get(), ident)); +} + TEST_F(KVStorageEngineRepairTest, LoadCatalogRecoversOrphans) { auto opCtx = cc().makeOperationContext(); diff --git a/src/mongo/db/storage/kv/temporary_kv_record_store.cpp b/src/mongo/db/storage/kv/temporary_kv_record_store.cpp new file mode 100644 index 00000000000..3f2a9d47fb7 --- /dev/null +++ b/src/mongo/db/storage/kv/temporary_kv_record_store.cpp @@ -0,0 +1,45 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage + +#include "mongo/db/storage/kv/temporary_kv_record_store.h" +#include "mongo/util/log.h" + +namespace mongo { + +TemporaryKVRecordStore::~TemporaryKVRecordStore() { + auto status = _kvEngine->dropIdent(_opCtx, _rs->getIdent()); + invariant(status.isOK(), + str::stream() << "failed to drop temporary ident: " << _rs->getIdent() + << " with status: " + << status); +} + +} // namespace mongo diff --git a/src/mongo/db/storage/kv/temporary_kv_record_store.h b/src/mongo/db/storage/kv/temporary_kv_record_store.h new file mode 100644 index 00000000000..8fa63e65555 --- /dev/null +++ b/src/mongo/db/storage/kv/temporary_kv_record_store.h @@ -0,0 +1,66 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/storage/temporary_record_store.h" + +#include "mongo/db/operation_context.h" +#include "mongo/db/storage/kv/kv_engine.h" +#include "mongo/db/storage/record_store.h" + +namespace mongo { + +/** + * This is an implementation of an RAII type that manages a temporary RecordStore on a KVEngine. + * + * This object should not exist any longer than the provided OperationContext, as the destructor + * uses it to drop the record store on the KVEngine. + */ +class TemporaryKVRecordStore : public TemporaryRecordStore { +public: + TemporaryKVRecordStore(OperationContext* opCtx, + KVEngine* kvEngine, + std::unique_ptr<RecordStore> rs) + : TemporaryRecordStore(std::move(rs)), _opCtx(opCtx), _kvEngine(kvEngine){}; + + // Move constructor. + TemporaryKVRecordStore(TemporaryKVRecordStore&& other) noexcept + : TemporaryRecordStore(std::move(other._rs)), + _opCtx(other._opCtx), + _kvEngine(other._kvEngine) {} + + ~TemporaryKVRecordStore(); + +private: + OperationContext* _opCtx; + KVEngine* _kvEngine; +}; + +} // namespace mongo diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h index a6dc2901ff1..e2d659e840a 100644 --- a/src/mongo/db/storage/record_store.h +++ b/src/mongo/db/storage/record_store.h @@ -267,6 +267,10 @@ public: _ns = ns.ns(); } + bool isTemp() const { + return ns().size() == 0; + } + virtual const std::string& getIdent() const = 0; /** diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h index 456972b0997..d8926a68f50 100644 --- a/src/mongo/db/storage/storage_engine.h +++ b/src/mongo/db/storage/storage_engine.h @@ -45,7 +45,7 @@ namespace mongo { class DatabaseCatalogEntry; class JournalListener; class OperationContext; -class RecordStore; +class TemporaryRecordStore; class RecoveryUnit; class SnapshotManager; struct StorageGlobalParams; @@ -288,11 +288,13 @@ public: virtual Status repairRecordStore(OperationContext* opCtx, const std::string& ns) = 0; /** - * Creates a temporary RecordStore on the storage engine. This record store should be dropped by - * the caller when done being used. The storage engine will drop any remaining temporary record - * stores on startup. + * Creates a temporary RecordStore on the storage engine. This record store will drop itself + * automatically when it goes out of scope. This means the TemporaryRecordStore should not exist + * any longer than the OperationContext used to create it. On startup, the storage engine will + * drop any un-dropped temporary record stores. */ - virtual std::unique_ptr<RecordStore> makeTemporaryRecordStore(OperationContext* opCtx) = 0; + virtual std::unique_ptr<TemporaryRecordStore> makeTemporaryRecordStore( + OperationContext* opCtx) = 0; /** * This method will be called before there is a clean shutdown. Storage engines should diff --git a/src/mongo/db/storage/temporary_record_store.h b/src/mongo/db/storage/temporary_record_store.h new file mode 100644 index 00000000000..ea42f9d73be --- /dev/null +++ b/src/mongo/db/storage/temporary_record_store.h @@ -0,0 +1,66 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/disallow_copying.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/storage/record_store.h" + +namespace mongo { + +/** + * This is an RAII type that manages the lifetime of a temporary RecordStore. + * + * Derived classes must implement a destructor that drops the underlying RecordStore from the + * storage engine. + */ +class TemporaryRecordStore { + MONGO_DISALLOW_COPYING(TemporaryRecordStore); + +public: + TemporaryRecordStore(std::unique_ptr<RecordStore> rs) : _rs(std::move(rs)) {} + + // Move constructor. + TemporaryRecordStore(TemporaryRecordStore&& other) noexcept : _rs(std::move(other._rs)) {} + + virtual ~TemporaryRecordStore() {} + + RecordStore* rs() { + return _rs.get(); + } + + const RecordStore* rs() const { + return _rs.get(); + } + +protected: + std::unique_ptr<RecordStore> _rs; +}; +} // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp index 0ee4e2c2781..7923d2d90ed 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp @@ -427,6 +427,8 @@ stdx::function<bool(StringData)> initRsOplogBackgroundThreadCallback = [](String }; } // namespace +StringData WiredTigerKVEngine::kTableUriPrefix = "table:"_sd; + WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName, const std::string& path, ClockSource* cs, @@ -794,10 +796,9 @@ Status WiredTigerKVEngine::_salvageIfNeeded(const char* uri) { Status WiredTigerKVEngine::_rebuildIdent(WT_SESSION* session, const char* uri) { invariant(_inRepairMode); - static const char tablePrefix[] = "table:"; - invariant(std::string(uri).find(tablePrefix) == 0); + invariant(std::string(uri).find(kTableUriPrefix.rawData()) == 0); - const std::string identName(uri + sizeof(tablePrefix) - 1); + const std::string identName(uri + kTableUriPrefix.size()); auto filePath = getDataFilePathForIdent(identName); if (filePath) { const boost::filesystem::path corruptFile(filePath->string() + ".corrupt"); @@ -1042,7 +1043,7 @@ std::unique_ptr<RecordStore> WiredTigerKVEngine::getGroupedRecordStore( WiredTigerRecordStore::Params params; params.ns = ns; - params.uri = _uri(ident); + params.ident = ident.toString(); params.engineName = _canonicalName; params.isCapped = options.capped; params.isEphemeral = _ephemeral; @@ -1074,7 +1075,8 @@ std::unique_ptr<RecordStore> WiredTigerKVEngine::getGroupedRecordStore( } string WiredTigerKVEngine::_uri(StringData ident) const { - return string("table:") + ident.toString(); + invariant(ident.find(kTableUriPrefix) == string::npos); + return kTableUriPrefix + ident.toString(); } Status WiredTigerKVEngine::createGroupedSortedDataInterface(OperationContext* opCtx, @@ -1146,7 +1148,7 @@ std::unique_ptr<RecordStore> WiredTigerKVEngine::makeTemporaryRecordStore(Operat WiredTigerRecordStore::Params params; params.ns = ""; - params.uri = _uri(ident); + params.ident = ident.toString(); params.engineName = _canonicalName; params.isCapped = false; params.isEphemeral = _ephemeral; @@ -1190,7 +1192,7 @@ Status WiredTigerKVEngine::dropIdent(OperationContext* opCtx, StringData ident) int ret = session.getSession()->drop( session.getSession(), uri.c_str(), "force,checkpoint_wait=false"); - LOG(1) << "WT drop of " << uri << " res " << ret; + LOG(1) << "WT drop of " << uri << " res " << ret; if (ret == 0) { // yay, it worked diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h index 95f9cda47aa..6836d9868fa 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h @@ -68,6 +68,8 @@ struct WiredTigerFileVersion { class WiredTigerKVEngine final : public KVEngine { public: static const int kDefaultJournalDelayMillis; + static StringData kTableUriPrefix; + WiredTigerKVEngine(const std::string& canonicalName, const std::string& path, ClockSource* cs, diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_record_store_test.cpp index eb5077d48f8..37f624826ea 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_record_store_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_record_store_test.cpp @@ -99,7 +99,8 @@ public: WiredTigerRecoveryUnit* ru = checked_cast<WiredTigerRecoveryUnit*>(_engine->newRecoveryUnit()); OperationContextNoop opCtx(ru); - string uri = "table:" + ns; + string ident = ns; + string uri = WiredTigerKVEngine::kTableUriPrefix + ns; const bool prefixed = true; StatusWith<std::string> result = WiredTigerRecordStore::generateCreateString( @@ -116,7 +117,7 @@ public: WiredTigerRecordStore::Params params; params.ns = ns; - params.uri = uri; + params.ident = ident; params.engineName = kWiredTigerEngineName; params.isCapped = false; params.isEphemeral = false; @@ -142,7 +143,8 @@ public: WiredTigerRecoveryUnit* ru = checked_cast<WiredTigerRecoveryUnit*>(_engine->newRecoveryUnit()); OperationContextNoop opCtx(ru); - string uri = "table:a.b"; + string ident = "a.b"; + string uri = WiredTigerKVEngine::kTableUriPrefix + ident; CollectionOptions options; options.capped = true; @@ -162,7 +164,7 @@ public: WiredTigerRecordStore::Params params; params.ns = ns; - params.uri = uri; + params.ident = ident; params.engineName = kWiredTigerEngineName; params.isCapped = true; params.isEphemeral = false; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 135d904f5ce..0d7c82d0b63 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -638,7 +638,8 @@ WiredTigerRecordStore::WiredTigerRecordStore(WiredTigerKVEngine* kvEngine, OperationContext* ctx, Params params) : RecordStore(params.ns), - _uri(params.uri), + _uri(WiredTigerKVEngine::kTableUriPrefix + params.ident), + _ident(params.ident), _tableId(WiredTigerSession::genTableId()), _engineName(params.engineName), _isCapped(params.isCapped), @@ -654,9 +655,12 @@ WiredTigerRecordStore::WiredTigerRecordStore(WiredTigerKVEngine* kvEngine, _cappedDeleteCheckCount(0), _sizeStorer(params.sizeStorer), _kvEngine(kvEngine) { + invariant(_ident.size() > 0); + Status versionStatus = WiredTigerUtil::checkApplicationMetadataFormatVersion( ctx, _uri, kMinimumRecordStoreVersion, kMaximumRecordStoreVersion) .getStatus(); + if (!versionStatus.isOK()) { std::cout << " Version: " << versionStatus.reason() << std::endl; if (versionStatus.code() == ErrorCodes::FailedToParse) { @@ -674,7 +678,7 @@ WiredTigerRecordStore::WiredTigerRecordStore(WiredTigerKVEngine* kvEngine, invariant(_cappedMaxDocs == -1); } - if (!params.isReadOnly) { + if (!params.isReadOnly && !isTemp()) { bool replicatedWrites = getGlobalReplSettings().usingReplSets() || repl::ReplSettings::shouldRecoverFromOplogAsStandalone(); uassertStatusOK(WiredTigerUtil::setTableLogging( @@ -686,7 +690,7 @@ WiredTigerRecordStore::WiredTigerRecordStore(WiredTigerKVEngine* kvEngine, // The oplog always needs to be marked for size adjustment since it is journaled and also // may change during replication recovery (if truncated). sizeRecoveryState(getGlobalServiceContext()) - .markCollectionAsAlwaysNeedsSizeAdjustment(_uri); + .markCollectionAsAlwaysNeedsSizeAdjustment(_ident); } } @@ -696,7 +700,11 @@ WiredTigerRecordStore::~WiredTigerRecordStore() { _shuttingDown = true; } - LOG(1) << "~WiredTigerRecordStore for: " << ns(); + if (!isTemp()) { + LOG(1) << "~WiredTigerRecordStore for: " << ns(); + } else { + LOG(1) << "~WiredTigerRecordStore for temporary ident: " << getIdent(); + } if (_oplogStones) { _oplogStones->kill(); @@ -744,9 +752,9 @@ void WiredTigerRecordStore::postConstructorInit(OperationContext* opCtx) { // time but not as of the top of the oplog. LOG_FOR_RECOVERY(2) << "Record store was empty; setting count metadata to zero but marking " "record store as needing size adjustment during recovery. ns: " - << ns() << ", ident: " << _uri; + << (isTemp() ? "(temp)" : ns()) << ", ident: " << _ident; sizeRecoveryState(getGlobalServiceContext()) - .markCollectionAsAlwaysNeedsSizeAdjustment(_uri); + .markCollectionAsAlwaysNeedsSizeAdjustment(_ident); _sizeInfo->dataSize.store(0); _sizeInfo->numRecords.store(0); @@ -907,7 +915,7 @@ int64_t WiredTigerRecordStore::_cappedDeleteAsNeeded(OperationContext* opCtx, // replication recovery. If we don't mark the collection for size adjustment then we will not // perform the capped deletions as expected. In that case, the collection is guaranteed to be // empty at the stable timestamp and thus guaranteed to be marked for size adjustment. - if (!sizeRecoveryState(getGlobalServiceContext()).collectionNeedsSizeAdjustment(_uri)) { + if (!sizeRecoveryState(getGlobalServiceContext()).collectionNeedsSizeAdjustment(_ident)) { return 0; } @@ -1653,7 +1661,7 @@ void WiredTigerRecordStore::updateStatsAfterRepair(OperationContext* opCtx, long long numRecords, long long dataSize) { // We're correcting the size as of now, future writes should be tracked. - sizeRecoveryState(getGlobalServiceContext()).markCollectionAsAlwaysNeedsSizeAdjustment(_uri); + sizeRecoveryState(getGlobalServiceContext()).markCollectionAsAlwaysNeedsSizeAdjustment(_ident); _sizeInfo->numRecords.store(numRecords); _sizeInfo->dataSize.store(dataSize); @@ -1689,7 +1697,7 @@ private: }; void WiredTigerRecordStore::_changeNumRecords(OperationContext* opCtx, int64_t diff) { - if (!sizeRecoveryState(getGlobalServiceContext()).collectionNeedsSizeAdjustment(_uri)) { + if (!sizeRecoveryState(getGlobalServiceContext()).collectionNeedsSizeAdjustment(_ident)) { return; } @@ -1712,7 +1720,7 @@ private: }; void WiredTigerRecordStore::_increaseDataSize(OperationContext* opCtx, int64_t amount) { - if (!sizeRecoveryState(getGlobalServiceContext()).collectionNeedsSizeAdjustment(_uri)) { + if (!sizeRecoveryState(getGlobalServiceContext()).collectionNeedsSizeAdjustment(_ident)) { return; } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h index 019d7e2331c..9cc17eb0b00 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h @@ -106,7 +106,7 @@ public: struct Params { StringData ns; - std::string uri; + std::string ident; std::string engineName; bool isCapped; bool isEphemeral; @@ -233,7 +233,7 @@ public: } const std::string& getIdent() const override { - return _uri; + return _ident; } uint64_t tableId() const { @@ -334,6 +334,7 @@ private: int64_t _cappedDeleteAsNeeded_inlock(OperationContext* opCtx, const RecordId& justInserted); const std::string _uri; + const std::string _ident; const uint64_t _tableId; // not persisted // Canonical engine name to use for retrieving options diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp index cb886f7f1ab..719b2f3bbce 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp @@ -74,7 +74,8 @@ public: virtual std::unique_ptr<RecordStore> createRecordStore(OperationContext* opCtx, const std::string& ns) final { - std::string uri = "table:" + ns; + std::string ident = ns; + std::string uri = WiredTigerKVEngine::kTableUriPrefix + ns; const bool prefixed = false; StatusWith<std::string> result = WiredTigerRecordStore::generateCreateString( kWiredTigerEngineName, ns, CollectionOptions(), "", prefixed); @@ -92,7 +93,7 @@ public: WiredTigerRecordStore::Params params; params.ns = ns; - params.uri = uri; + params.ident = ident; params.engineName = kWiredTigerEngineName; params.isCapped = false; params.isEphemeral = false; @@ -536,7 +537,7 @@ TEST_F(WiredTigerRecoveryUnitTestFixture, ReadOnceCursorsAreNotCached) { auto ru = WiredTigerRecoveryUnit::get(opCtx); std::unique_ptr<RecordStore> rs(harnessHelper->createRecordStore(opCtx, "test.read_once")); - auto uri = rs->getIdent(); + auto uri = dynamic_cast<WiredTigerRecordStore*>(rs.get())->getURI(); // Insert a record. ru->beginUnitOfWork(opCtx); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp index e8c4298f66b..fc2caf37290 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp @@ -97,9 +97,9 @@ public: virtual std::unique_ptr<RecordStore> newNonCappedRecordStore(const std::string& ns) { WiredTigerRecoveryUnit* ru = - dynamic_cast<WiredTigerRecoveryUnit*>(_engine.newRecoveryUnit()); + checked_cast<WiredTigerRecoveryUnit*>(_engine.newRecoveryUnit()); OperationContextNoop opCtx(ru); - string uri = "table:" + ns; + string uri = WiredTigerKVEngine::kTableUriPrefix + ns; const bool prefixed = false; StatusWith<std::string> result = WiredTigerRecordStore::generateCreateString( @@ -116,7 +116,7 @@ public: WiredTigerRecordStore::Params params; params.ns = ns; - params.uri = uri; + params.ident = ns; params.engineName = kWiredTigerEngineName; params.isCapped = false; params.isEphemeral = false; @@ -141,7 +141,8 @@ public: WiredTigerRecoveryUnit* ru = dynamic_cast<WiredTigerRecoveryUnit*>(_engine.newRecoveryUnit()); OperationContextNoop opCtx(ru); - string uri = "table:a.b"; + string ident = "a.b"; + string uri = WiredTigerKVEngine::kTableUriPrefix + "a.b"; CollectionOptions options; options.capped = true; @@ -161,7 +162,7 @@ public: WiredTigerRecordStore::Params params; params.ns = ns; - params.uri = uri; + params.ident = ident; params.engineName = kWiredTigerEngineName; params.isCapped = true; params.isEphemeral = false; @@ -215,9 +216,10 @@ TEST(WiredTigerRecordStoreTest, SizeStorer1) { unique_ptr<WiredTigerHarnessHelper> harnessHelper(new WiredTigerHarnessHelper()); unique_ptr<RecordStore> rs(harnessHelper->newNonCappedRecordStore()); + string ident = rs->getIdent(); string uri = checked_cast<WiredTigerRecordStore*>(rs.get())->getURI(); - string indexUri = "table:myindex"; + string indexUri = WiredTigerKVEngine::kTableUriPrefix + "myindex"; const bool enableWtLogging = false; WiredTigerSizeStorer ss(harnessHelper->conn(), indexUri, enableWtLogging); checked_cast<WiredTigerRecordStore*>(rs.get())->setSizeStorer(&ss); @@ -252,7 +254,7 @@ TEST(WiredTigerRecordStoreTest, SizeStorer1) { ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext()); WiredTigerRecordStore::Params params; params.ns = "a.b"_sd; - params.uri = uri; + params.ident = ident; params.engineName = kWiredTigerEngineName; params.isCapped = false; params.isEphemeral = false; @@ -318,10 +320,13 @@ private: harnessHelper.reset(new WiredTigerHarnessHelper()); const bool enableWtLogging = false; sizeStorer.reset( - new WiredTigerSizeStorer(harnessHelper->conn(), "table:sizeStorer", enableWtLogging)); + new WiredTigerSizeStorer(harnessHelper->conn(), + WiredTigerKVEngine::kTableUriPrefix + "sizeStorer", + enableWtLogging)); rs = harnessHelper->newNonCappedRecordStore(); WiredTigerRecordStore* wtrs = checked_cast<WiredTigerRecordStore*>(rs.get()); wtrs->setSizeStorer(sizeStorer.get()); + ident = wtrs->getIdent(); uri = wtrs->getURI(); expectedNumRecords = 100; @@ -361,6 +366,7 @@ protected: std::unique_ptr<WiredTigerHarnessHelper> harnessHelper; std::unique_ptr<WiredTigerSizeStorer> sizeStorer; std::unique_ptr<RecordStore> rs; + std::string ident; std::string uri; long long expectedNumRecords; @@ -418,7 +424,7 @@ TEST_F(SizeStorerValidateTest, InvalidSizeStorerAtCreation) { WiredTigerRecordStore::Params params; params.ns = "a.b"_sd; - params.uri = uri; + params.ident = ident; params.engineName = kWiredTigerEngineName; params.isCapped = false; params.isEphemeral = false; |