/**
 *    Copyright (C) 2022-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
*/ #include "mongo/db/index/columns_access_method.h" #include #include "mongo/base/status.h" #include "mongo/base/status_with.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/concurrency/exception_util.h" #include "mongo/db/curop.h" #include "mongo/db/index/bulk_builder_common.h" #include "mongo/db/index/column_cell.h" #include "mongo/db/index/column_key_generator.h" #include "mongo/db/index/column_store_sorter.h" #include "mongo/db/index/index_build_interceptor.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/storage/execution_context.h" #include "mongo/logv2/log.h" #include "mongo/util/progress_meter.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kIndex namespace mongo { namespace { inline void inc(int64_t* counter) { if (counter) ++*counter; }; inline void dec(int64_t* counter) { if (counter) --*counter; }; } // namespace ColumnStoreAccessMethod::ColumnStoreAccessMethod(IndexCatalogEntry* ice, std::unique_ptr store) : _store(std::move(store)), _indexCatalogEntry(ice), _descriptor(ice->descriptor()), _keyGen(_descriptor->keyPattern(), _descriptor->pathProjection()) {} class ColumnStoreAccessMethod::BulkBuilder final : public BulkBuilderCommon { public: BulkBuilder(ColumnStoreAccessMethod* index, size_t maxMemoryUsageBytes, StringData dbName); BulkBuilder(ColumnStoreAccessMethod* index, size_t maxMemoryUsageBytes, const IndexStateInfo& stateInfo, StringData dbName); // // Generic APIs // Status insert(OperationContext* opCtx, const CollectionPtr& collection, const BSONObj& obj, const RecordId& rid, const InsertDeleteOptions& options, const OnSuppressedErrorFn& onSuppressedError, const ShouldRelaxConstraintsFn& shouldRelaxConstraints) final; const MultikeyPaths& getMultikeyPaths() const final; bool isMultikey() const final; int64_t getKeysInserted() const; IndexStateInfo persistDataForShutdown() final; std::unique_ptr finalizeSort(); std::unique_ptr setUpBulkInserter(OperationContext* opCtx, bool 
dupsAllowed); void debugEnsureSorted(const std::pair& data); bool duplicateCheck(OperationContext* opCtx, const std::pair& data, bool dupsAllowed, const RecordIdHandlerFn& onDuplicateRecord); void insertKey(std::unique_ptr& inserter, const std::pair& data); Status keyCommitted(const KeyHandlerFn& onDuplicateKeyInserted, const std::pair& data, bool isDup); private: ColumnStoreAccessMethod* const _columnsAccess; ColumnStoreSorter _sorter; BufBuilder _cellBuilder; boost::optional> _previousPathAndRowId; }; ColumnStoreAccessMethod::BulkBuilder::BulkBuilder(ColumnStoreAccessMethod* index, size_t maxMemoryUsageBytes, StringData dbName) : BulkBuilderCommon(0, "Index Build: inserting keys from external sorter into columnstore index", index->_descriptor->indexName()), _columnsAccess(index), _sorter(maxMemoryUsageBytes, dbName, bulkBuilderFileStats(), bulkBuilderTracker()) { countNewBuildInStats(); } ColumnStoreAccessMethod::BulkBuilder::BulkBuilder(ColumnStoreAccessMethod* index, size_t maxMemoryUsageBytes, const IndexStateInfo& stateInfo, StringData dbName) : BulkBuilderCommon(stateInfo.getNumKeys().value_or(0), "Index Build: inserting keys from external sorter into columnstore index", index->_descriptor->indexName()), _columnsAccess(index), _sorter(maxMemoryUsageBytes, dbName, bulkBuilderFileStats(), stateInfo.getFileName()->toString(), *stateInfo.getRanges(), bulkBuilderTracker()) { countResumedBuildInStats(); } Status ColumnStoreAccessMethod::BulkBuilder::insert( OperationContext* opCtx, const CollectionPtr& collection, const BSONObj& obj, const RecordId& rid, const InsertDeleteOptions& options, const OnSuppressedErrorFn& onSuppressedError, const ShouldRelaxConstraintsFn& shouldRelaxConstraints) { _columnsAccess->_keyGen.visitCellsForInsert( obj, [&](PathView path, const column_keygen::UnencodedCellView& cell) { _cellBuilder.reset(); column_keygen::writeEncodedCell(cell, &_cellBuilder); tassert(6762300, "RecordID cannot be a string for column store indexes", 
!rid.isStr()); _sorter.add(path, rid.getLong(), CellView(_cellBuilder.buf(), _cellBuilder.len())); ++_keysInserted; }); return Status::OK(); } // The "multikey" property does not apply to columnstore indexes, because the array key does not // represent a field in a document and const MultikeyPaths& ColumnStoreAccessMethod::BulkBuilder::getMultikeyPaths() const { const static MultikeyPaths empty; return empty; } bool ColumnStoreAccessMethod::BulkBuilder::isMultikey() const { return false; } int64_t ColumnStoreAccessMethod::BulkBuilder::getKeysInserted() const { return _keysInserted; } IndexStateInfo ColumnStoreAccessMethod::BulkBuilder::persistDataForShutdown() { auto state = _sorter.persistDataForShutdown(); IndexStateInfo stateInfo; stateInfo.setFileName(StringData(state.fileName)); stateInfo.setNumKeys(_keysInserted); stateInfo.setRanges(std::move(state.ranges)); return stateInfo; } std::unique_ptr ColumnStoreAccessMethod::BulkBuilder::finalizeSort() { return std::unique_ptr(_sorter.done()); } std::unique_ptr ColumnStoreAccessMethod::BulkBuilder::setUpBulkInserter( OperationContext* opCtx, bool dupsAllowed) { _ns = _columnsAccess->_indexCatalogEntry->getNSSFromCatalog(opCtx); return _columnsAccess->_store->makeBulkBuilder(opCtx); } void ColumnStoreAccessMethod::BulkBuilder::debugEnsureSorted( const std::pair& data) { // In debug mode only, assert that keys are retrieved from the sorter in strictly // increasing order. 
const auto& key = data.first; if (_previousPathAndRowId && !(ColumnStoreSorter::Key{_previousPathAndRowId->first, _previousPathAndRowId->second} < key)) { LOGV2_FATAL_NOTRACE(6548100, "Out-of-order result from sorter for column store bulk loader", "prevPathName"_attr = _previousPathAndRowId->first, "prevRecordId"_attr = _previousPathAndRowId->second, "nextPathName"_attr = key.path, "nextRecordId"_attr = key.rowId, "index"_attr = _indexName); } // It is not safe to safe to directly store the 'key' object, because it includes a // PathView, which may be invalid the next time we read it. _previousPathAndRowId.emplace(key.path, key.rowId); } bool ColumnStoreAccessMethod::BulkBuilder::duplicateCheck( OperationContext* opCtx, const std::pair& data, bool dupsAllowed, const RecordIdHandlerFn& onDuplicateRecord) { // no duplicates in a columnstore index return false; } void ColumnStoreAccessMethod::BulkBuilder::insertKey( std::unique_ptr& inserter, const std::pair& data) { auto& [columnStoreKey, columnStoreValue] = data; inserter->addCell(columnStoreKey.path, columnStoreKey.rowId, columnStoreValue.cell); } Status ColumnStoreAccessMethod::BulkBuilder::keyCommitted( const KeyHandlerFn& onDuplicateKeyInserted, const std::pair& data, bool isDup) { // nothing to do for columnstore indexes return Status::OK(); } void ColumnStoreAccessMethod::_visitCellsForIndexInsert( OperationContext* opCtx, PooledFragmentBuilder& buf, const std::vector& bsonRecords, function_ref cb) const { _keyGen.visitCellsForInsert( bsonRecords, [&](StringData path, const BsonRecord& rec, const column_keygen::UnencodedCellView& cell) { if (!rec.ts.isNull()) { uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(rec.ts)); } buf.reset(); column_keygen::writeEncodedCell(cell, &buf); tassert( 6597800, "RecordID cannot be a string for column store indexes", !rec.id.isStr()); cb(path, rec); }); } Status ColumnStoreAccessMethod::insert(OperationContext* opCtx, SharedBufferFragmentBuilder& pooledBufferBuilder, const 
CollectionPtr& coll, const std::vector& bsonRecords, const InsertDeleteOptions& options, int64_t* keysInsertedOut) { try { PooledFragmentBuilder buf(pooledBufferBuilder); // We cannot write to the index during its initial build phase, so we defer this insert as a // "side write" to be applied after the build completes. if (_indexCatalogEntry->isHybridBuilding()) { auto columnChanges = StorageExecutionContext::get(opCtx).columnChanges(); _visitCellsForIndexInsert( opCtx, buf, bsonRecords, [&](StringData path, const BsonRecord& rec) { columnChanges->emplace_back( path.toString(), CellView{buf.buf(), size_t(buf.len())}.toString(), rec.id, column_keygen::ColumnKeyGenerator::DiffAction::kInsert); }); int64_t inserted = 0; int64_t deleted = 0; ON_BLOCK_EXIT([keysInsertedOut, inserted, deleted] { if (keysInsertedOut) { *keysInsertedOut += inserted; } invariant(deleted == 0); }); uassertStatusOK(_indexCatalogEntry->indexBuildInterceptor()->sideWrite( opCtx, *columnChanges, &inserted, &deleted)); return Status::OK(); } else { auto cursor = _store->newWriteCursor(opCtx); _visitCellsForIndexInsert( opCtx, buf, bsonRecords, [&](StringData path, const BsonRecord& rec) { cursor->insert(path, rec.id.getLong(), CellView{buf.buf(), size_t(buf.len())}); inc(keysInsertedOut); }); return Status::OK(); } } catch (const AssertionException& ex) { return ex.toStatus(); } } void ColumnStoreAccessMethod::remove(OperationContext* opCtx, SharedBufferFragmentBuilder& pooledBufferBuilder, const CollectionPtr& coll, const BSONObj& obj, const RecordId& rid, bool logIfError, const InsertDeleteOptions& options, int64_t* keysDeletedOut, CheckRecordId checkRecordId) { if (_indexCatalogEntry->isHybridBuilding()) { auto columnChanges = StorageExecutionContext::get(opCtx).columnChanges(); _keyGen.visitPathsForDelete(obj, [&](StringData path) { columnChanges->emplace_back(path.toString(), "", // No cell content is necessary to describe a deletion. 
rid, column_keygen::ColumnKeyGenerator::DiffAction::kDelete); }); int64_t inserted = 0; int64_t removed = 0; fassert(6597801, _indexCatalogEntry->indexBuildInterceptor()->sideWrite( opCtx, *columnChanges, &inserted, &removed)); if (keysDeletedOut) { *keysDeletedOut += removed; } invariant(inserted == 0); } else { auto cursor = _store->newWriteCursor(opCtx); _keyGen.visitPathsForDelete(obj, [&](PathView path) { tassert(6762301, "RecordID cannot be a string for column store indexes", !rid.isStr()); cursor->remove(path, rid.getLong()); inc(keysDeletedOut); }); } } Status ColumnStoreAccessMethod::update(OperationContext* opCtx, SharedBufferFragmentBuilder& pooledBufferBuilder, const BSONObj& oldDoc, const BSONObj& newDoc, const RecordId& rid, const CollectionPtr& coll, const InsertDeleteOptions& options, int64_t* keysInsertedOut, int64_t* keysDeletedOut) { PooledFragmentBuilder buf(pooledBufferBuilder); if (_indexCatalogEntry->isHybridBuilding()) { auto columnChanges = StorageExecutionContext::get(opCtx).columnChanges(); _keyGen.visitDiffForUpdate( oldDoc, newDoc, [&](column_keygen::ColumnKeyGenerator::DiffAction diffAction, StringData path, const column_keygen::UnencodedCellView* cell) { if (diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kDelete) { columnChanges->emplace_back( path.toString(), "", // No cell content is necessary to describe a deletion. rid, diffAction); return; } // kInsert and kUpdate are handled almost identically. If we switch to using // `overwrite=true` cursors in WT, we could consider making them the same, // although that might disadvantage other implementations of the storage engine // API. buf.reset(); column_keygen::writeEncodedCell(*cell, &buf); columnChanges->emplace_back(path.toString(), CellView{buf.buf(), size_t(buf.len())}.toString(), rid, diffAction); }); // Create a "side write" that records the changes made to this document during the bulk // build, so that they can be applied when the bulk builder finishes. 
It is possible that an // update does not result in any changes when there is a "columnstoreProjection" on the // index that excludes all the changed fields. int64_t inserted = 0; int64_t deleted = 0; if (columnChanges->size() > 0) { uassertStatusOK(_indexCatalogEntry->indexBuildInterceptor()->sideWrite( opCtx, *columnChanges, &inserted, &deleted)); } if (keysInsertedOut) { *keysInsertedOut += inserted; } if (keysDeletedOut) { *keysDeletedOut += deleted; } } else { auto cursor = _store->newWriteCursor(opCtx); _keyGen.visitDiffForUpdate( oldDoc, newDoc, [&](column_keygen::ColumnKeyGenerator::DiffAction diffAction, StringData path, const column_keygen::UnencodedCellView* cell) { if (diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kDelete) { tassert(6762302, "RecordID cannot be a string for column store indexes", !rid.isStr()); cursor->remove(path, rid.getLong()); inc(keysDeletedOut); return; } // kInsert and kUpdate are handled almost identically. If we switch to using // `overwrite=true` cursors in WT, we could consider making them the same, although // that might disadvantage other implementations of the storage engine API. buf.reset(); column_keygen::writeEncodedCell(*cell, &buf); const auto method = diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kInsert ? 
&ColumnStore::WriteCursor::insert : &ColumnStore::WriteCursor::update; tassert( 6762303, "RecordID cannot be a string for column store indexes", !rid.isStr()); (cursor.get()->*method)( path, rid.getLong(), CellView{buf.buf(), size_t(buf.len())}); inc(keysInsertedOut); }); } return Status::OK(); } // namespace mongo Status ColumnStoreAccessMethod::initializeAsEmpty(OperationContext* opCtx) { return Status::OK(); } IndexValidateResults ColumnStoreAccessMethod::validate(OperationContext* opCtx, bool full) const { return _store->validate(opCtx, full); } int64_t ColumnStoreAccessMethod::numKeys(OperationContext* opCtx) const { return _store->numEntries(opCtx); } bool ColumnStoreAccessMethod::appendCustomStats(OperationContext* opCtx, BSONObjBuilder* result, double scale) const { return _store->appendCustomStats(opCtx, result, scale); } long long ColumnStoreAccessMethod::getSpaceUsedBytes(OperationContext* opCtx) const { return _store->getSpaceUsedBytes(opCtx); } long long ColumnStoreAccessMethod::getFreeStorageBytes(OperationContext* opCtx) const { return _store->getFreeStorageBytes(opCtx); } Status ColumnStoreAccessMethod::compact(OperationContext* opCtx) { return _store->compact(opCtx); } std::unique_ptr ColumnStoreAccessMethod::initiateBulk( size_t maxMemoryUsageBytes, const boost::optional& stateInfo, StringData dbName) { return (stateInfo && stateInfo->getFileName()) ? 
std::make_unique(this, maxMemoryUsageBytes, *stateInfo, dbName) : std::make_unique(this, maxMemoryUsageBytes, dbName); } std::shared_ptr ColumnStoreAccessMethod::getSharedIdent() const { return _store->getSharedIdent(); } void ColumnStoreAccessMethod::setIdent(std::shared_ptr ident) { _store->setIdent(std::move(ident)); } Status ColumnStoreAccessMethod::applyIndexBuildSideWrite(OperationContext* opCtx, const CollectionPtr& coll, const BSONObj& operation, const InsertDeleteOptions& unusedOptions, KeyHandlerFn&& unusedFn, int64_t* keysInserted, int64_t* keysDeleted) { const IndexBuildInterceptor::Op opType = operation.getStringField("op") == "i"_sd ? IndexBuildInterceptor::Op::kInsert : operation.getStringField("op") == "d"_sd ? IndexBuildInterceptor::Op::kDelete : IndexBuildInterceptor::Op::kUpdate; RecordId rid = RecordId::deserializeToken(operation.getField("rid")); CellView cell = operation.getStringField("cell"); PathView path = operation.getStringField("path"); auto cursor = _store->newWriteCursor(opCtx); tassert(6597803, "RecordID cannot be a string for column store indexes", !rid.isStr()); switch (opType) { case IndexBuildInterceptor::Op::kInsert: cursor->insert(path, rid.getLong(), cell); inc(keysInserted); opCtx->recoveryUnit()->onRollback( [keysInserted](OperationContext*) { dec(keysInserted); }); break; case IndexBuildInterceptor::Op::kDelete: cursor->remove(path, rid.getLong()); inc(keysDeleted); opCtx->recoveryUnit()->onRollback( [keysDeleted](OperationContext*) { dec(keysDeleted); }); break; case IndexBuildInterceptor::Op::kUpdate: cursor->update(path, rid.getLong(), cell); inc(keysInserted); opCtx->recoveryUnit()->onRollback( [keysInserted](OperationContext*) { dec(keysInserted); }); break; } return Status::OK(); } // static bool ColumnStoreAccessMethod::supportsBlockCompressor(StringData compressor) { static const std::set kSupportedCompressors = { "none"_sd, "snappy"_sd, "zlib"_sd, "zstd"_sd}; return kSupportedCompressors.count(compressor) > 0; } } 
// namespace mongo