diff options
Diffstat (limited to 'src/mongo/db/index/columns_access_method.cpp')
-rw-r--r-- | src/mongo/db/index/columns_access_method.cpp | 241 |
1 files changed, 190 insertions, 51 deletions
diff --git a/src/mongo/db/index/columns_access_method.cpp b/src/mongo/db/index/columns_access_method.cpp index ef03980b917..55e6c9ad03a 100644 --- a/src/mongo/db/index/columns_access_method.cpp +++ b/src/mongo/db/index/columns_access_method.cpp @@ -41,7 +41,9 @@ #include "mongo/db/index/column_cell.h" #include "mongo/db/index/column_key_generator.h" #include "mongo/db/index/column_store_sorter.h" +#include "mongo/db/index/index_build_interceptor.h" #include "mongo/db/index/index_descriptor.h" +#include "mongo/db/storage/execution_context.h" #include "mongo/logv2/log.h" #include "mongo/util/progress_meter.h" @@ -54,6 +56,11 @@ inline void inc(int64_t* counter) { if (counter) ++*counter; }; + +inline void dec(int64_t* counter) { + if (counter) + --*counter; +}; } // namespace ColumnStoreAccessMethod::ColumnStoreAccessMethod(IndexCatalogEntry* ice, @@ -260,6 +267,26 @@ Status ColumnStoreAccessMethod::BulkBuilder::keyCommitted( return Status::OK(); } + +void ColumnStoreAccessMethod::_visitCellsForIndexInsert( + OperationContext* opCtx, + PooledFragmentBuilder& buf, + const std::vector<BsonRecord>& bsonRecords, + function_ref<void(StringData, const BsonRecord&)> cb) const { + _keyGen.visitCellsForInsert( + bsonRecords, + [&](StringData path, const BsonRecord& rec, const column_keygen::UnencodedCellView& cell) { + if (!rec.ts.isNull()) { + uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(rec.ts)); + } + buf.reset(); + column_keygen::writeEncodedCell(cell, &buf); + tassert( + 6597800, "RecordID cannot be a string for column store indexes", !rec.id.isStr()); + cb(path, rec); + }); +} + Status ColumnStoreAccessMethod::insert(OperationContext* opCtx, SharedBufferFragmentBuilder& pooledBufferBuilder, const CollectionPtr& coll, @@ -268,24 +295,33 @@ Status ColumnStoreAccessMethod::insert(OperationContext* opCtx, int64_t* keysInsertedOut) { try { PooledFragmentBuilder buf(pooledBufferBuilder); - auto cursor = _store->newWriteCursor(opCtx); - _keyGen.visitCellsForInsert( - bsonRecords, - [&](StringData path, - const BsonRecord& rec, - const column_keygen::UnencodedCellView& cell) { - if (!rec.ts.isNull()) { - uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(rec.ts)); + // We cannot write to the index during its initial build phase, so we defer this insert as a + // "side write" to be applied after the build completes. + if (_indexCatalogEntry->isHybridBuilding()) { + auto columnKeys = StorageExecutionContext::get(opCtx).columnKeys(); + _visitCellsForIndexInsert( + opCtx, buf, bsonRecords, [&](StringData path, const BsonRecord& rec) { + columnKeys->emplace_back( + path.toString(), CellView{buf.buf(), size_t(buf.len())}.toString(), rec.id); + }); + int64_t inserted = 0; + ON_BLOCK_EXIT([keysInsertedOut, inserted] { + if (keysInsertedOut) { + *keysInsertedOut += inserted; } - - buf.reset(); - column_keygen::writeEncodedCell(cell, &buf); - invariant(!rec.id.isStr()); - cursor->insert(path, rec.id.getLong(), CellView{buf.buf(), size_t(buf.len())}); - - inc(keysInsertedOut); }); - return Status::OK(); + uassertStatusOK(_indexCatalogEntry->indexBuildInterceptor()->sideWrite( + opCtx, *columnKeys, IndexBuildInterceptor::Op::kInsert, &inserted)); + return Status::OK(); + } else { + auto cursor = _store->newWriteCursor(opCtx); + _visitCellsForIndexInsert( + opCtx, buf, bsonRecords, [&](StringData path, const BsonRecord& rec) { + cursor->insert(path, rec.id.getLong(), CellView{buf.buf(), size_t(buf.len())}); + inc(keysInsertedOut); + }); + return Status::OK(); + } } catch (const AssertionException& ex) { return ex.toStatus(); } @@ -300,12 +336,26 @@ void ColumnStoreAccessMethod::remove(OperationContext* opCtx, const InsertDeleteOptions& options, int64_t* keysDeletedOut, CheckRecordId checkRecordId) { - auto cursor = _store->newWriteCursor(opCtx); - _keyGen.visitPathsForDelete(obj, [&](StringData path) { - tassert(6762301, "RecordID cannot be a string for column store indexes", !rid.isStr()); - cursor->remove(path, rid.getLong()); - inc(keysDeletedOut); - }); + if (_indexCatalogEntry->isHybridBuilding()) { + auto columnKeys = StorageExecutionContext::get(opCtx).columnKeys(); + _keyGen.visitPathsForDelete(obj, [&](StringData path) { + columnKeys->emplace_back(std::make_tuple(path.toString(), "", rid)); + }); + int64_t removed = 0; + fassert(6597801, + _indexCatalogEntry->indexBuildInterceptor()->sideWrite( + opCtx, *columnKeys, IndexBuildInterceptor::Op::kDelete, &removed)); + if (keysDeletedOut) { + *keysDeletedOut += removed; + } + } else { + auto cursor = _store->newWriteCursor(opCtx); + _keyGen.visitPathsForDelete(obj, [&](PathView path) { + tassert(6762301, "RecordID cannot be a string for column store indexes", !rid.isStr()); + cursor->remove(path, rid.getLong()); + inc(keysDeletedOut); + }); + } } Status ColumnStoreAccessMethod::update(OperationContext* opCtx, @@ -318,37 +368,88 @@ Status ColumnStoreAccessMethod::update(OperationContext* opCtx, int64_t* keysInsertedOut, int64_t* keysDeletedOut) { PooledFragmentBuilder buf(pooledBufferBuilder); - auto cursor = _store->newWriteCursor(opCtx); - _keyGen.visitDiffForUpdate( - oldDoc, - newDoc, - [&](column_keygen::ColumnKeyGenerator::DiffAction diffAction, - StringData path, - const column_keygen::UnencodedCellView* cell) { - if (diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kDelete) { - tassert( - 6762302, "RecordID cannot be a string for column store indexes", !rid.isStr()); - cursor->remove(path, rid.getLong()); - inc(keysDeletedOut); - return; - } - // kInsert and kUpdate are handled almost identically. If we switch to using - // `overwrite=true` cursors in WT, we could consider making them the same, although that - // might disadvantage other implementations of the storage engine API. - buf.reset(); - column_keygen::writeEncodedCell(*cell, &buf); + if (_indexCatalogEntry->isHybridBuilding()) { + auto columnKeys = StorageExecutionContext::get(opCtx).columnKeys(); + _keyGen.visitDiffForUpdate( + oldDoc, + newDoc, + [&](column_keygen::ColumnKeyGenerator::DiffAction diffAction, + StringData path, + const column_keygen::UnencodedCellView* cell) { + if (diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kDelete) { + columnKeys->emplace_back(std::make_tuple(path.toString(), "", rid)); + int64_t removed = 0; + fassert(6597802, + _indexCatalogEntry->indexBuildInterceptor()->sideWrite( + opCtx, *columnKeys, IndexBuildInterceptor::Op::kDelete, &removed)); + + if (keysDeletedOut) { + *keysDeletedOut += removed; + } + return; + } + + // kInsert and kUpdate are handled almost identically. If we switch to using + // `overwrite=true` cursors in WT, we could consider making them the same, + // although that might disadvantage other implementations of the storage engine + // API. + buf.reset(); + column_keygen::writeEncodedCell(*cell, &buf); - const auto method = diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kInsert - ? &ColumnStore::WriteCursor::insert - : &ColumnStore::WriteCursor::update; - tassert(6762303, "RecordID cannot be a string for column store indexes", !rid.isStr()); - (cursor.get()->*method)(path, rid.getLong(), CellView{buf.buf(), size_t(buf.len())}); + const auto method = + diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kInsert + ? IndexBuildInterceptor::Op::kInsert + : IndexBuildInterceptor::Op::kUpdate; - inc(keysInsertedOut); - }); + columnKeys->emplace_back(std::make_tuple( + path.toString(), CellView{buf.buf(), size_t(buf.len())}.toString(), rid)); + + int64_t inserted = 0; + Status status = _indexCatalogEntry->indexBuildInterceptor()->sideWrite( + opCtx, *columnKeys, method, &inserted); + if (keysInsertedOut) { + *keysInsertedOut += inserted; + } + }); + + } else { + auto cursor = _store->newWriteCursor(opCtx); + _keyGen.visitDiffForUpdate( + oldDoc, + newDoc, + [&](column_keygen::ColumnKeyGenerator::DiffAction diffAction, + StringData path, + const column_keygen::UnencodedCellView* cell) { + if (diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kDelete) { + tassert(6762302, + "RecordID cannot be a string for column store indexes", + !rid.isStr()); + cursor->remove(path, rid.getLong()); + inc(keysDeletedOut); + return; + } + + // kInsert and kUpdate are handled almost identically. If we switch to using + // `overwrite=true` cursors in WT, we could consider making them the same, although + // that might disadvantage other implementations of the storage engine API. + buf.reset(); + column_keygen::writeEncodedCell(*cell, &buf); + + const auto method = + diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kInsert + ? &ColumnStore::WriteCursor::insert + : &ColumnStore::WriteCursor::update; + tassert( + 6762303, "RecordID cannot be a string for column store indexes", !rid.isStr()); + (cursor.get()->*method)( + path, rid.getLong(), CellView{buf.buf(), size_t(buf.len())}); + + inc(keysInsertedOut); + }); + } return Status::OK(); -} +} // namespace mongo Status ColumnStoreAccessMethod::initializeAsEmpty(OperationContext* opCtx) { return Status::OK(); @@ -383,8 +484,9 @@ std::unique_ptr<IndexAccessMethod::BulkBuilder> ColumnStoreAccessMethod::initiat size_t maxMemoryUsageBytes, const boost::optional<IndexStateInfo>& stateInfo, StringData dbName) { - return stateInfo ? std::make_unique<BulkBuilder>(this, maxMemoryUsageBytes, *stateInfo, dbName) - : std::make_unique<BulkBuilder>(this, maxMemoryUsageBytes, dbName); + return (stateInfo && stateInfo->getFileName()) + ? std::make_unique<BulkBuilder>(this, maxMemoryUsageBytes, *stateInfo, dbName) + : std::make_unique<BulkBuilder>(this, maxMemoryUsageBytes, dbName); } std::shared_ptr<Ident> ColumnStoreAccessMethod::getSharedIdent() const { @@ -395,4 +497,41 @@ void ColumnStoreAccessMethod::setIdent(std::shared_ptr<Ident> ident) { _store->setIdent(std::move(ident)); } +void ColumnStoreAccessMethod::applyColumnDataSideWrite(OperationContext* opCtx, + const CollectionPtr& coll, + const BSONObj& operation, + int64_t* keysInserted, + int64_t* keysDeleted) { + const IndexBuildInterceptor::Op opType = operation.getStringField("op") == "i"_sd + ? IndexBuildInterceptor::Op::kInsert + : operation.getStringField("op") == "d"_sd ? IndexBuildInterceptor::Op::kDelete + : IndexBuildInterceptor::Op::kUpdate; + + RecordId rid = RecordId::deserializeToken(operation.getField("rid")); + + CellView cell = operation.getStringField("cell"); + PathView path = operation.getStringField("path"); + + auto cursor = _store->newWriteCursor(opCtx); + + tassert(6597803, "RecordID cannot be a string for column store indexes", !rid.isStr()); + switch (opType) { + case IndexBuildInterceptor::Op::kInsert: + cursor->insert(path, rid.getLong(), cell); + inc(keysInserted); + opCtx->recoveryUnit()->onRollback([keysInserted] { dec(keysInserted); }); + break; + case IndexBuildInterceptor::Op::kDelete: + cursor->remove(path, rid.getLong()); + inc(keysDeleted); + opCtx->recoveryUnit()->onRollback([keysDeleted] { dec(keysDeleted); }); + break; + case IndexBuildInterceptor::Op::kUpdate: + cursor->update(path, rid.getLong(), cell); + inc(keysInserted); + opCtx->recoveryUnit()->onRollback([keysInserted] { dec(keysInserted); }); + break; + } +} + } // namespace mongo |