diff options
author | Will Buerger <will.buerger@mongodb.com> | 2022-09-30 20:59:42 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-30 22:02:57 +0000 |
commit | 88203efd43517720245e920a5d23bf4f0aeb35b0 (patch) | |
tree | 676fa01fb5ad55ec1b654dacf1e3d0548e7b1c01 /src/mongo | |
parent | 29c5c67ac1e47a716717c9a0e5e398865c789873 (diff) | |
download | mongo-88203efd43517720245e920a5d23bf4f0aeb35b0.tar.gz |
SERVER-65978: Online column store index builds
Co-authored-by: Erin Zhu <erin.zhu@mongodb.com>
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/index/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/index/columns_access_method.cpp | 241 | ||||
-rw-r--r-- | src/mongo/db/index/columns_access_method.h | 12 | ||||
-rw-r--r-- | src/mongo/db/index/index_access_method.cpp | 61 | ||||
-rw-r--r-- | src/mongo/db/index/index_access_method.h | 26 | ||||
-rw-r--r-- | src/mongo/db/index/index_build_interceptor.cpp | 142 | ||||
-rw-r--r-- | src/mongo/db/index/index_build_interceptor.h | 20 | ||||
-rw-r--r-- | src/mongo/db/index/index_build_interceptor_test.cpp | 234 | ||||
-rw-r--r-- | src/mongo/db/storage/column_store.h | 2 | ||||
-rw-r--r-- | src/mongo/db/storage/execution_context.h | 5 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp | 2 |
11 files changed, 622 insertions, 124 deletions
diff --git a/src/mongo/db/index/SConscript b/src/mongo/db/index/SConscript index 32a10b4eee7..d4fc11138c6 100644 --- a/src/mongo/db/index/SConscript +++ b/src/mongo/db/index/SConscript @@ -154,6 +154,7 @@ env.Library( '$BUILD_DIR/mongo/db/fts/base_fts', '$BUILD_DIR/mongo/db/resumable_index_builds_idl', '$BUILD_DIR/mongo/db/server_base', + '$BUILD_DIR/mongo/db/storage/execution_context', 'column_store_index', 'expression_params', 'key_generator', diff --git a/src/mongo/db/index/columns_access_method.cpp b/src/mongo/db/index/columns_access_method.cpp index ef03980b917..55e6c9ad03a 100644 --- a/src/mongo/db/index/columns_access_method.cpp +++ b/src/mongo/db/index/columns_access_method.cpp @@ -41,7 +41,9 @@ #include "mongo/db/index/column_cell.h" #include "mongo/db/index/column_key_generator.h" #include "mongo/db/index/column_store_sorter.h" +#include "mongo/db/index/index_build_interceptor.h" #include "mongo/db/index/index_descriptor.h" +#include "mongo/db/storage/execution_context.h" #include "mongo/logv2/log.h" #include "mongo/util/progress_meter.h" @@ -54,6 +56,11 @@ inline void inc(int64_t* counter) { if (counter) ++*counter; }; + +inline void dec(int64_t* counter) { + if (counter) + --*counter; +}; } // namespace ColumnStoreAccessMethod::ColumnStoreAccessMethod(IndexCatalogEntry* ice, @@ -260,6 +267,26 @@ Status ColumnStoreAccessMethod::BulkBuilder::keyCommitted( return Status::OK(); } + +void ColumnStoreAccessMethod::_visitCellsForIndexInsert( + OperationContext* opCtx, + PooledFragmentBuilder& buf, + const std::vector<BsonRecord>& bsonRecords, + function_ref<void(StringData, const BsonRecord&)> cb) const { + _keyGen.visitCellsForInsert( + bsonRecords, + [&](StringData path, const BsonRecord& rec, const column_keygen::UnencodedCellView& cell) { + if (!rec.ts.isNull()) { + uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(rec.ts)); + } + buf.reset(); + column_keygen::writeEncodedCell(cell, &buf); + tassert( + 6597800, "RecordID cannot be a string for column store indexes", !rec.id.isStr()); + cb(path, rec); + }); +} + Status ColumnStoreAccessMethod::insert(OperationContext* opCtx, SharedBufferFragmentBuilder& pooledBufferBuilder, const CollectionPtr& coll, @@ -268,24 +295,33 @@ Status ColumnStoreAccessMethod::insert(OperationContext* opCtx, int64_t* keysInsertedOut) { try { PooledFragmentBuilder buf(pooledBufferBuilder); - auto cursor = _store->newWriteCursor(opCtx); - _keyGen.visitCellsForInsert( - bsonRecords, - [&](StringData path, - const BsonRecord& rec, - const column_keygen::UnencodedCellView& cell) { - if (!rec.ts.isNull()) { - uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(rec.ts)); + // We cannot write to the index during its initial build phase, so we defer this insert as a + // "side write" to be applied after the build completes. + if (_indexCatalogEntry->isHybridBuilding()) { + auto columnKeys = StorageExecutionContext::get(opCtx).columnKeys(); + _visitCellsForIndexInsert( + opCtx, buf, bsonRecords, [&](StringData path, const BsonRecord& rec) { + columnKeys->emplace_back( + path.toString(), CellView{buf.buf(), size_t(buf.len())}.toString(), rec.id); + }); + int64_t inserted = 0; + ON_BLOCK_EXIT([keysInsertedOut, inserted] { + if (keysInsertedOut) { + *keysInsertedOut += inserted; } - - buf.reset(); - column_keygen::writeEncodedCell(cell, &buf); - invariant(!rec.id.isStr()); - cursor->insert(path, rec.id.getLong(), CellView{buf.buf(), size_t(buf.len())}); - - inc(keysInsertedOut); }); - return Status::OK(); + uassertStatusOK(_indexCatalogEntry->indexBuildInterceptor()->sideWrite( + opCtx, *columnKeys, IndexBuildInterceptor::Op::kInsert, &inserted)); + return Status::OK(); + } else { + auto cursor = _store->newWriteCursor(opCtx); + _visitCellsForIndexInsert( + opCtx, buf, bsonRecords, [&](StringData path, const BsonRecord& rec) { + cursor->insert(path, rec.id.getLong(), CellView{buf.buf(), size_t(buf.len())}); + inc(keysInsertedOut); + }); + return Status::OK(); + } } catch (const AssertionException& ex) { return ex.toStatus(); } @@ -300,12 +336,26 @@ void ColumnStoreAccessMethod::remove(OperationContext* opCtx, const InsertDeleteOptions& options, int64_t* keysDeletedOut, CheckRecordId checkRecordId) { - auto cursor = _store->newWriteCursor(opCtx); - _keyGen.visitPathsForDelete(obj, [&](StringData path) { - tassert(6762301, "RecordID cannot be a string for column store indexes", !rid.isStr()); - cursor->remove(path, rid.getLong()); - inc(keysDeletedOut); - }); + if (_indexCatalogEntry->isHybridBuilding()) { + auto columnKeys = StorageExecutionContext::get(opCtx).columnKeys(); + _keyGen.visitPathsForDelete(obj, [&](StringData path) { + columnKeys->emplace_back(std::make_tuple(path.toString(), "", rid)); + }); + int64_t removed = 0; + fassert(6597801, + _indexCatalogEntry->indexBuildInterceptor()->sideWrite( + opCtx, *columnKeys, IndexBuildInterceptor::Op::kDelete, &removed)); + if (keysDeletedOut) { + *keysDeletedOut += removed; + } + } else { + auto cursor = _store->newWriteCursor(opCtx); + _keyGen.visitPathsForDelete(obj, [&](PathView path) { + tassert(6762301, "RecordID cannot be a string for column store indexes", !rid.isStr()); + cursor->remove(path, rid.getLong()); + inc(keysDeletedOut); + }); + } } Status ColumnStoreAccessMethod::update(OperationContext* opCtx, @@ -318,37 +368,88 @@ Status ColumnStoreAccessMethod::update(OperationContext* opCtx, int64_t* keysInsertedOut, int64_t* keysDeletedOut) { PooledFragmentBuilder buf(pooledBufferBuilder); - auto cursor = _store->newWriteCursor(opCtx); - _keyGen.visitDiffForUpdate( - oldDoc, - newDoc, - [&](column_keygen::ColumnKeyGenerator::DiffAction diffAction, - StringData path, - const column_keygen::UnencodedCellView* cell) { - if (diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kDelete) { - tassert( - 6762302, "RecordID cannot be a string for column store indexes", !rid.isStr()); - cursor->remove(path, rid.getLong()); - inc(keysDeletedOut); - return; - } - // kInsert and kUpdate are handled almost identically. If we switch to using - // `overwrite=true` cursors in WT, we could consider making them the same, although that - // might disadvantage other implementations of the storage engine API. - buf.reset(); - column_keygen::writeEncodedCell(*cell, &buf); + if (_indexCatalogEntry->isHybridBuilding()) { + auto columnKeys = StorageExecutionContext::get(opCtx).columnKeys(); + _keyGen.visitDiffForUpdate( + oldDoc, + newDoc, + [&](column_keygen::ColumnKeyGenerator::DiffAction diffAction, + StringData path, + const column_keygen::UnencodedCellView* cell) { + if (diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kDelete) { + columnKeys->emplace_back(std::make_tuple(path.toString(), "", rid)); + int64_t removed = 0; + fassert(6597802, + _indexCatalogEntry->indexBuildInterceptor()->sideWrite( + opCtx, *columnKeys, IndexBuildInterceptor::Op::kDelete, &removed)); + + if (keysDeletedOut) { + *keysDeletedOut += removed; + } + return; + } + + // kInsert and kUpdate are handled almost identically. If we switch to using + // `overwrite=true` cursors in WT, we could consider making them the same, + // although that might disadvantage other implementations of the storage engine + // API. + buf.reset(); + column_keygen::writeEncodedCell(*cell, &buf); - const auto method = diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kInsert - ? &ColumnStore::WriteCursor::insert - : &ColumnStore::WriteCursor::update; - tassert(6762303, "RecordID cannot be a string for column store indexes", !rid.isStr()); - (cursor.get()->*method)(path, rid.getLong(), CellView{buf.buf(), size_t(buf.len())}); + const auto method = + diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kInsert + ? IndexBuildInterceptor::Op::kInsert + : IndexBuildInterceptor::Op::kUpdate; - inc(keysInsertedOut); - }); + columnKeys->emplace_back(std::make_tuple( + path.toString(), CellView{buf.buf(), size_t(buf.len())}.toString(), rid)); + + int64_t inserted = 0; + Status status = _indexCatalogEntry->indexBuildInterceptor()->sideWrite( + opCtx, *columnKeys, method, &inserted); + if (keysInsertedOut) { + *keysInsertedOut += inserted; + } + }); + + } else { + auto cursor = _store->newWriteCursor(opCtx); + _keyGen.visitDiffForUpdate( + oldDoc, + newDoc, + [&](column_keygen::ColumnKeyGenerator::DiffAction diffAction, + StringData path, + const column_keygen::UnencodedCellView* cell) { + if (diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kDelete) { + tassert(6762302, + "RecordID cannot be a string for column store indexes", + !rid.isStr()); + cursor->remove(path, rid.getLong()); + inc(keysDeletedOut); + return; + } + + // kInsert and kUpdate are handled almost identically. If we switch to using + // `overwrite=true` cursors in WT, we could consider making them the same, although + // that might disadvantage other implementations of the storage engine API. + buf.reset(); + column_keygen::writeEncodedCell(*cell, &buf); + + const auto method = + diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kInsert + ? &ColumnStore::WriteCursor::insert + : &ColumnStore::WriteCursor::update; + tassert( + 6762303, "RecordID cannot be a string for column store indexes", !rid.isStr()); + (cursor.get()->*method)( + path, rid.getLong(), CellView{buf.buf(), size_t(buf.len())}); + + inc(keysInsertedOut); + }); + } return Status::OK(); -} +} // namespace mongo Status ColumnStoreAccessMethod::initializeAsEmpty(OperationContext* opCtx) { return Status::OK(); @@ -383,8 +484,9 @@ std::unique_ptr<IndexAccessMethod::BulkBuilder> ColumnStoreAccessMethod::initiat size_t maxMemoryUsageBytes, const boost::optional<IndexStateInfo>& stateInfo, StringData dbName) { - return stateInfo ? std::make_unique<BulkBuilder>(this, maxMemoryUsageBytes, *stateInfo, dbName) - : std::make_unique<BulkBuilder>(this, maxMemoryUsageBytes, dbName); + return (stateInfo && stateInfo->getFileName()) + ? std::make_unique<BulkBuilder>(this, maxMemoryUsageBytes, *stateInfo, dbName) + : std::make_unique<BulkBuilder>(this, maxMemoryUsageBytes, dbName); } std::shared_ptr<Ident> ColumnStoreAccessMethod::getSharedIdent() const { @@ -395,4 +497,41 @@ void ColumnStoreAccessMethod::setIdent(std::shared_ptr<Ident> ident) { _store->setIdent(std::move(ident)); } +void ColumnStoreAccessMethod::applyColumnDataSideWrite(OperationContext* opCtx, + const CollectionPtr& coll, + const BSONObj& operation, + int64_t* keysInserted, + int64_t* keysDeleted) { + const IndexBuildInterceptor::Op opType = operation.getStringField("op") == "i"_sd + ? IndexBuildInterceptor::Op::kInsert + : operation.getStringField("op") == "d"_sd ? IndexBuildInterceptor::Op::kDelete + : IndexBuildInterceptor::Op::kUpdate; + + RecordId rid = RecordId::deserializeToken(operation.getField("rid")); + + CellView cell = operation.getStringField("cell"); + PathView path = operation.getStringField("path"); + + auto cursor = _store->newWriteCursor(opCtx); + + tassert(6597803, "RecordID cannot be a string for column store indexes", !rid.isStr()); + switch (opType) { + case IndexBuildInterceptor::Op::kInsert: + cursor->insert(path, rid.getLong(), cell); + inc(keysInserted); + opCtx->recoveryUnit()->onRollback([keysInserted] { dec(keysInserted); }); + break; + case IndexBuildInterceptor::Op::kDelete: + cursor->remove(path, rid.getLong()); + inc(keysDeleted); + opCtx->recoveryUnit()->onRollback([keysDeleted] { dec(keysDeleted); }); + break; + case IndexBuildInterceptor::Op::kUpdate: + cursor->update(path, rid.getLong(), cell); + inc(keysInserted); + opCtx->recoveryUnit()->onRollback([keysInserted] { dec(keysInserted); }); + break; + } +} + } // namespace mongo diff --git a/src/mongo/db/index/columns_access_method.h b/src/mongo/db/index/columns_access_method.h index fefa5240468..44848fa114c 100644 --- a/src/mongo/db/index/columns_access_method.h +++ b/src/mongo/db/index/columns_access_method.h @@ -75,6 +75,7 @@ public: const InsertDeleteOptions& options, int64_t* keysDeletedOut, CheckRecordId checkRecordId) final; + Status update(OperationContext* opCtx, SharedBufferFragmentBuilder& pooledBufferBuilder, const BSONObj& oldDoc, @@ -85,6 +86,12 @@ public: int64_t* keysInsertedOut, int64_t* keysDeletedOut) final; + void applyColumnDataSideWrite(OperationContext* opCtx, + const CollectionPtr& coll, + const BSONObj& operation, + int64_t* keysInserted, + int64_t* keysDeleted) final; + Status initializeAsEmpty(OperationContext* opCtx) final; void validate(OperationContext* opCtx, @@ -117,6 +124,11 @@ public: class BulkBuilder; private: + void _visitCellsForIndexInsert(OperationContext* opCtx, + PooledFragmentBuilder& pooledFragmentBuilder, + const std::vector<BsonRecord>& bsonRecords, + function_ref<void(StringData, const BsonRecord&)> cb) const; + const std::unique_ptr<ColumnStore> _store; IndexCatalogEntry* const _indexCatalogEntry; // owned by IndexCatalog const IndexDescriptor* const _descriptor; diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp index db5a1974aa1..7e7bc6a7c89 100644 --- a/src/mongo/db/index/index_access_method.cpp +++ b/src/mongo/db/index/index_access_method.cpp @@ -620,6 +620,66 @@ void SortedDataIndexAccessMethod::setIdent(std::shared_ptr<Ident> newIdent) { this->_newInterface->setIdent(std::move(newIdent)); } +Status SortedDataIndexAccessMethod::applySortedDataSideWrite(OperationContext* opCtx, + const CollectionPtr& coll, + const BSONObj& operation, + const InsertDeleteOptions& options, + KeyHandlerFn&& onDuplicateKey, + int64_t* const keysInserted, + int64_t* const keysDeleted) { + auto opType = [&operation] { + switch (operation.getStringField("op")[0]) { + case 'i': + return IndexBuildInterceptor::Op::kInsert; + case 'd': + return IndexBuildInterceptor::Op::kDelete; + case 'u': + return IndexBuildInterceptor::Op::kUpdate; + default: + MONGO_UNREACHABLE; + } + }(); + + // Deserialize the encoded KeyString::Value. + int keyLen; + const char* binKey = operation["key"].binData(keyLen); + BufReader reader(binKey, keyLen); + const KeyString::Value keyString = + KeyString::Value::deserialize(reader, getSortedDataInterface()->getKeyStringVersion()); + + const KeyStringSet keySet{keyString}; + if (opType == IndexBuildInterceptor::Op::kInsert) { + int64_t numInserted; + auto status = insertKeysAndUpdateMultikeyPaths(opCtx, + coll, + {keySet.begin(), keySet.end()}, + {}, + MultikeyPaths{}, + options, + std::move(onDuplicateKey), + &numInserted); + if (!status.isOK()) { + return status; + } + + *keysInserted += numInserted; + opCtx->recoveryUnit()->onRollback( + [keysInserted, numInserted] { *keysInserted -= numInserted; }); + } else { + invariant(opType == IndexBuildInterceptor::Op::kDelete); + int64_t numDeleted; + Status s = removeKeys(opCtx, {keySet.begin(), keySet.end()}, options, &numDeleted); + if (!s.isOK()) { + return s; + } + + *keysDeleted += numDeleted; + opCtx->recoveryUnit()->onRollback( + [keysDeleted, numDeleted] { *keysDeleted -= numDeleted; }); + } + return Status::OK(); +} + void IndexAccessMethod::BulkBuilder::countNewBuildInStats() { indexBulkBuilderSSS.count.addAndFetch(1); } @@ -1206,7 +1266,6 @@ void SortedDataIndexAccessMethod::_unindexKeysOrWriteToSideTable( if (!status.isOK()) { LOGV2(20362, - "Couldn't unindex record {obj} from collection {namespace}: {error}", "Couldn't unindex record", "record"_attr = redact(obj), "namespace"_attr = ns, diff --git a/src/mongo/db/index/index_access_method.h b/src/mongo/db/index/index_access_method.h index 8c565152b67..40e012aa238 100644 --- a/src/mongo/db/index/index_access_method.h +++ b/src/mongo/db/index/index_access_method.h @@ -174,6 +174,24 @@ public: */ virtual void setIdent(std::shared_ptr<Ident> newIdent) = 0; + virtual Status applySortedDataSideWrite(OperationContext* opCtx, + const CollectionPtr& coll, + const BSONObj& operation, + const InsertDeleteOptions& options, + KeyHandlerFn&& onDuplicateKey, + int64_t* const keysInserted, + int64_t* const keysDeleted) { + MONGO_UNREACHABLE; + }; + + virtual void applyColumnDataSideWrite(OperationContext* opCtx, + const CollectionPtr& coll, + const BSONObj& operation, + int64_t* keysInserted, + int64_t* keysDeleted) { + MONGO_UNREACHABLE; + }; + // // Bulk operations support // @@ -561,6 +579,14 @@ public: void setIdent(std::shared_ptr<Ident> newIdent) final; + Status applySortedDataSideWrite(OperationContext* opCtx, + const CollectionPtr& coll, + const BSONObj& operation, + const InsertDeleteOptions& options, + KeyHandlerFn&& onDuplicateKey, + int64_t* keysInserted, + int64_t* keysDeleted) final; + std::unique_ptr<BulkBuilder> initiateBulk(size_t maxMemoryUsageBytes, const boost::optional<IndexStateInfo>& stateInfo, StringData dbName) final; diff --git a/src/mongo/db/index/index_build_interceptor.cpp b/src/mongo/db/index/index_build_interceptor.cpp index 36072c25bf5..872c5301d85 100644 --- a/src/mongo/db/index/index_build_interceptor.cpp +++ b/src/mongo/db/index/index_build_interceptor.cpp @@ -39,6 +39,7 @@ #include "mongo/db/concurrency/exception_util.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" +#include "mongo/db/index/columns_access_method.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/index/index_build_interceptor_gen.h" #include "mongo/db/multi_key_path_tracker.h" @@ -291,56 +292,25 @@ Status IndexBuildInterceptor::_applyWrite(OperationContext* opCtx, TrackDuplicates trackDups, int64_t* const keysInserted, int64_t* const keysDeleted) { - // Deserialize the encoded KeyString::Value. - int keyLen; - const char* binKey = operation["key"].binData(keyLen); - BufReader reader(binKey, keyLen); - auto accessMethod = _indexCatalogEntry->accessMethod()->asSortedData(); - const KeyString::Value keyString = KeyString::Value::deserialize( - reader, accessMethod->getSortedDataInterface()->getKeyStringVersion()); - - const Op opType = operation.getStringField("op") == "i"_sd ? Op::kInsert : Op::kDelete; - - const KeyStringSet keySet{keyString}; - if (opType == Op::kInsert) { - int64_t numInserted; - auto status = accessMethod->insertKeysAndUpdateMultikeyPaths( + // Check field for "key" to determine if collection is sorted data or column store. + if (operation.hasField("key")) { + return _indexCatalogEntry->accessMethod()->applySortedDataSideWrite( opCtx, coll, - {keySet.begin(), keySet.end()}, - {}, - MultikeyPaths{}, + operation, options, [=](const KeyString::Value& duplicateKey) { return trackDups == TrackDuplicates::kTrack ? recordDuplicateKey(opCtx, duplicateKey) : Status::OK(); }, - &numInserted); - if (!status.isOK()) { - return status; - } - - *keysInserted += numInserted; - opCtx->recoveryUnit()->onRollback( - [keysInserted, numInserted] { *keysInserted -= numInserted; }); + keysInserted, + keysDeleted); } else { - invariant(opType == Op::kDelete); - if (kDebugBuild) - invariant(operation.getStringField("op") == "d"_sd); - - int64_t numDeleted; - Status s = - accessMethod->removeKeys(opCtx, {keySet.begin(), keySet.end()}, options, &numDeleted); - if (!s.isOK()) { - return s; - } - - *keysDeleted += numDeleted; - opCtx->recoveryUnit()->onRollback( - [keysDeleted, numDeleted] { *keysDeleted -= numDeleted; }); + _indexCatalogEntry->accessMethod()->applyColumnDataSideWrite( + opCtx, coll, operation, keysInserted, keysDeleted); + return Status::OK(); } - return Status::OK(); } void IndexBuildInterceptor::_yield(OperationContext* opCtx, const Yieldable* yieldable) { @@ -422,6 +392,35 @@ boost::optional<MultikeyPaths> IndexBuildInterceptor::getMultikeyPaths() const { return _multikeyPaths; } +Status IndexBuildInterceptor::_finishSideWrite(OperationContext* opCtx, + const std::vector<BSONObj>& toInsert) { + _sideWritesCounter->fetchAndAdd(toInsert.size()); + // This insert may roll back, but not necessarily from inserting into this table. If other write + // operations outside this table and in the same transaction are rolled back, this counter also + // needs to be rolled back. + opCtx->recoveryUnit()->onRollback([sharedCounter = _sideWritesCounter, size = toInsert.size()] { + sharedCounter->fetchAndSubtract(size); + }); + + std::vector<Record> records; + for (auto& doc : toInsert) { + records.emplace_back(Record{RecordId(), // The storage engine will assign its own RecordId + // when we pass one that is null. + RecordData(doc.objdata(), doc.objsize())}); + } + + LOGV2_DEBUG(20691, + 2, + "Recording side write keys on index", + "numRecords"_attr = records.size(), + "index"_attr = _indexCatalogEntry->descriptor()->indexName()); + + // By passing a vector of null timestamps, these inserts are not timestamped individually, but + // rather with the timestamp of the owning operation. + std::vector<Timestamp> timestamps(records.size()); + return _sideWritesTable->rs()->insertRecords(opCtx, &records, timestamps); +} + Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx, const KeyStringSet& keys, const KeyStringSet& multikeyMetadataKeys, @@ -429,6 +428,7 @@ Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx, Op op, int64_t* const numKeysOut) { invariant(opCtx->lockState()->inAWriteUnitOfWork()); + invariant(op != IndexBuildInterceptor::Op::kUpdate); // Maintain parity with IndexAccessMethods handling of key counting. Only include // `multikeyMetadataKeys` when inserting. @@ -478,9 +478,9 @@ Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx, } if (op == Op::kInsert) { - // Wildcard indexes write multikey path information, typically part of the catalog - // document, to the index itself. Multikey information is never deleted, so we only need - // to add this data on the insert path. + // Wildcard indexes write multikey path information, typically part of the catalog document, + // to the index itself. Multikey information is never deleted, so we only need to add this + // data on the insert path. for (const auto& keyString : multikeyMetadataKeys) { builder.reset(); keyString.serialize(builder); @@ -491,33 +491,41 @@ Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx, } } - _sideWritesCounter->fetchAndAdd(toInsert.size()); - // This insert may roll back, but not necessarily from inserting into this table. If other write - // operations outside this table and in the same transaction are rolled back, this counter also - // needs to be rolled back. - opCtx->recoveryUnit()->onRollback([sharedCounter = _sideWritesCounter, size = toInsert.size()] { - sharedCounter->fetchAndSubtract(size); - }); + return _finishSideWrite(opCtx, std::move(toInsert)); +} - std::vector<Record> records; - for (auto& doc : toInsert) { - records.emplace_back(Record{RecordId(), // The storage engine will assign its own RecordId - // when we pass one that is null. - RecordData(doc.objdata(), doc.objsize())}); - } +Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx, + const PathCellSet& keys, + Op op, + int64_t* const numKeysOut) { + invariant(opCtx->lockState()->inAWriteUnitOfWork()); - LOGV2_DEBUG(20691, - 2, - "recording {records_size} side write keys on index " - "'{indexCatalogEntry_descriptor_indexName}'", - "records_size"_attr = records.size(), - "indexCatalogEntry_descriptor_indexName"_attr = - _indexCatalogEntry->descriptor()->indexName()); + *numKeysOut = keys.size(); - // By passing a vector of null timestamps, these inserts are not timestamped individually, but - // rather with the timestamp of the owning operation. - std::vector<Timestamp> timestamps(records.size()); - return _sideWritesTable->rs()->insertRecords(opCtx, &records, timestamps); + std::vector<BSONObj> toInsert; + toInsert.reserve(keys.size()); + for (const auto& [path, cell, rid] : keys) { + + BSONObjBuilder builder; + rid.serializeToken("rid", &builder); + builder.append("op", [op] { + switch (op) { + case Op::kInsert: + return "i"; + case Op::kDelete: + return "d"; + case Op::kUpdate: + return "u"; + } + MONGO_UNREACHABLE; + }()); + builder.append("path", path); + builder.append("cell", cell); + + toInsert.push_back(builder.obj()); + } + + return _finishSideWrite(opCtx, std::move(toInsert)); } Status IndexBuildInterceptor::retrySkippedRecords(OperationContext* opCtx, diff --git a/src/mongo/db/index/index_build_interceptor.h b/src/mongo/db/index/index_build_interceptor.h index 46c4f5e6e8b..b1888d46f76 100644 --- a/src/mongo/db/index/index_build_interceptor.h +++ b/src/mongo/db/index/index_build_interceptor.h @@ -31,11 +31,13 @@ #include <memory> +#include "mongo/db/index/columns_access_method.h" #include "mongo/db/index/duplicate_key_tracker.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/index/multikey_paths.h" #include "mongo/db/index/skipped_record_tracker.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/storage/column_store.h" #include "mongo/db/storage/temporary_record_store.h" #include "mongo/db/yieldable.h" #include "mongo/platform/atomic_word.h" @@ -53,7 +55,7 @@ public: */ enum class DrainYieldPolicy { kNoYield, kYield }; - enum class Op { kInsert, kDelete }; + enum class Op { kInsert, kDelete, kUpdate }; /** * Indicates whether to record duplicate keys that have been inserted into the index. When set @@ -101,6 +103,18 @@ public: int64_t* numKeysOut); /** + * Client writes that are concurrent with a column store index build will have their index + * updates written to a temporary table. After the index table scan is complete, these updates + * will be applied to the underlying index table. + * + * On success, `numKeysOut` if non-null will contain the number of keys added or removed. + */ + Status sideWrite(OperationContext* opCtx, + const PathCellSet& columnstoreKeys, + Op op, + int64_t* numKeysOut); + + /** * Given a duplicate key, record the key for later verification by a call to * checkDuplicateKeyConstraints(); */ @@ -173,7 +187,6 @@ public: private: using SideWriteRecord = std::pair<RecordId, BSONObj>; - Status _applyWrite(OperationContext* opCtx, const CollectionPtr& coll, const BSONObj& doc, @@ -193,6 +206,8 @@ private: FailPoint* fp, long long iteration) const; + Status _finishSideWrite(OperationContext* opCtx, const std::vector<BSONObj>& toInsert); + // The entry for the index that is being built. const IndexCatalogEntry* _indexCatalogEntry; @@ -224,5 +239,4 @@ private: MONGO_MAKE_LATCH("IndexBuildInterceptor::_multikeyPathMutex"); boost::optional<MultikeyPaths> _multikeyPaths; }; - } // namespace mongo diff --git a/src/mongo/db/index/index_build_interceptor_test.cpp b/src/mongo/db/index/index_build_interceptor_test.cpp index 30f996d69d2..e0f9f86c98a 100644 --- a/src/mongo/db/index/index_build_interceptor_test.cpp +++ b/src/mongo/db/index/index_build_interceptor_test.cpp @@ -30,6 +30,7 @@ #include "mongo/db/catalog/catalog_test_fixture.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/index/index_build_interceptor.h" +#include "mongo/idl/server_parameter_test_util.h" namespace mongo { namespace { @@ -118,5 +119,238 @@ TEST_F(IndexBuilderInterceptorTest, SingleInsertIsSavedToSideWritesTable) { << "key" << serializedKeyString), sideWrites[0]); } + + +TEST_F(IndexBuilderInterceptorTest, SingleColumnInsertIsSavedToSideWritesTable) { + RAIIServerParameterControllerForTest controller("featureFlagColumnstoreIndexes", true); + auto interceptor = createIndexBuildInterceptor( + fromjson("{v: 2, name: 'columnstore', key: {'$**': 'columnstore'}}")); + + PathCellSet columnKeys; + columnKeys.emplace_back(std::make_tuple("path", "cell", RecordId(1))); + + WriteUnitOfWork wuow(operationContext()); + int64_t numKeys = 0; + ASSERT_OK(interceptor->sideWrite( + operationContext(), columnKeys, IndexBuildInterceptor::Op::kInsert, &numKeys)); + ASSERT_EQ(1, numKeys); + wuow.commit(); + + BSONObjBuilder builder; + RecordId(1).serializeToken("rid", &builder); + BSONObj obj = builder.obj(); + BSONElement elem = obj["rid"]; + + auto sideWrites = getSideWritesTableContents(std::move(interceptor)); + ASSERT_EQ(1, sideWrites.size()); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem << "op" + << "i" + << "path" + << "path" + << "cell" + << "cell"), + sideWrites[0]); +} + +TEST_F(IndexBuilderInterceptorTest, SingleColumnDeleteIsSavedToSideWritesTable) { + RAIIServerParameterControllerForTest controller("featureFlagColumnstoreIndexes", true); + auto interceptor = createIndexBuildInterceptor( + fromjson("{v: 2, name: 'columnstore', key: {'$**': 'columnstore'}}")); + + PathCellSet columnKeys; + columnKeys.emplace_back(std::make_tuple("path", "", RecordId(1))); + + WriteUnitOfWork wuow(operationContext()); + int64_t numKeys = 0; + ASSERT_OK(interceptor->sideWrite( + operationContext(), columnKeys, IndexBuildInterceptor::Op::kDelete, &numKeys)); + ASSERT_EQ(1, numKeys); + wuow.commit(); + + BSONObjBuilder builder; + RecordId(1).serializeToken("rid", &builder); + BSONObj obj = builder.obj(); + BSONElement elem = obj["rid"]; + + auto sideWrites = getSideWritesTableContents(std::move(interceptor)); + ASSERT_EQ(1, sideWrites.size()); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem << "op" + << "d" + << "path" + << "path" + << "cell" + << ""), + sideWrites[0]); +} + +TEST_F(IndexBuilderInterceptorTest, SingleColumnUpdateIsSavedToSideWritesTable) { + RAIIServerParameterControllerForTest controller("featureFlagColumnstoreIndexes", true); + auto interceptor = createIndexBuildInterceptor( + fromjson("{v: 2, name: 'columnstore', key: {'$**': 'columnstore'}}")); + + // create path + cell + rid + PathCellSet columnKeys; + columnKeys.emplace_back(std::make_tuple("path", "cell", RecordId(1))); + + WriteUnitOfWork wuow(operationContext()); + int64_t numKeys = 0; + ASSERT_OK(interceptor->sideWrite( + operationContext(), columnKeys, IndexBuildInterceptor::Op::kUpdate, &numKeys)); + ASSERT_EQ(1, numKeys); + wuow.commit(); + + BSONObjBuilder builder; + RecordId(1).serializeToken("rid", &builder); + BSONObj obj = builder.obj(); + BSONElement elem = obj["rid"]; + + auto sideWrites = getSideWritesTableContents(std::move(interceptor)); + ASSERT_EQ(1, sideWrites.size()); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem << "op" + << "u" + << "path" + << "path" + << "cell" + << "cell"), + sideWrites[0]); +} + +TEST_F(IndexBuilderInterceptorTest, MultipleColumnInsertsAreSavedToSideWritesTable) { + RAIIServerParameterControllerForTest controller("featureFlagColumnstoreIndexes", true); + auto interceptor = createIndexBuildInterceptor( + fromjson("{v: 2, name: 'columnstore', key: {'$**': 'columnstore'}}")); + + PathCellSet columnKeys; + columnKeys.emplace_back(std::make_tuple("path", "cell", RecordId(1))); + columnKeys.emplace_back(std::make_tuple("path1", "cell1", RecordId(1))); + columnKeys.emplace_back(std::make_tuple("path2", "cell2", RecordId(2))); + columnKeys.emplace_back(std::make_tuple("path3", "cell3", RecordId(2))); + + WriteUnitOfWork wuow(operationContext()); + int64_t numKeys = 0; + + ASSERT_OK(interceptor->sideWrite( + operationContext(), columnKeys, IndexBuildInterceptor::Op::kInsert, &numKeys)); + ASSERT_EQ(4, numKeys); + wuow.commit(); + + BSONObjBuilder builder; + RecordId(1).serializeToken("rid", &builder); + BSONObj obj = builder.obj(); + BSONElement elem1 = obj["rid"]; + + BSONObjBuilder builder2; + RecordId(2).serializeToken("rid", &builder2); + BSONObj obj2 = builder2.obj(); + BSONElement elem2 = obj2["rid"]; + + auto sideWrites = getSideWritesTableContents(std::move(interceptor)); + ASSERT_EQ(4, sideWrites.size()); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem1 << "op" + << "i" + << "path" + << "path" + << "cell" + << "cell"), + sideWrites[0]); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem1 << "op" + << "i" + << "path" + << "path1" + << "cell" + << "cell1"), + sideWrites[1]); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem2 << "op" + << "i" + << "path" + << "path2" + << "cell" + << "cell2"), + sideWrites[2]); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem2 << "op" + << "i" + << "path" + << "path3" + << "cell" + << "cell3"), + sideWrites[3]); +} + +TEST_F(IndexBuilderInterceptorTest, MultipleColumnSideWritesAreSavedToSideWritesTable) { + RAIIServerParameterControllerForTest controller("featureFlagColumnstoreIndexes", true); + auto interceptor = createIndexBuildInterceptor( + fromjson("{v: 2, name: 'columnstore', key: {'$**': 'columnstore'}}")); + + WriteUnitOfWork wuow(operationContext()); + int64_t numKeys = 0; + + PathCellSet columnKeys; + columnKeys.emplace_back(std::make_tuple("path", "cell", RecordId(1))); + ASSERT_OK(interceptor->sideWrite( + operationContext(), columnKeys, IndexBuildInterceptor::Op::kInsert, &numKeys)); + ASSERT_EQ(1, numKeys); + + PathCellSet columnKeys2; + columnKeys2.emplace_back(std::make_tuple("path", "", RecordId(1))); + ASSERT_OK(interceptor->sideWrite( + operationContext(), columnKeys2, IndexBuildInterceptor::Op::kDelete, &numKeys)); + ASSERT_EQ(1, numKeys); + + + PathCellSet columnKeys3; + columnKeys3.emplace_back(std::make_tuple("path1", "cell1", RecordId(2))); + ASSERT_OK(interceptor->sideWrite( + operationContext(), columnKeys3, IndexBuildInterceptor::Op::kUpdate, &numKeys)); + ASSERT_EQ(1, numKeys); + + PathCellSet columnKeys4; + columnKeys4.emplace_back(std::make_tuple("path2", "cell2", RecordId(2))); + ASSERT_OK(interceptor->sideWrite( + operationContext(), columnKeys4, IndexBuildInterceptor::Op::kInsert, &numKeys)); + ASSERT_EQ(1, numKeys); + wuow.commit(); + + BSONObjBuilder builder; + RecordId(1).serializeToken("rid", &builder); + BSONObj obj = builder.obj(); + BSONElement elem1 = obj["rid"]; + + BSONObjBuilder builder2; + RecordId(2).serializeToken("rid", &builder2); + BSONObj obj2 = builder2.obj(); + BSONElement elem2 = obj2["rid"]; + + auto sideWrites = getSideWritesTableContents(std::move(interceptor)); + ASSERT_EQ(4, sideWrites.size()); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem1 << "op" + << "i" + << "path" + << "path" + << "cell" + << "cell"), + sideWrites[0]); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem1 << "op" + << "d" + << "path" + << "path" + << "cell" + << ""), + sideWrites[1]); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem2 << "op" + << "u" + << "path" + << "path1" + << "cell" + << "cell1"), + sideWrites[2]); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem2 << "op" + << "i" + << "path" + << "path2" + << "cell" + << "cell2"), + sideWrites[3]); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/storage/column_store.h b/src/mongo/db/storage/column_store.h index 9db8ca7d1bb..30b21cbe340 100644 --- a/src/mongo/db/storage/column_store.h +++ b/src/mongo/db/storage/column_store.h @@ -786,4 +786,6 @@ struct SplitCellView { } } }; + +using PathCellSet = std::vector<std::tuple<std::string, std::string, RecordId>>; } // namespace mongo diff --git a/src/mongo/db/storage/execution_context.h b/src/mongo/db/storage/execution_context.h index 28479f833b9..4ceed7ec4a3 100644 --- a/src/mongo/db/storage/execution_context.h +++ b/src/mongo/db/storage/execution_context.h @@ -31,6 +31,7 @@ #include "mongo/db/index/multikey_paths.h" #include "mongo/db/operation_context.h" +#include "mongo/db/storage/column_store.h" #include "mongo/db/storage/key_string.h" #include "mongo/util/auto_clear_ptr.h" @@ -61,11 +62,15 @@ public: AutoClearPtr<MultikeyPaths> multikeyPaths() { return makeAutoClearPtr(&_multikeyPaths); } + AutoClearPtr<PathCellSet> columnKeys() { + return makeAutoClearPtr(&_columnKeys); + } private: KeyStringSet _keys; KeyStringSet _multikeyMetadataKeys; MultikeyPaths _multikeyPaths; + PathCellSet _columnKeys; }; } // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp index 1caf0b67158..9f0561911b8 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp @@ -175,7 +175,6 @@ void WiredTigerColumnStore::WriteCursor::insert(PathView path, RowId rid, CellVi auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx); metricsCollector.incrementOneIdxEntryWritten(c()->uri, keyItem.size); - // TODO: SERVER-65978, we may have to specially handle WT_DUPLICATE_KEY error here. if (ret) { uassertStatusOK(wtRCToStatus(ret, c()->session)); } @@ -219,7 +218,6 @@ void WiredTigerColumnStore::WriteCursor::update(PathView path, RowId rid, CellVi auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx); metricsCollector.incrementOneIdxEntryWritten(c()->uri, keyItem.size); - // TODO: SERVER-65978, may want to handle WT_NOTFOUND specially. if (ret != 0) return uassertStatusOK(wtRCToStatus(ret, c()->session)); } |