summaryrefslogtreecommitdiff
path: root/src/mongo/db/index/columns_access_method.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/index/columns_access_method.cpp')
-rw-r--r--src/mongo/db/index/columns_access_method.cpp241
1 files changed, 190 insertions, 51 deletions
diff --git a/src/mongo/db/index/columns_access_method.cpp b/src/mongo/db/index/columns_access_method.cpp
index ef03980b917..55e6c9ad03a 100644
--- a/src/mongo/db/index/columns_access_method.cpp
+++ b/src/mongo/db/index/columns_access_method.cpp
@@ -41,7 +41,9 @@
#include "mongo/db/index/column_cell.h"
#include "mongo/db/index/column_key_generator.h"
#include "mongo/db/index/column_store_sorter.h"
+#include "mongo/db/index/index_build_interceptor.h"
#include "mongo/db/index/index_descriptor.h"
+#include "mongo/db/storage/execution_context.h"
#include "mongo/logv2/log.h"
#include "mongo/util/progress_meter.h"
@@ -54,6 +56,11 @@ inline void inc(int64_t* counter) {
if (counter)
++*counter;
};
+
+inline void dec(int64_t* counter) {
+ if (counter)
+ --*counter;
+};
} // namespace
ColumnStoreAccessMethod::ColumnStoreAccessMethod(IndexCatalogEntry* ice,
@@ -260,6 +267,26 @@ Status ColumnStoreAccessMethod::BulkBuilder::keyCommitted(
return Status::OK();
}
+
+void ColumnStoreAccessMethod::_visitCellsForIndexInsert(
+ OperationContext* opCtx,
+ PooledFragmentBuilder& buf,
+ const std::vector<BsonRecord>& bsonRecords,
+ function_ref<void(StringData, const BsonRecord&)> cb) const {
+ _keyGen.visitCellsForInsert(
+ bsonRecords,
+ [&](StringData path, const BsonRecord& rec, const column_keygen::UnencodedCellView& cell) {
+ if (!rec.ts.isNull()) {
+ uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(rec.ts));
+ }
+ buf.reset();
+ column_keygen::writeEncodedCell(cell, &buf);
+ tassert(
+ 6597800, "RecordID cannot be a string for column store indexes", !rec.id.isStr());
+ cb(path, rec);
+ });
+}
+
Status ColumnStoreAccessMethod::insert(OperationContext* opCtx,
SharedBufferFragmentBuilder& pooledBufferBuilder,
const CollectionPtr& coll,
@@ -268,24 +295,33 @@ Status ColumnStoreAccessMethod::insert(OperationContext* opCtx,
int64_t* keysInsertedOut) {
try {
PooledFragmentBuilder buf(pooledBufferBuilder);
- auto cursor = _store->newWriteCursor(opCtx);
- _keyGen.visitCellsForInsert(
- bsonRecords,
- [&](StringData path,
- const BsonRecord& rec,
- const column_keygen::UnencodedCellView& cell) {
- if (!rec.ts.isNull()) {
- uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(rec.ts));
+ // We cannot write to the index during its initial build phase, so we defer this insert as a
+ // "side write" to be applied after the build completes.
+ if (_indexCatalogEntry->isHybridBuilding()) {
+ auto columnKeys = StorageExecutionContext::get(opCtx).columnKeys();
+ _visitCellsForIndexInsert(
+ opCtx, buf, bsonRecords, [&](StringData path, const BsonRecord& rec) {
+ columnKeys->emplace_back(
+ path.toString(), CellView{buf.buf(), size_t(buf.len())}.toString(), rec.id);
+ });
+ int64_t inserted = 0;
+ ON_BLOCK_EXIT([keysInsertedOut, inserted] {
+ if (keysInsertedOut) {
+ *keysInsertedOut += inserted;
}
-
- buf.reset();
- column_keygen::writeEncodedCell(cell, &buf);
- invariant(!rec.id.isStr());
- cursor->insert(path, rec.id.getLong(), CellView{buf.buf(), size_t(buf.len())});
-
- inc(keysInsertedOut);
});
- return Status::OK();
+ uassertStatusOK(_indexCatalogEntry->indexBuildInterceptor()->sideWrite(
+ opCtx, *columnKeys, IndexBuildInterceptor::Op::kInsert, &inserted));
+ return Status::OK();
+ } else {
+ auto cursor = _store->newWriteCursor(opCtx);
+ _visitCellsForIndexInsert(
+ opCtx, buf, bsonRecords, [&](StringData path, const BsonRecord& rec) {
+ cursor->insert(path, rec.id.getLong(), CellView{buf.buf(), size_t(buf.len())});
+ inc(keysInsertedOut);
+ });
+ return Status::OK();
+ }
} catch (const AssertionException& ex) {
return ex.toStatus();
}
@@ -300,12 +336,26 @@ void ColumnStoreAccessMethod::remove(OperationContext* opCtx,
const InsertDeleteOptions& options,
int64_t* keysDeletedOut,
CheckRecordId checkRecordId) {
- auto cursor = _store->newWriteCursor(opCtx);
- _keyGen.visitPathsForDelete(obj, [&](StringData path) {
- tassert(6762301, "RecordID cannot be a string for column store indexes", !rid.isStr());
- cursor->remove(path, rid.getLong());
- inc(keysDeletedOut);
- });
+ if (_indexCatalogEntry->isHybridBuilding()) {
+ auto columnKeys = StorageExecutionContext::get(opCtx).columnKeys();
+ _keyGen.visitPathsForDelete(obj, [&](StringData path) {
+ columnKeys->emplace_back(std::make_tuple(path.toString(), "", rid));
+ });
+ int64_t removed = 0;
+ fassert(6597801,
+ _indexCatalogEntry->indexBuildInterceptor()->sideWrite(
+ opCtx, *columnKeys, IndexBuildInterceptor::Op::kDelete, &removed));
+ if (keysDeletedOut) {
+ *keysDeletedOut += removed;
+ }
+ } else {
+ auto cursor = _store->newWriteCursor(opCtx);
+ _keyGen.visitPathsForDelete(obj, [&](PathView path) {
+ tassert(6762301, "RecordID cannot be a string for column store indexes", !rid.isStr());
+ cursor->remove(path, rid.getLong());
+ inc(keysDeletedOut);
+ });
+ }
}
Status ColumnStoreAccessMethod::update(OperationContext* opCtx,
@@ -318,37 +368,88 @@ Status ColumnStoreAccessMethod::update(OperationContext* opCtx,
int64_t* keysInsertedOut,
int64_t* keysDeletedOut) {
PooledFragmentBuilder buf(pooledBufferBuilder);
- auto cursor = _store->newWriteCursor(opCtx);
- _keyGen.visitDiffForUpdate(
- oldDoc,
- newDoc,
- [&](column_keygen::ColumnKeyGenerator::DiffAction diffAction,
- StringData path,
- const column_keygen::UnencodedCellView* cell) {
- if (diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kDelete) {
- tassert(
- 6762302, "RecordID cannot be a string for column store indexes", !rid.isStr());
- cursor->remove(path, rid.getLong());
- inc(keysDeletedOut);
- return;
- }
- // kInsert and kUpdate are handled almost identically. If we switch to using
- // `overwrite=true` cursors in WT, we could consider making them the same, although that
- // might disadvantage other implementations of the storage engine API.
- buf.reset();
- column_keygen::writeEncodedCell(*cell, &buf);
+ if (_indexCatalogEntry->isHybridBuilding()) {
+ auto columnKeys = StorageExecutionContext::get(opCtx).columnKeys();
+ _keyGen.visitDiffForUpdate(
+ oldDoc,
+ newDoc,
+ [&](column_keygen::ColumnKeyGenerator::DiffAction diffAction,
+ StringData path,
+ const column_keygen::UnencodedCellView* cell) {
+ if (diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kDelete) {
+ columnKeys->emplace_back(std::make_tuple(path.toString(), "", rid));
+ int64_t removed = 0;
+ fassert(6597802,
+ _indexCatalogEntry->indexBuildInterceptor()->sideWrite(
+ opCtx, *columnKeys, IndexBuildInterceptor::Op::kDelete, &removed));
+
+ if (keysDeletedOut) {
+ *keysDeletedOut += removed;
+ }
+ return;
+ }
+
+ // kInsert and kUpdate are handled almost identically. If we switch to using
+ // `overwrite=true` cursors in WT, we could consider making them the same,
+ // although that might disadvantage other implementations of the storage engine
+ // API.
+ buf.reset();
+ column_keygen::writeEncodedCell(*cell, &buf);
- const auto method = diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kInsert
- ? &ColumnStore::WriteCursor::insert
- : &ColumnStore::WriteCursor::update;
- tassert(6762303, "RecordID cannot be a string for column store indexes", !rid.isStr());
- (cursor.get()->*method)(path, rid.getLong(), CellView{buf.buf(), size_t(buf.len())});
+ const auto method =
+ diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kInsert
+ ? IndexBuildInterceptor::Op::kInsert
+ : IndexBuildInterceptor::Op::kUpdate;
- inc(keysInsertedOut);
- });
+ columnKeys->emplace_back(std::make_tuple(
+ path.toString(), CellView{buf.buf(), size_t(buf.len())}.toString(), rid));
+
+ int64_t inserted = 0;
+ Status status = _indexCatalogEntry->indexBuildInterceptor()->sideWrite(
+ opCtx, *columnKeys, method, &inserted);
+ if (keysInsertedOut) {
+ *keysInsertedOut += inserted;
+ }
+ });
+
+ } else {
+ auto cursor = _store->newWriteCursor(opCtx);
+ _keyGen.visitDiffForUpdate(
+ oldDoc,
+ newDoc,
+ [&](column_keygen::ColumnKeyGenerator::DiffAction diffAction,
+ StringData path,
+ const column_keygen::UnencodedCellView* cell) {
+ if (diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kDelete) {
+ tassert(6762302,
+ "RecordID cannot be a string for column store indexes",
+ !rid.isStr());
+ cursor->remove(path, rid.getLong());
+ inc(keysDeletedOut);
+ return;
+ }
+
+ // kInsert and kUpdate are handled almost identically. If we switch to using
+ // `overwrite=true` cursors in WT, we could consider making them the same, although
+ // that might disadvantage other implementations of the storage engine API.
+ buf.reset();
+ column_keygen::writeEncodedCell(*cell, &buf);
+
+ const auto method =
+ diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kInsert
+ ? &ColumnStore::WriteCursor::insert
+ : &ColumnStore::WriteCursor::update;
+ tassert(
+ 6762303, "RecordID cannot be a string for column store indexes", !rid.isStr());
+ (cursor.get()->*method)(
+ path, rid.getLong(), CellView{buf.buf(), size_t(buf.len())});
+
+ inc(keysInsertedOut);
+ });
+ }
return Status::OK();
-}
+} // namespace mongo
Status ColumnStoreAccessMethod::initializeAsEmpty(OperationContext* opCtx) {
return Status::OK();
@@ -383,8 +484,9 @@ std::unique_ptr<IndexAccessMethod::BulkBuilder> ColumnStoreAccessMethod::initiat
size_t maxMemoryUsageBytes,
const boost::optional<IndexStateInfo>& stateInfo,
StringData dbName) {
- return stateInfo ? std::make_unique<BulkBuilder>(this, maxMemoryUsageBytes, *stateInfo, dbName)
- : std::make_unique<BulkBuilder>(this, maxMemoryUsageBytes, dbName);
+ return (stateInfo && stateInfo->getFileName())
+ ? std::make_unique<BulkBuilder>(this, maxMemoryUsageBytes, *stateInfo, dbName)
+ : std::make_unique<BulkBuilder>(this, maxMemoryUsageBytes, dbName);
}
std::shared_ptr<Ident> ColumnStoreAccessMethod::getSharedIdent() const {
@@ -395,4 +497,41 @@ void ColumnStoreAccessMethod::setIdent(std::shared_ptr<Ident> ident) {
_store->setIdent(std::move(ident));
}
+void ColumnStoreAccessMethod::applyColumnDataSideWrite(OperationContext* opCtx,
+ const CollectionPtr& coll,
+ const BSONObj& operation,
+ int64_t* keysInserted,
+ int64_t* keysDeleted) {
+ const IndexBuildInterceptor::Op opType = operation.getStringField("op") == "i"_sd
+ ? IndexBuildInterceptor::Op::kInsert
+ : operation.getStringField("op") == "d"_sd ? IndexBuildInterceptor::Op::kDelete
+ : IndexBuildInterceptor::Op::kUpdate;
+
+ RecordId rid = RecordId::deserializeToken(operation.getField("rid"));
+
+ CellView cell = operation.getStringField("cell");
+ PathView path = operation.getStringField("path");
+
+ auto cursor = _store->newWriteCursor(opCtx);
+
+ tassert(6597803, "RecordID cannot be a string for column store indexes", !rid.isStr());
+ switch (opType) {
+ case IndexBuildInterceptor::Op::kInsert:
+ cursor->insert(path, rid.getLong(), cell);
+ inc(keysInserted);
+ opCtx->recoveryUnit()->onRollback([keysInserted] { dec(keysInserted); });
+ break;
+ case IndexBuildInterceptor::Op::kDelete:
+ cursor->remove(path, rid.getLong());
+ inc(keysDeleted);
+ opCtx->recoveryUnit()->onRollback([keysDeleted] { dec(keysDeleted); });
+ break;
+ case IndexBuildInterceptor::Op::kUpdate:
+ cursor->update(path, rid.getLong(), cell);
+ inc(keysInserted);
+ opCtx->recoveryUnit()->onRollback([keysInserted] { dec(keysInserted); });
+ break;
+ }
+}
+
} // namespace mongo