author | Will Buerger <will.buerger@mongodb.com> | 2022-09-30 20:59:42 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-30 22:02:57 +0000
commit | 88203efd43517720245e920a5d23bf4f0aeb35b0
tree | 676fa01fb5ad55ec1b654dacf1e3d0548e7b1c01
parent | 29c5c67ac1e47a716717c9a0e5e398865c789873
download | mongo-88203efd43517720245e920a5d23bf4f0aeb35b0.tar.gz
SERVER-65978: Online column store index builds
Co-authored-by: Erin Zhu <erin.zhu@mongodb.com>
18 files changed, 889 insertions, 213 deletions
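For context: while a column store index build is in progress, this patch records concurrent writes in the side writes table as small BSON entries of the form {rid, op, path, cell}, where op is "i", "u", or "d" and cell is empty for deletes. That is the layout produced by the new PathCellSet sideWrite() overload and asserted on by the new index_build_interceptor_test.cpp cases further down. A minimal sketch of that layout follows; the helper name is hypothetical, while the field order and the RecordId::serializeToken() call are taken from the patch.

```cpp
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/record_id.h"

namespace mongo {
// Hypothetical helper: builds one column store side-write entry in the
// {rid, op, path, cell} layout buffered by the interceptor during the build.
BSONObj makeColumnSideWriteEntry(const RecordId& rid,
                                 StringData op,      // "i" (insert), "u" (update), or "d" (delete)
                                 StringData path,    // column path, e.g. "a.b"
                                 StringData cell) {  // encoded cell contents; empty for deletes
    BSONObjBuilder builder;
    rid.serializeToken("rid", &builder);  // RecordId is stored as an opaque token
    builder.append("op", op);
    builder.append("path", path);
    builder.append("cell", cell);
    return builder.obj();
}
}  // namespace mongo
```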
diff --git a/jstests/noPassthrough/libs/index_build.js b/jstests/noPassthrough/libs/index_build.js index 4e4775cb64d..6577df94016 100644 --- a/jstests/noPassthrough/libs/index_build.js +++ b/jstests/noPassthrough/libs/index_build.js @@ -358,8 +358,8 @@ const ResumableIndexBuildTest = class { } /** - * Runs createIndexFn in a parellel shell to create indexes, inserting the documents specified - * by sideWrites into the side writes table. + * Runs createIndexFn in a parellel shell to create indexes, modifying the collection with the + * side writes table. * * 'createIndexFn' should take three parameters: collection name, index specifications, and * index names. @@ -368,6 +368,10 @@ const ResumableIndexBuildTest = class { * is [[{a: 1}, {b: 1}], [{c: 1}]], a valid indexNames would look like * [["index_1", "index_2"], ["index_3"]]. * + * 'sideWrites' can be an array specifying documents to be inserted into the side writes table, + * or a function that performs any series of operations (inserts, deletes, or updates) with the + * side writes table + * * If {hangBeforeBuildingIndex: true}, returns with the hangBeforeBuildingIndex failpoint * enabled and the index builds hanging at this point. */ @@ -411,7 +415,11 @@ const ResumableIndexBuildTest = class { }); } - assert.commandWorked(coll.insert(sideWrites)); + if (Array.isArray(sideWrites)) { + assert.commandWorked(coll.insert(sideWrites)); + } else { + sideWrites(coll); + } // Before building the index, wait for the the last op to be committed so that establishing // the majority read cursor does not race with step down. diff --git a/jstests/noPassthrough/restart_index_build_if_resume_fails.js b/jstests/noPassthrough/restart_index_build_if_resume_fails.js index e166465d2c6..0cee7035e63 100644 --- a/jstests/noPassthrough/restart_index_build_if_resume_fails.js +++ b/jstests/noPassthrough/restart_index_build_if_resume_fails.js @@ -55,33 +55,55 @@ ResumableIndexBuildTest.runFailToResume(rst, [{a: 10}, {a: 11}], [{a: 12}, {a: 13}]); -// TODO (SERVER-65978): Add side writes to these test cases once they are supported by column store -// index builds. 
if (columnstoreEnabled) { - ResumableIndexBuildTest.runFailToResume(rst, - dbName, - collName, - {"$**": "columnstore"}, - {failPointAfterStartup: "failToParseResumeIndexInfo"}, - [], - [{a: 4}, {a: 5}], - true /* failWhileParsing */); + ResumableIndexBuildTest.runFailToResume( + rst, + dbName, + collName, + {"$**": "columnstore"}, + {failPointAfterStartup: "failToParseResumeIndexInfo"}, + (function(collection) { + assert.commandWorked(collection.insert([{a: [{b: 14}]}, {a: 15}])); + assert.commandWorked(collection.update({a: 1}, {a: 2})); + assert.commandWorked(collection.remove({"a.b": 14})); + assert.commandWorked(collection.insert({a: 1})); + assert.commandWorked(collection.remove({a: 1})); + assert.commandWorked(collection.update({a: 15}, {a: 1})); + }), + [{a: 16}, {a: 17}], + true /* failWhileParsing */); - ResumableIndexBuildTest.runFailToResume(rst, - dbName, - collName, - {"$**": "columnstore"}, - {failPointAfterStartup: "failSetUpResumeIndexBuild"}, - [], - [{a: 8}, {a: 9}]); + ResumableIndexBuildTest.runFailToResume( + rst, + dbName, + collName, + {"$**": "columnstore"}, + {failPointAfterStartup: "failSetUpResumeIndexBuild"}, + (function(collection) { + assert.commandWorked(collection.insert([{a: [{b: 18}]}, {a: 19}])); + assert.commandWorked(collection.update({a: 1}, {a: 2})); + assert.commandWorked(collection.remove({"a.b": 18})); + assert.commandWorked(collection.insert({a: 1})); + assert.commandWorked(collection.remove({a: 1})); + assert.commandWorked(collection.update({a: 19}, {a: 1})); + }), + [{a: 20}, {a: 21}]); - ResumableIndexBuildTest.runFailToResume(rst, - dbName, - collName, - {"$**": "columnstore"}, - {removeTempFilesBeforeStartup: true}, - [], - [{a: 12}, {a: 13}]); + ResumableIndexBuildTest.runFailToResume( + rst, + dbName, + collName, + {"$**": "columnstore"}, + {removeTempFilesBeforeStartup: true}, + (function(collection) { + assert.commandWorked(collection.insert([{a: [{b: 22}]}, {a: 23}])); + assert.commandWorked(collection.update({a: 1}, {a: 2})); + assert.commandWorked(collection.remove({"a.b": 22})); + assert.commandWorked(collection.insert({a: 1})); + assert.commandWorked(collection.remove({a: 1})); + assert.commandWorked(collection.update({a: 23}, {a: 1})); + }), + [{a: 24}, {a: 25}]); } rst.stopSet(); diff --git a/jstests/noPassthrough/restart_index_build_if_resume_interrupted_by_shutdown.js b/jstests/noPassthrough/restart_index_build_if_resume_interrupted_by_shutdown.js index a33f279e9a4..3a4a84880f6 100644 --- a/jstests/noPassthrough/restart_index_build_if_resume_interrupted_by_shutdown.js +++ b/jstests/noPassthrough/restart_index_build_if_resume_interrupted_by_shutdown.js @@ -51,8 +51,6 @@ ResumableIndexBuildTest.runResumeInterruptedByShutdown( [{a: 77}, {a: 88}], [{a: 99}, {a: 100}]); -// TODO (SERVER-65978): Add side writes to these test cases once they are supported by column store -// index builds. 
if (columnstoreEnabled) { ResumableIndexBuildTest.runResumeInterruptedByShutdown( rst, @@ -63,8 +61,15 @@ if (columnstoreEnabled) { {name: "hangIndexBuildDuringCollectionScanPhaseBeforeInsertion", logIdWithBuildUUID: 20386}, "collection scan", {a: 1}, // initial doc - [], - [{a: 4}, {a: 5}]); + (function(collection) { + assert.commandWorked(collection.insert([{a: [{b: 14}]}, {a: 15}])); + assert.commandWorked(collection.update({a: 1}, {a: 2})); + assert.commandWorked(collection.remove({"a.b": 14})); + assert.commandWorked(collection.insert({a: 1})); + assert.commandWorked(collection.remove({a: 2})); + assert.commandWorked(collection.update({a: 15}, {a: 2})); + }), + [{a: 16}, {a: 17}]); ResumableIndexBuildTest.runResumeInterruptedByShutdown( rst, @@ -74,8 +79,15 @@ if (columnstoreEnabled) { "resumable_index_build4", // index name {name: "hangIndexBuildDuringBulkLoadPhase", logIdWithIndexName: 4924400}, "bulk load", - {a: [11, 22, 33]}, // initial doc - [], + {a: [44, 55, 66]}, // initial doc + (function(collection) { + assert.commandWorked(collection.insert([{a: [{b: 77}]}, {a: 88}])); + assert.commandWorked(collection.update({a: [44, 55, 66]}, {a: [55, 66]})); + assert.commandWorked(collection.remove({"a.b": 77})); + assert.commandWorked(collection.insert({a: 99})); + assert.commandWorked(collection.remove({a: [55, 66]})); + assert.commandWorked(collection.update({a: 99}, {a: 1})); + }), [{a: 99}, {a: 100}]); } rst.stopSet(); diff --git a/jstests/noPassthrough/resumable_index_build_drain_writes_phase.js b/jstests/noPassthrough/resumable_index_build_drain_writes_phase.js index 088759a3a81..bf70ffffb1c 100644 --- a/jstests/noPassthrough/resumable_index_build_drain_writes_phase.js +++ b/jstests/noPassthrough/resumable_index_build_drain_writes_phase.js @@ -12,6 +12,7 @@ "use strict"; load("jstests/noPassthrough/libs/index_build.js"); +load("jstests/libs/sbe_util.js"); // For checkSBEEnabled. const dbName = "test"; @@ -19,6 +20,9 @@ const rst = new ReplSetTest({nodes: 1}); rst.startSet(); rst.initiate(); +const columnstoreEnabled = checkSBEEnabled( + rst.getPrimary().getDB(dbName), ["featureFlagColumnstoreIndexes", "featureFlagSbeFull"], true); + const runTests = function(docs, indexSpecsFlat, sideWrites, collNameSuffix) { const coll = rst.getPrimary().getDB(dbName).getCollection(jsTestName() + collNameSuffix); assert.commandWorked(coll.insert(docs)); @@ -53,6 +57,18 @@ runTests({a: 1}, [{"$**": 1}, {h: 1}], [{a: [1, 2], b: {c: [3, 4]}, d: ""}, {e: "", f: [[]], g: null, h: 8}], "_wildcard"); - +if (columnstoreEnabled) { + runTests({a: 1}, + [{"$**": "columnstore"}, {h: 1}], + (function(collection) { + assert.commandWorked(collection.insert([{a: [{c: 2}], b: 2}, {a: 3, b: 3}])); + assert.commandWorked(collection.update({a: 3}, {a: 4, b: 3})); + assert.commandWorked(collection.remove({"a.c": 2})); + assert.commandWorked(collection.insert({a: 4, b: 4})); + assert.commandWorked(collection.remove({b: 3})); + assert.commandWorked(collection.update({a: 4}, {a: 2})); + }), + "_columnstore"); +} rst.stopSet(); })();
\ No newline at end of file diff --git a/jstests/noPassthrough/resumable_index_build_drain_writes_phase_primary.js b/jstests/noPassthrough/resumable_index_build_drain_writes_phase_primary.js index 8bc4c431246..8e6f529c8c4 100644 --- a/jstests/noPassthrough/resumable_index_build_drain_writes_phase_primary.js +++ b/jstests/noPassthrough/resumable_index_build_drain_writes_phase_primary.js @@ -13,6 +13,7 @@ "use strict"; load("jstests/noPassthrough/libs/index_build.js"); +load("jstests/libs/sbe_util.js"); // For checkSBEEnabled. const dbName = "test"; const collName = jsTestName(); @@ -24,6 +25,9 @@ rst.initiate(); const primary = rst.getPrimary(); const coll = primary.getDB(dbName).getCollection(collName); +const columnstoreEnabled = checkSBEEnabled( + primary.getDB(dbName), ["featureFlagColumnstoreIndexes", "featureFlagSbeFull"], true); + assert.commandWorked(coll.insert({a: 1})); jsTestLog("Testing when primary shuts down in the middle of the first drain"); @@ -76,5 +80,83 @@ ResumableIndexBuildTest.runOnPrimaryToTestCommitQuorum( [{a: 14}, {a: 15}], [{a: 16}, {a: 17}]); +if (columnstoreEnabled) { + ResumableIndexBuildTest.run( + rst, + dbName, + collName, + [[{"$**": "columnstore"}]], + [{name: "hangIndexBuildDuringDrainWritesPhase", logIdWithIndexName: 4841800}], + 0, + ["drain writes"], + [{skippedPhaseLogID: 20392}], + (function(collection) { + assert.commandWorked(collection.insert([{a: [{b: 10}]}, {a: 11}])); + assert.commandWorked(collection.update({a: 1}, {a: 2})); + assert.commandWorked(collection.remove({"a.b": 10})); + assert.commandWorked(collection.insert({a: 1})); + assert.commandWorked(collection.remove({a: 1})); + assert.commandWorked(collection.update({a: 11}, {a: 1})); + }), + [{a: 12}, {a: 13}]); + ResumableIndexBuildTest.run( + rst, + dbName, + collName, + [[{"$**": "columnstore"}]], + [{name: "hangIndexBuildDuringDrainWritesPhase", logIdWithIndexName: 4841800}], + 1, + ["drain writes"], + [{skippedPhaseLogID: 20392}], + (function(collection) { + assert.commandWorked(collection.insert([{a: [{b: 14}]}, {a: 15}])); + assert.commandWorked(collection.update({a: 1}, {a: 2})); + assert.commandWorked(collection.remove({"a.b": 14})); + assert.commandWorked(collection.insert({a: 1})); + assert.commandWorked(collection.remove({a: 1})); + assert.commandWorked(collection.update({a: 15}, {a: 1})); + }), + [{a: 16}, {a: 17}]); + + jsTestLog("Testing when primary shuts down after voting, but before commit quorum satisfied"); + + ResumableIndexBuildTest.runOnPrimaryToTestCommitQuorum( + rst, + dbName, + collName, + {"$**": "columnstore"}, + "hangIndexBuildAfterSignalPrimaryForCommitReadiness", + "hangAfterIndexBuildFirstDrain", + (function(collection) { + assert.commandWorked(collection.insert([{a: [{b: 22}]}, {a: 23}])); + assert.commandWorked(collection.update({a: 1}, {a: 2})); + assert.commandWorked(collection.remove({"a.b": 22})); + assert.commandWorked(collection.insert({a: 1})); + assert.commandWorked(collection.remove({a: 1})); + assert.commandWorked(collection.update({a: 23}, {a: 1})); + }), + [{a: 24}, {a: 25}]); + + jsTestLog( + "Testing when primary shuts down after commit quorum satisfied, but before commitIndexBuild oplog entry written"); + + ResumableIndexBuildTest.runOnPrimaryToTestCommitQuorum( + rst, + dbName, + collName, + {"$**": "columnstore"}, + "hangIndexBuildAfterSignalPrimaryForCommitReadiness", + "hangIndexBuildAfterSignalPrimaryForCommitReadiness", + (function(collection) { + assert.commandWorked(collection.insert([{a: [{b: 30}]}, {a: 31}])); + 
assert.commandWorked(collection.update({a: 1}, {a: 2})); + assert.commandWorked(collection.remove({"a.b": 30})); + assert.commandWorked(collection.insert({a: 1})); + assert.commandWorked(collection.remove({a: 1})); + assert.commandWorked(collection.update({a: 31}, {a: 1})); + }), + [{a: 32}, {a: 33}]); +} + rst.stopSet(); })();
\ No newline at end of file diff --git a/jstests/noPassthrough/resumable_index_build_drain_writes_phase_secondary.js b/jstests/noPassthrough/resumable_index_build_drain_writes_phase_secondary.js index 1163d0664c4..04bc57ad2a8 100644 --- a/jstests/noPassthrough/resumable_index_build_drain_writes_phase_secondary.js +++ b/jstests/noPassthrough/resumable_index_build_drain_writes_phase_secondary.js @@ -16,6 +16,7 @@ "use strict"; load("jstests/noPassthrough/libs/index_build.js"); +load("jstests/libs/sbe_util.js"); // For checkSBEEnabled. const dbName = "test"; const collName = jsTestName(); @@ -32,6 +33,9 @@ rst.initiate(); let primary = rst.getPrimary(); let coll = primary.getDB(dbName).getCollection(collName); +const columnstoreEnabled = checkSBEEnabled( + primary.getDB(dbName), ["featureFlagColumnstoreIndexes", "featureFlagSbeFull"], true); + assert.commandWorked(coll.insert({a: 1})); jsTestLog("Testing when secondary shuts down in the middle of the first drain"); @@ -80,5 +84,84 @@ ResumableIndexBuildTest.runOnSecondary(rst, [{a: 14}, {a: 15}], [{a: 16}, {a: 17}]); +if (columnstoreEnabled) { + jsTestLog("Testing when secondary shuts down in the middle of the first drain"); + ResumableIndexBuildTest.runOnSecondary( + rst, + dbName, + collName, + {"$**": "columnstore"}, + "hangIndexBuildDuringDrainWritesPhase", + 0, + undefined, /* primaryFailPointName */ + (function(collection) { + assert.commandWorked(collection.insert([{a: [{b: 10}]}, {a: 11}])); + assert.commandWorked(collection.update({a: 1}, {a: 2})); + assert.commandWorked(collection.remove({"a.b": 10})); + assert.commandWorked(collection.insert({a: 1})); + assert.commandWorked(collection.remove({a: 1})); + assert.commandWorked(collection.update({a: 11}, {a: 1})); + }), + [{a: 12}, {a: 13}]); + ResumableIndexBuildTest.runOnSecondary( + rst, + dbName, + collName, + {"$**": "columnstore"}, + "hangIndexBuildDuringDrainWritesPhase", + 1, + undefined, /* primaryFailPointName */ + (function(collection) { + assert.commandWorked(collection.insert([{a: [{b: 14}]}, {a: 15}])); + assert.commandWorked(collection.update({a: 1}, {a: 2})); + assert.commandWorked(collection.remove({"a.b": 14})); + assert.commandWorked(collection.insert({a: 1})); + assert.commandWorked(collection.remove({a: 1})); + assert.commandWorked(collection.update({a: 15}, {a: 1})); + }), + [{a: 16}, {a: 17}]); + + jsTestLog("Testing when secondary shuts down before voting"); + + ResumableIndexBuildTest.runOnSecondary( + rst, + dbName, + collName, + {"$**": "columnstore"}, + "hangAfterIndexBuildFirstDrain", + {}, + undefined, /* primaryFailPointName */ + (function(collection) { + assert.commandWorked(collection.insert([{a: [{b: 18}]}, {a: 19}])); + assert.commandWorked(collection.update({a: 1}, {a: 2})); + assert.commandWorked(collection.remove({"a.b": 18})); + assert.commandWorked(collection.insert({a: 1})); + assert.commandWorked(collection.remove({a: 1})); + assert.commandWorked(collection.update({a: 19}, {a: 1})); + }), + [{a: 20}, {a: 21}]); + + jsTestLog( + "Testing when secondary shuts down after commit quorum satisfied, but before replicating commitIndexBuild oplog entry"); + + ResumableIndexBuildTest.runOnSecondary( + rst, + dbName, + collName, + {"$**": "columnstore"}, + "hangIndexBuildAfterSignalPrimaryForCommitReadiness", + {}, + "hangIndexBuildBeforeCommit", + (function(collection) { + assert.commandWorked(collection.insert([{a: [{b: 26}]}, {a: 27}])); + assert.commandWorked(collection.update({a: 1}, {a: 2})); + assert.commandWorked(collection.remove({"a.b": 26})); + 
assert.commandWorked(collection.insert({a: 1})); + assert.commandWorked(collection.remove({a: 1})); + assert.commandWorked(collection.update({a: 27}, {a: 1})); + }), + [{a: 28}, {a: 29}]); +} + rst.stopSet(); })();
\ No newline at end of file diff --git a/jstests/noPassthrough/resumable_index_build_mixed_phases.js b/jstests/noPassthrough/resumable_index_build_mixed_phases.js index b62da7449e6..0e165510e85 100644 --- a/jstests/noPassthrough/resumable_index_build_mixed_phases.js +++ b/jstests/noPassthrough/resumable_index_build_mixed_phases.js @@ -61,6 +61,15 @@ const runTests = function(failPoints, resumePhases, resumeChecks) { resumePhases, resumeChecks, "_wildcard"); + + if (columnstoreEnabled) { + runTest([{a: 1, b: 1}, {a: 2, b: 2}, {a: 3, b: 3}], + [[{"$**": "columnstore"}], [{b: 1}]], + failPoints, + resumePhases, + resumeChecks, + "_columnstore"); + } }; runTests( @@ -105,59 +114,5 @@ runTests( ], ["bulk load", "drain writes"], [{skippedPhaseLogID: 20391}, {skippedPhaseLogID: 20392}]); - -// TODO (SERVER-65978): Add sidewrites to tests and combine columnTests with normal runTests once -// side writes are implemented as the numbers for numScannedAfterResume will match -if (columnstoreEnabled) { - const runColumnTests = function(failPoints, resumePhases, resumeChecks) { - const docs = [{a: 1, b: 1}, {a: 2, b: 2}, {a: 3, b: 3}]; - const coll = rst.getPrimary().getDB(dbName).getCollection( - jsTestName() + "_" + resumePhases[0].replace(" ", "_") + "_" + - resumePhases[1].replace(" ", "_") + "_columnstore"); - assert.commandWorked(coll.insert(docs)); - - ResumableIndexBuildTest.run(rst, - dbName, - coll.getName(), - [[{b: 1}], [{"$**": "columnstore"}]], - failPoints, - 1, - resumePhases, - resumeChecks, - [], - [{a: 7, b: 7}, {a: 8, b: 8}, {a: 9, b: 9}]); - }; - - runColumnTests( - [ - {name: "hangIndexBuildBeforeWaitingUntilMajorityOpTime", logIdWithBuildUUID: 4940901}, - { - name: "hangIndexBuildDuringCollectionScanPhaseBeforeInsertion", - logIdWithBuildUUID: 20386 - } - ], - ["initialized", "collection scan"], - [{numScannedAfterResume: 3}, {numScannedAfterResume: 2}]); - - runColumnTests( - [ - {name: "hangIndexBuildBeforeWaitingUntilMajorityOpTime", logIdWithBuildUUID: 4940901}, - {name: "hangIndexBuildDuringBulkLoadPhase", logIdWithIndexName: 4924400} - - ], - ["initialized", "bulk load"], - [{numScannedAfterResume: 3}, {skippedPhaseLogID: 20391}]); - - runColumnTests( - [ - { - name: "hangIndexBuildDuringCollectionScanPhaseBeforeInsertion", - logIdWithBuildUUID: 20386 - }, - {name: "hangIndexBuildDuringBulkLoadPhase", logIdWithIndexName: 4924400} - ], - ["collection scan", "bulk load"], - [{numScannedAfterResume: 2}, {skippedPhaseLogID: 20391}]); -} rst.stopSet(); -})(); +})();
\ No newline at end of file diff --git a/src/mongo/db/index/SConscript b/src/mongo/db/index/SConscript index 32a10b4eee7..d4fc11138c6 100644 --- a/src/mongo/db/index/SConscript +++ b/src/mongo/db/index/SConscript @@ -154,6 +154,7 @@ env.Library( '$BUILD_DIR/mongo/db/fts/base_fts', '$BUILD_DIR/mongo/db/resumable_index_builds_idl', '$BUILD_DIR/mongo/db/server_base', + '$BUILD_DIR/mongo/db/storage/execution_context', 'column_store_index', 'expression_params', 'key_generator', diff --git a/src/mongo/db/index/columns_access_method.cpp b/src/mongo/db/index/columns_access_method.cpp index ef03980b917..55e6c9ad03a 100644 --- a/src/mongo/db/index/columns_access_method.cpp +++ b/src/mongo/db/index/columns_access_method.cpp @@ -41,7 +41,9 @@ #include "mongo/db/index/column_cell.h" #include "mongo/db/index/column_key_generator.h" #include "mongo/db/index/column_store_sorter.h" +#include "mongo/db/index/index_build_interceptor.h" #include "mongo/db/index/index_descriptor.h" +#include "mongo/db/storage/execution_context.h" #include "mongo/logv2/log.h" #include "mongo/util/progress_meter.h" @@ -54,6 +56,11 @@ inline void inc(int64_t* counter) { if (counter) ++*counter; }; + +inline void dec(int64_t* counter) { + if (counter) + --*counter; +}; } // namespace ColumnStoreAccessMethod::ColumnStoreAccessMethod(IndexCatalogEntry* ice, @@ -260,6 +267,26 @@ Status ColumnStoreAccessMethod::BulkBuilder::keyCommitted( return Status::OK(); } + +void ColumnStoreAccessMethod::_visitCellsForIndexInsert( + OperationContext* opCtx, + PooledFragmentBuilder& buf, + const std::vector<BsonRecord>& bsonRecords, + function_ref<void(StringData, const BsonRecord&)> cb) const { + _keyGen.visitCellsForInsert( + bsonRecords, + [&](StringData path, const BsonRecord& rec, const column_keygen::UnencodedCellView& cell) { + if (!rec.ts.isNull()) { + uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(rec.ts)); + } + buf.reset(); + column_keygen::writeEncodedCell(cell, &buf); + tassert( + 6597800, "RecordID cannot be a string for column store indexes", !rec.id.isStr()); + cb(path, rec); + }); +} + Status ColumnStoreAccessMethod::insert(OperationContext* opCtx, SharedBufferFragmentBuilder& pooledBufferBuilder, const CollectionPtr& coll, @@ -268,24 +295,33 @@ Status ColumnStoreAccessMethod::insert(OperationContext* opCtx, int64_t* keysInsertedOut) { try { PooledFragmentBuilder buf(pooledBufferBuilder); - auto cursor = _store->newWriteCursor(opCtx); - _keyGen.visitCellsForInsert( - bsonRecords, - [&](StringData path, - const BsonRecord& rec, - const column_keygen::UnencodedCellView& cell) { - if (!rec.ts.isNull()) { - uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(rec.ts)); + // We cannot write to the index during its initial build phase, so we defer this insert as a + // "side write" to be applied after the build completes. 
+ if (_indexCatalogEntry->isHybridBuilding()) { + auto columnKeys = StorageExecutionContext::get(opCtx).columnKeys(); + _visitCellsForIndexInsert( + opCtx, buf, bsonRecords, [&](StringData path, const BsonRecord& rec) { + columnKeys->emplace_back( + path.toString(), CellView{buf.buf(), size_t(buf.len())}.toString(), rec.id); + }); + int64_t inserted = 0; + ON_BLOCK_EXIT([keysInsertedOut, inserted] { + if (keysInsertedOut) { + *keysInsertedOut += inserted; } - - buf.reset(); - column_keygen::writeEncodedCell(cell, &buf); - invariant(!rec.id.isStr()); - cursor->insert(path, rec.id.getLong(), CellView{buf.buf(), size_t(buf.len())}); - - inc(keysInsertedOut); }); - return Status::OK(); + uassertStatusOK(_indexCatalogEntry->indexBuildInterceptor()->sideWrite( + opCtx, *columnKeys, IndexBuildInterceptor::Op::kInsert, &inserted)); + return Status::OK(); + } else { + auto cursor = _store->newWriteCursor(opCtx); + _visitCellsForIndexInsert( + opCtx, buf, bsonRecords, [&](StringData path, const BsonRecord& rec) { + cursor->insert(path, rec.id.getLong(), CellView{buf.buf(), size_t(buf.len())}); + inc(keysInsertedOut); + }); + return Status::OK(); + } } catch (const AssertionException& ex) { return ex.toStatus(); } @@ -300,12 +336,26 @@ void ColumnStoreAccessMethod::remove(OperationContext* opCtx, const InsertDeleteOptions& options, int64_t* keysDeletedOut, CheckRecordId checkRecordId) { - auto cursor = _store->newWriteCursor(opCtx); - _keyGen.visitPathsForDelete(obj, [&](StringData path) { - tassert(6762301, "RecordID cannot be a string for column store indexes", !rid.isStr()); - cursor->remove(path, rid.getLong()); - inc(keysDeletedOut); - }); + if (_indexCatalogEntry->isHybridBuilding()) { + auto columnKeys = StorageExecutionContext::get(opCtx).columnKeys(); + _keyGen.visitPathsForDelete(obj, [&](StringData path) { + columnKeys->emplace_back(std::make_tuple(path.toString(), "", rid)); + }); + int64_t removed = 0; + fassert(6597801, + _indexCatalogEntry->indexBuildInterceptor()->sideWrite( + opCtx, *columnKeys, IndexBuildInterceptor::Op::kDelete, &removed)); + if (keysDeletedOut) { + *keysDeletedOut += removed; + } + } else { + auto cursor = _store->newWriteCursor(opCtx); + _keyGen.visitPathsForDelete(obj, [&](PathView path) { + tassert(6762301, "RecordID cannot be a string for column store indexes", !rid.isStr()); + cursor->remove(path, rid.getLong()); + inc(keysDeletedOut); + }); + } } Status ColumnStoreAccessMethod::update(OperationContext* opCtx, @@ -318,37 +368,88 @@ Status ColumnStoreAccessMethod::update(OperationContext* opCtx, int64_t* keysInsertedOut, int64_t* keysDeletedOut) { PooledFragmentBuilder buf(pooledBufferBuilder); - auto cursor = _store->newWriteCursor(opCtx); - _keyGen.visitDiffForUpdate( - oldDoc, - newDoc, - [&](column_keygen::ColumnKeyGenerator::DiffAction diffAction, - StringData path, - const column_keygen::UnencodedCellView* cell) { - if (diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kDelete) { - tassert( - 6762302, "RecordID cannot be a string for column store indexes", !rid.isStr()); - cursor->remove(path, rid.getLong()); - inc(keysDeletedOut); - return; - } - // kInsert and kUpdate are handled almost identically. If we switch to using - // `overwrite=true` cursors in WT, we could consider making them the same, although that - // might disadvantage other implementations of the storage engine API. 
- buf.reset(); - column_keygen::writeEncodedCell(*cell, &buf); + if (_indexCatalogEntry->isHybridBuilding()) { + auto columnKeys = StorageExecutionContext::get(opCtx).columnKeys(); + _keyGen.visitDiffForUpdate( + oldDoc, + newDoc, + [&](column_keygen::ColumnKeyGenerator::DiffAction diffAction, + StringData path, + const column_keygen::UnencodedCellView* cell) { + if (diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kDelete) { + columnKeys->emplace_back(std::make_tuple(path.toString(), "", rid)); + int64_t removed = 0; + fassert(6597802, + _indexCatalogEntry->indexBuildInterceptor()->sideWrite( + opCtx, *columnKeys, IndexBuildInterceptor::Op::kDelete, &removed)); + + if (keysDeletedOut) { + *keysDeletedOut += removed; + } + return; + } + + // kInsert and kUpdate are handled almost identically. If we switch to using + // `overwrite=true` cursors in WT, we could consider making them the same, + // although that might disadvantage other implementations of the storage engine + // API. + buf.reset(); + column_keygen::writeEncodedCell(*cell, &buf); - const auto method = diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kInsert - ? &ColumnStore::WriteCursor::insert - : &ColumnStore::WriteCursor::update; - tassert(6762303, "RecordID cannot be a string for column store indexes", !rid.isStr()); - (cursor.get()->*method)(path, rid.getLong(), CellView{buf.buf(), size_t(buf.len())}); + const auto method = + diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kInsert + ? IndexBuildInterceptor::Op::kInsert + : IndexBuildInterceptor::Op::kUpdate; - inc(keysInsertedOut); - }); + columnKeys->emplace_back(std::make_tuple( + path.toString(), CellView{buf.buf(), size_t(buf.len())}.toString(), rid)); + + int64_t inserted = 0; + Status status = _indexCatalogEntry->indexBuildInterceptor()->sideWrite( + opCtx, *columnKeys, method, &inserted); + if (keysInsertedOut) { + *keysInsertedOut += inserted; + } + }); + + } else { + auto cursor = _store->newWriteCursor(opCtx); + _keyGen.visitDiffForUpdate( + oldDoc, + newDoc, + [&](column_keygen::ColumnKeyGenerator::DiffAction diffAction, + StringData path, + const column_keygen::UnencodedCellView* cell) { + if (diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kDelete) { + tassert(6762302, + "RecordID cannot be a string for column store indexes", + !rid.isStr()); + cursor->remove(path, rid.getLong()); + inc(keysDeletedOut); + return; + } + + // kInsert and kUpdate are handled almost identically. If we switch to using + // `overwrite=true` cursors in WT, we could consider making them the same, although + // that might disadvantage other implementations of the storage engine API. + buf.reset(); + column_keygen::writeEncodedCell(*cell, &buf); + + const auto method = + diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kInsert + ? &ColumnStore::WriteCursor::insert + : &ColumnStore::WriteCursor::update; + tassert( + 6762303, "RecordID cannot be a string for column store indexes", !rid.isStr()); + (cursor.get()->*method)( + path, rid.getLong(), CellView{buf.buf(), size_t(buf.len())}); + + inc(keysInsertedOut); + }); + } return Status::OK(); -} +} // namespace mongo Status ColumnStoreAccessMethod::initializeAsEmpty(OperationContext* opCtx) { return Status::OK(); @@ -383,8 +484,9 @@ std::unique_ptr<IndexAccessMethod::BulkBuilder> ColumnStoreAccessMethod::initiat size_t maxMemoryUsageBytes, const boost::optional<IndexStateInfo>& stateInfo, StringData dbName) { - return stateInfo ? 
std::make_unique<BulkBuilder>(this, maxMemoryUsageBytes, *stateInfo, dbName) - : std::make_unique<BulkBuilder>(this, maxMemoryUsageBytes, dbName); + return (stateInfo && stateInfo->getFileName()) + ? std::make_unique<BulkBuilder>(this, maxMemoryUsageBytes, *stateInfo, dbName) + : std::make_unique<BulkBuilder>(this, maxMemoryUsageBytes, dbName); } std::shared_ptr<Ident> ColumnStoreAccessMethod::getSharedIdent() const { @@ -395,4 +497,41 @@ void ColumnStoreAccessMethod::setIdent(std::shared_ptr<Ident> ident) { _store->setIdent(std::move(ident)); } +void ColumnStoreAccessMethod::applyColumnDataSideWrite(OperationContext* opCtx, + const CollectionPtr& coll, + const BSONObj& operation, + int64_t* keysInserted, + int64_t* keysDeleted) { + const IndexBuildInterceptor::Op opType = operation.getStringField("op") == "i"_sd + ? IndexBuildInterceptor::Op::kInsert + : operation.getStringField("op") == "d"_sd ? IndexBuildInterceptor::Op::kDelete + : IndexBuildInterceptor::Op::kUpdate; + + RecordId rid = RecordId::deserializeToken(operation.getField("rid")); + + CellView cell = operation.getStringField("cell"); + PathView path = operation.getStringField("path"); + + auto cursor = _store->newWriteCursor(opCtx); + + tassert(6597803, "RecordID cannot be a string for column store indexes", !rid.isStr()); + switch (opType) { + case IndexBuildInterceptor::Op::kInsert: + cursor->insert(path, rid.getLong(), cell); + inc(keysInserted); + opCtx->recoveryUnit()->onRollback([keysInserted] { dec(keysInserted); }); + break; + case IndexBuildInterceptor::Op::kDelete: + cursor->remove(path, rid.getLong()); + inc(keysDeleted); + opCtx->recoveryUnit()->onRollback([keysDeleted] { dec(keysDeleted); }); + break; + case IndexBuildInterceptor::Op::kUpdate: + cursor->update(path, rid.getLong(), cell); + inc(keysInserted); + opCtx->recoveryUnit()->onRollback([keysInserted] { dec(keysInserted); }); + break; + } +} + } // namespace mongo diff --git a/src/mongo/db/index/columns_access_method.h b/src/mongo/db/index/columns_access_method.h index fefa5240468..44848fa114c 100644 --- a/src/mongo/db/index/columns_access_method.h +++ b/src/mongo/db/index/columns_access_method.h @@ -75,6 +75,7 @@ public: const InsertDeleteOptions& options, int64_t* keysDeletedOut, CheckRecordId checkRecordId) final; + Status update(OperationContext* opCtx, SharedBufferFragmentBuilder& pooledBufferBuilder, const BSONObj& oldDoc, @@ -85,6 +86,12 @@ public: int64_t* keysInsertedOut, int64_t* keysDeletedOut) final; + void applyColumnDataSideWrite(OperationContext* opCtx, + const CollectionPtr& coll, + const BSONObj& operation, + int64_t* keysInserted, + int64_t* keysDeleted) final; + Status initializeAsEmpty(OperationContext* opCtx) final; void validate(OperationContext* opCtx, @@ -117,6 +124,11 @@ public: class BulkBuilder; private: + void _visitCellsForIndexInsert(OperationContext* opCtx, + PooledFragmentBuilder& pooledFragmentBuilder, + const std::vector<BsonRecord>& bsonRecords, + function_ref<void(StringData, const BsonRecord&)> cb) const; + const std::unique_ptr<ColumnStore> _store; IndexCatalogEntry* const _indexCatalogEntry; // owned by IndexCatalog const IndexDescriptor* const _descriptor; diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp index db5a1974aa1..7e7bc6a7c89 100644 --- a/src/mongo/db/index/index_access_method.cpp +++ b/src/mongo/db/index/index_access_method.cpp @@ -620,6 +620,66 @@ void SortedDataIndexAccessMethod::setIdent(std::shared_ptr<Ident> newIdent) { 
this->_newInterface->setIdent(std::move(newIdent)); } +Status SortedDataIndexAccessMethod::applySortedDataSideWrite(OperationContext* opCtx, + const CollectionPtr& coll, + const BSONObj& operation, + const InsertDeleteOptions& options, + KeyHandlerFn&& onDuplicateKey, + int64_t* const keysInserted, + int64_t* const keysDeleted) { + auto opType = [&operation] { + switch (operation.getStringField("op")[0]) { + case 'i': + return IndexBuildInterceptor::Op::kInsert; + case 'd': + return IndexBuildInterceptor::Op::kDelete; + case 'u': + return IndexBuildInterceptor::Op::kUpdate; + default: + MONGO_UNREACHABLE; + } + }(); + + // Deserialize the encoded KeyString::Value. + int keyLen; + const char* binKey = operation["key"].binData(keyLen); + BufReader reader(binKey, keyLen); + const KeyString::Value keyString = + KeyString::Value::deserialize(reader, getSortedDataInterface()->getKeyStringVersion()); + + const KeyStringSet keySet{keyString}; + if (opType == IndexBuildInterceptor::Op::kInsert) { + int64_t numInserted; + auto status = insertKeysAndUpdateMultikeyPaths(opCtx, + coll, + {keySet.begin(), keySet.end()}, + {}, + MultikeyPaths{}, + options, + std::move(onDuplicateKey), + &numInserted); + if (!status.isOK()) { + return status; + } + + *keysInserted += numInserted; + opCtx->recoveryUnit()->onRollback( + [keysInserted, numInserted] { *keysInserted -= numInserted; }); + } else { + invariant(opType == IndexBuildInterceptor::Op::kDelete); + int64_t numDeleted; + Status s = removeKeys(opCtx, {keySet.begin(), keySet.end()}, options, &numDeleted); + if (!s.isOK()) { + return s; + } + + *keysDeleted += numDeleted; + opCtx->recoveryUnit()->onRollback( + [keysDeleted, numDeleted] { *keysDeleted -= numDeleted; }); + } + return Status::OK(); +} + void IndexAccessMethod::BulkBuilder::countNewBuildInStats() { indexBulkBuilderSSS.count.addAndFetch(1); } @@ -1206,7 +1266,6 @@ void SortedDataIndexAccessMethod::_unindexKeysOrWriteToSideTable( if (!status.isOK()) { LOGV2(20362, - "Couldn't unindex record {obj} from collection {namespace}: {error}", "Couldn't unindex record", "record"_attr = redact(obj), "namespace"_attr = ns, diff --git a/src/mongo/db/index/index_access_method.h b/src/mongo/db/index/index_access_method.h index 8c565152b67..40e012aa238 100644 --- a/src/mongo/db/index/index_access_method.h +++ b/src/mongo/db/index/index_access_method.h @@ -174,6 +174,24 @@ public: */ virtual void setIdent(std::shared_ptr<Ident> newIdent) = 0; + virtual Status applySortedDataSideWrite(OperationContext* opCtx, + const CollectionPtr& coll, + const BSONObj& operation, + const InsertDeleteOptions& options, + KeyHandlerFn&& onDuplicateKey, + int64_t* const keysInserted, + int64_t* const keysDeleted) { + MONGO_UNREACHABLE; + }; + + virtual void applyColumnDataSideWrite(OperationContext* opCtx, + const CollectionPtr& coll, + const BSONObj& operation, + int64_t* keysInserted, + int64_t* keysDeleted) { + MONGO_UNREACHABLE; + }; + // // Bulk operations support // @@ -561,6 +579,14 @@ public: void setIdent(std::shared_ptr<Ident> newIdent) final; + Status applySortedDataSideWrite(OperationContext* opCtx, + const CollectionPtr& coll, + const BSONObj& operation, + const InsertDeleteOptions& options, + KeyHandlerFn&& onDuplicateKey, + int64_t* keysInserted, + int64_t* keysDeleted) final; + std::unique_ptr<BulkBuilder> initiateBulk(size_t maxMemoryUsageBytes, const boost::optional<IndexStateInfo>& stateInfo, StringData dbName) final; diff --git a/src/mongo/db/index/index_build_interceptor.cpp 
b/src/mongo/db/index/index_build_interceptor.cpp index 36072c25bf5..872c5301d85 100644 --- a/src/mongo/db/index/index_build_interceptor.cpp +++ b/src/mongo/db/index/index_build_interceptor.cpp @@ -39,6 +39,7 @@ #include "mongo/db/concurrency/exception_util.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" +#include "mongo/db/index/columns_access_method.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/index/index_build_interceptor_gen.h" #include "mongo/db/multi_key_path_tracker.h" @@ -291,56 +292,25 @@ Status IndexBuildInterceptor::_applyWrite(OperationContext* opCtx, TrackDuplicates trackDups, int64_t* const keysInserted, int64_t* const keysDeleted) { - // Deserialize the encoded KeyString::Value. - int keyLen; - const char* binKey = operation["key"].binData(keyLen); - BufReader reader(binKey, keyLen); - auto accessMethod = _indexCatalogEntry->accessMethod()->asSortedData(); - const KeyString::Value keyString = KeyString::Value::deserialize( - reader, accessMethod->getSortedDataInterface()->getKeyStringVersion()); - - const Op opType = operation.getStringField("op") == "i"_sd ? Op::kInsert : Op::kDelete; - - const KeyStringSet keySet{keyString}; - if (opType == Op::kInsert) { - int64_t numInserted; - auto status = accessMethod->insertKeysAndUpdateMultikeyPaths( + // Check field for "key" to determine if collection is sorted data or column store. + if (operation.hasField("key")) { + return _indexCatalogEntry->accessMethod()->applySortedDataSideWrite( opCtx, coll, - {keySet.begin(), keySet.end()}, - {}, - MultikeyPaths{}, + operation, options, [=](const KeyString::Value& duplicateKey) { return trackDups == TrackDuplicates::kTrack ? recordDuplicateKey(opCtx, duplicateKey) : Status::OK(); }, - &numInserted); - if (!status.isOK()) { - return status; - } - - *keysInserted += numInserted; - opCtx->recoveryUnit()->onRollback( - [keysInserted, numInserted] { *keysInserted -= numInserted; }); + keysInserted, + keysDeleted); } else { - invariant(opType == Op::kDelete); - if (kDebugBuild) - invariant(operation.getStringField("op") == "d"_sd); - - int64_t numDeleted; - Status s = - accessMethod->removeKeys(opCtx, {keySet.begin(), keySet.end()}, options, &numDeleted); - if (!s.isOK()) { - return s; - } - - *keysDeleted += numDeleted; - opCtx->recoveryUnit()->onRollback( - [keysDeleted, numDeleted] { *keysDeleted -= numDeleted; }); + _indexCatalogEntry->accessMethod()->applyColumnDataSideWrite( + opCtx, coll, operation, keysInserted, keysDeleted); + return Status::OK(); } - return Status::OK(); } void IndexBuildInterceptor::_yield(OperationContext* opCtx, const Yieldable* yieldable) { @@ -422,6 +392,35 @@ boost::optional<MultikeyPaths> IndexBuildInterceptor::getMultikeyPaths() const { return _multikeyPaths; } +Status IndexBuildInterceptor::_finishSideWrite(OperationContext* opCtx, + const std::vector<BSONObj>& toInsert) { + _sideWritesCounter->fetchAndAdd(toInsert.size()); + // This insert may roll back, but not necessarily from inserting into this table. If other write + // operations outside this table and in the same transaction are rolled back, this counter also + // needs to be rolled back. + opCtx->recoveryUnit()->onRollback([sharedCounter = _sideWritesCounter, size = toInsert.size()] { + sharedCounter->fetchAndSubtract(size); + }); + + std::vector<Record> records; + for (auto& doc : toInsert) { + records.emplace_back(Record{RecordId(), // The storage engine will assign its own RecordId + // when we pass one that is null. 
+ RecordData(doc.objdata(), doc.objsize())}); + } + + LOGV2_DEBUG(20691, + 2, + "Recording side write keys on index", + "numRecords"_attr = records.size(), + "index"_attr = _indexCatalogEntry->descriptor()->indexName()); + + // By passing a vector of null timestamps, these inserts are not timestamped individually, but + // rather with the timestamp of the owning operation. + std::vector<Timestamp> timestamps(records.size()); + return _sideWritesTable->rs()->insertRecords(opCtx, &records, timestamps); +} + Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx, const KeyStringSet& keys, const KeyStringSet& multikeyMetadataKeys, @@ -429,6 +428,7 @@ Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx, Op op, int64_t* const numKeysOut) { invariant(opCtx->lockState()->inAWriteUnitOfWork()); + invariant(op != IndexBuildInterceptor::Op::kUpdate); // Maintain parity with IndexAccessMethods handling of key counting. Only include // `multikeyMetadataKeys` when inserting. @@ -478,9 +478,9 @@ Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx, } if (op == Op::kInsert) { - // Wildcard indexes write multikey path information, typically part of the catalog - // document, to the index itself. Multikey information is never deleted, so we only need - // to add this data on the insert path. + // Wildcard indexes write multikey path information, typically part of the catalog document, + // to the index itself. Multikey information is never deleted, so we only need to add this + // data on the insert path. for (const auto& keyString : multikeyMetadataKeys) { builder.reset(); keyString.serialize(builder); @@ -491,33 +491,41 @@ Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx, } } - _sideWritesCounter->fetchAndAdd(toInsert.size()); - // This insert may roll back, but not necessarily from inserting into this table. If other write - // operations outside this table and in the same transaction are rolled back, this counter also - // needs to be rolled back. - opCtx->recoveryUnit()->onRollback([sharedCounter = _sideWritesCounter, size = toInsert.size()] { - sharedCounter->fetchAndSubtract(size); - }); + return _finishSideWrite(opCtx, std::move(toInsert)); +} - std::vector<Record> records; - for (auto& doc : toInsert) { - records.emplace_back(Record{RecordId(), // The storage engine will assign its own RecordId - // when we pass one that is null. - RecordData(doc.objdata(), doc.objsize())}); - } +Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx, + const PathCellSet& keys, + Op op, + int64_t* const numKeysOut) { + invariant(opCtx->lockState()->inAWriteUnitOfWork()); - LOGV2_DEBUG(20691, - 2, - "recording {records_size} side write keys on index " - "'{indexCatalogEntry_descriptor_indexName}'", - "records_size"_attr = records.size(), - "indexCatalogEntry_descriptor_indexName"_attr = - _indexCatalogEntry->descriptor()->indexName()); + *numKeysOut = keys.size(); - // By passing a vector of null timestamps, these inserts are not timestamped individually, but - // rather with the timestamp of the owning operation. 
- std::vector<Timestamp> timestamps(records.size()); - return _sideWritesTable->rs()->insertRecords(opCtx, &records, timestamps); + std::vector<BSONObj> toInsert; + toInsert.reserve(keys.size()); + for (const auto& [path, cell, rid] : keys) { + + BSONObjBuilder builder; + rid.serializeToken("rid", &builder); + builder.append("op", [op] { + switch (op) { + case Op::kInsert: + return "i"; + case Op::kDelete: + return "d"; + case Op::kUpdate: + return "u"; + } + MONGO_UNREACHABLE; + }()); + builder.append("path", path); + builder.append("cell", cell); + + toInsert.push_back(builder.obj()); + } + + return _finishSideWrite(opCtx, std::move(toInsert)); } Status IndexBuildInterceptor::retrySkippedRecords(OperationContext* opCtx, diff --git a/src/mongo/db/index/index_build_interceptor.h b/src/mongo/db/index/index_build_interceptor.h index 46c4f5e6e8b..b1888d46f76 100644 --- a/src/mongo/db/index/index_build_interceptor.h +++ b/src/mongo/db/index/index_build_interceptor.h @@ -31,11 +31,13 @@ #include <memory> +#include "mongo/db/index/columns_access_method.h" #include "mongo/db/index/duplicate_key_tracker.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/index/multikey_paths.h" #include "mongo/db/index/skipped_record_tracker.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/storage/column_store.h" #include "mongo/db/storage/temporary_record_store.h" #include "mongo/db/yieldable.h" #include "mongo/platform/atomic_word.h" @@ -53,7 +55,7 @@ public: */ enum class DrainYieldPolicy { kNoYield, kYield }; - enum class Op { kInsert, kDelete }; + enum class Op { kInsert, kDelete, kUpdate }; /** * Indicates whether to record duplicate keys that have been inserted into the index. When set @@ -101,6 +103,18 @@ public: int64_t* numKeysOut); /** + * Client writes that are concurrent with a column store index build will have their index + * updates written to a temporary table. After the index table scan is complete, these updates + * will be applied to the underlying index table. + * + * On success, `numKeysOut` if non-null will contain the number of keys added or removed. + */ + Status sideWrite(OperationContext* opCtx, + const PathCellSet& columnstoreKeys, + Op op, + int64_t* numKeysOut); + + /** * Given a duplicate key, record the key for later verification by a call to * checkDuplicateKeyConstraints(); */ @@ -173,7 +187,6 @@ public: private: using SideWriteRecord = std::pair<RecordId, BSONObj>; - Status _applyWrite(OperationContext* opCtx, const CollectionPtr& coll, const BSONObj& doc, @@ -193,6 +206,8 @@ private: FailPoint* fp, long long iteration) const; + Status _finishSideWrite(OperationContext* opCtx, const std::vector<BSONObj>& toInsert); + // The entry for the index that is being built. 
const IndexCatalogEntry* _indexCatalogEntry; @@ -224,5 +239,4 @@ private: MONGO_MAKE_LATCH("IndexBuildInterceptor::_multikeyPathMutex"); boost::optional<MultikeyPaths> _multikeyPaths; }; - } // namespace mongo diff --git a/src/mongo/db/index/index_build_interceptor_test.cpp b/src/mongo/db/index/index_build_interceptor_test.cpp index 30f996d69d2..e0f9f86c98a 100644 --- a/src/mongo/db/index/index_build_interceptor_test.cpp +++ b/src/mongo/db/index/index_build_interceptor_test.cpp @@ -30,6 +30,7 @@ #include "mongo/db/catalog/catalog_test_fixture.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/index/index_build_interceptor.h" +#include "mongo/idl/server_parameter_test_util.h" namespace mongo { namespace { @@ -118,5 +119,238 @@ TEST_F(IndexBuilderInterceptorTest, SingleInsertIsSavedToSideWritesTable) { << "key" << serializedKeyString), sideWrites[0]); } + + +TEST_F(IndexBuilderInterceptorTest, SingleColumnInsertIsSavedToSideWritesTable) { + RAIIServerParameterControllerForTest controller("featureFlagColumnstoreIndexes", true); + auto interceptor = createIndexBuildInterceptor( + fromjson("{v: 2, name: 'columnstore', key: {'$**': 'columnstore'}}")); + + PathCellSet columnKeys; + columnKeys.emplace_back(std::make_tuple("path", "cell", RecordId(1))); + + WriteUnitOfWork wuow(operationContext()); + int64_t numKeys = 0; + ASSERT_OK(interceptor->sideWrite( + operationContext(), columnKeys, IndexBuildInterceptor::Op::kInsert, &numKeys)); + ASSERT_EQ(1, numKeys); + wuow.commit(); + + BSONObjBuilder builder; + RecordId(1).serializeToken("rid", &builder); + BSONObj obj = builder.obj(); + BSONElement elem = obj["rid"]; + + auto sideWrites = getSideWritesTableContents(std::move(interceptor)); + ASSERT_EQ(1, sideWrites.size()); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem << "op" + << "i" + << "path" + << "path" + << "cell" + << "cell"), + sideWrites[0]); +} + +TEST_F(IndexBuilderInterceptorTest, SingleColumnDeleteIsSavedToSideWritesTable) { + RAIIServerParameterControllerForTest controller("featureFlagColumnstoreIndexes", true); + auto interceptor = createIndexBuildInterceptor( + fromjson("{v: 2, name: 'columnstore', key: {'$**': 'columnstore'}}")); + + PathCellSet columnKeys; + columnKeys.emplace_back(std::make_tuple("path", "", RecordId(1))); + + WriteUnitOfWork wuow(operationContext()); + int64_t numKeys = 0; + ASSERT_OK(interceptor->sideWrite( + operationContext(), columnKeys, IndexBuildInterceptor::Op::kDelete, &numKeys)); + ASSERT_EQ(1, numKeys); + wuow.commit(); + + BSONObjBuilder builder; + RecordId(1).serializeToken("rid", &builder); + BSONObj obj = builder.obj(); + BSONElement elem = obj["rid"]; + + auto sideWrites = getSideWritesTableContents(std::move(interceptor)); + ASSERT_EQ(1, sideWrites.size()); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem << "op" + << "d" + << "path" + << "path" + << "cell" + << ""), + sideWrites[0]); +} + +TEST_F(IndexBuilderInterceptorTest, SingleColumnUpdateIsSavedToSideWritesTable) { + RAIIServerParameterControllerForTest controller("featureFlagColumnstoreIndexes", true); + auto interceptor = createIndexBuildInterceptor( + fromjson("{v: 2, name: 'columnstore', key: {'$**': 'columnstore'}}")); + + // create path + cell + rid + PathCellSet columnKeys; + columnKeys.emplace_back(std::make_tuple("path", "cell", RecordId(1))); + + WriteUnitOfWork wuow(operationContext()); + int64_t numKeys = 0; + ASSERT_OK(interceptor->sideWrite( + operationContext(), columnKeys, IndexBuildInterceptor::Op::kUpdate, &numKeys)); + ASSERT_EQ(1, numKeys); + wuow.commit(); + + BSONObjBuilder 
builder; + RecordId(1).serializeToken("rid", &builder); + BSONObj obj = builder.obj(); + BSONElement elem = obj["rid"]; + + auto sideWrites = getSideWritesTableContents(std::move(interceptor)); + ASSERT_EQ(1, sideWrites.size()); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem << "op" + << "u" + << "path" + << "path" + << "cell" + << "cell"), + sideWrites[0]); +} + +TEST_F(IndexBuilderInterceptorTest, MultipleColumnInsertsAreSavedToSideWritesTable) { + RAIIServerParameterControllerForTest controller("featureFlagColumnstoreIndexes", true); + auto interceptor = createIndexBuildInterceptor( + fromjson("{v: 2, name: 'columnstore', key: {'$**': 'columnstore'}}")); + + PathCellSet columnKeys; + columnKeys.emplace_back(std::make_tuple("path", "cell", RecordId(1))); + columnKeys.emplace_back(std::make_tuple("path1", "cell1", RecordId(1))); + columnKeys.emplace_back(std::make_tuple("path2", "cell2", RecordId(2))); + columnKeys.emplace_back(std::make_tuple("path3", "cell3", RecordId(2))); + + WriteUnitOfWork wuow(operationContext()); + int64_t numKeys = 0; + + ASSERT_OK(interceptor->sideWrite( + operationContext(), columnKeys, IndexBuildInterceptor::Op::kInsert, &numKeys)); + ASSERT_EQ(4, numKeys); + wuow.commit(); + + BSONObjBuilder builder; + RecordId(1).serializeToken("rid", &builder); + BSONObj obj = builder.obj(); + BSONElement elem1 = obj["rid"]; + + BSONObjBuilder builder2; + RecordId(2).serializeToken("rid", &builder2); + BSONObj obj2 = builder2.obj(); + BSONElement elem2 = obj2["rid"]; + + auto sideWrites = getSideWritesTableContents(std::move(interceptor)); + ASSERT_EQ(4, sideWrites.size()); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem1 << "op" + << "i" + << "path" + << "path" + << "cell" + << "cell"), + sideWrites[0]); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem1 << "op" + << "i" + << "path" + << "path1" + << "cell" + << "cell1"), + sideWrites[1]); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem2 << "op" + << "i" + << "path" + << "path2" + << "cell" + << "cell2"), + sideWrites[2]); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem2 << "op" + << "i" + << "path" + << "path3" + << "cell" + << "cell3"), + sideWrites[3]); +} + +TEST_F(IndexBuilderInterceptorTest, MultipleColumnSideWritesAreSavedToSideWritesTable) { + RAIIServerParameterControllerForTest controller("featureFlagColumnstoreIndexes", true); + auto interceptor = createIndexBuildInterceptor( + fromjson("{v: 2, name: 'columnstore', key: {'$**': 'columnstore'}}")); + + WriteUnitOfWork wuow(operationContext()); + int64_t numKeys = 0; + + PathCellSet columnKeys; + columnKeys.emplace_back(std::make_tuple("path", "cell", RecordId(1))); + ASSERT_OK(interceptor->sideWrite( + operationContext(), columnKeys, IndexBuildInterceptor::Op::kInsert, &numKeys)); + ASSERT_EQ(1, numKeys); + + PathCellSet columnKeys2; + columnKeys2.emplace_back(std::make_tuple("path", "", RecordId(1))); + ASSERT_OK(interceptor->sideWrite( + operationContext(), columnKeys2, IndexBuildInterceptor::Op::kDelete, &numKeys)); + ASSERT_EQ(1, numKeys); + + + PathCellSet columnKeys3; + columnKeys3.emplace_back(std::make_tuple("path1", "cell1", RecordId(2))); + ASSERT_OK(interceptor->sideWrite( + operationContext(), columnKeys3, IndexBuildInterceptor::Op::kUpdate, &numKeys)); + ASSERT_EQ(1, numKeys); + + PathCellSet columnKeys4; + columnKeys4.emplace_back(std::make_tuple("path2", "cell2", RecordId(2))); + ASSERT_OK(interceptor->sideWrite( + operationContext(), columnKeys4, IndexBuildInterceptor::Op::kInsert, &numKeys)); + ASSERT_EQ(1, numKeys); + wuow.commit(); + + BSONObjBuilder builder; + 
RecordId(1).serializeToken("rid", &builder); + BSONObj obj = builder.obj(); + BSONElement elem1 = obj["rid"]; + + BSONObjBuilder builder2; + RecordId(2).serializeToken("rid", &builder2); + BSONObj obj2 = builder2.obj(); + BSONElement elem2 = obj2["rid"]; + + auto sideWrites = getSideWritesTableContents(std::move(interceptor)); + ASSERT_EQ(4, sideWrites.size()); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem1 << "op" + << "i" + << "path" + << "path" + << "cell" + << "cell"), + sideWrites[0]); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem1 << "op" + << "d" + << "path" + << "path" + << "cell" + << ""), + sideWrites[1]); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem2 << "op" + << "u" + << "path" + << "path1" + << "cell" + << "cell1"), + sideWrites[2]); + ASSERT_BSONOBJ_EQ(BSON("rid" << elem2 << "op" + << "i" + << "path" + << "path2" + << "cell" + << "cell2"), + sideWrites[3]); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/storage/column_store.h b/src/mongo/db/storage/column_store.h index 9db8ca7d1bb..30b21cbe340 100644 --- a/src/mongo/db/storage/column_store.h +++ b/src/mongo/db/storage/column_store.h @@ -786,4 +786,6 @@ struct SplitCellView { } } }; + +using PathCellSet = std::vector<std::tuple<std::string, std::string, RecordId>>; } // namespace mongo diff --git a/src/mongo/db/storage/execution_context.h b/src/mongo/db/storage/execution_context.h index 28479f833b9..4ceed7ec4a3 100644 --- a/src/mongo/db/storage/execution_context.h +++ b/src/mongo/db/storage/execution_context.h @@ -31,6 +31,7 @@ #include "mongo/db/index/multikey_paths.h" #include "mongo/db/operation_context.h" +#include "mongo/db/storage/column_store.h" #include "mongo/db/storage/key_string.h" #include "mongo/util/auto_clear_ptr.h" @@ -61,11 +62,15 @@ public: AutoClearPtr<MultikeyPaths> multikeyPaths() { return makeAutoClearPtr(&_multikeyPaths); } + AutoClearPtr<PathCellSet> columnKeys() { + return makeAutoClearPtr(&_columnKeys); + } private: KeyStringSet _keys; KeyStringSet _multikeyMetadataKeys; MultikeyPaths _multikeyPaths; + PathCellSet _columnKeys; }; } // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp index 1caf0b67158..9f0561911b8 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp @@ -175,7 +175,6 @@ void WiredTigerColumnStore::WriteCursor::insert(PathView path, RowId rid, CellVi auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx); metricsCollector.incrementOneIdxEntryWritten(c()->uri, keyItem.size); - // TODO: SERVER-65978, we may have to specially handle WT_DUPLICATE_KEY error here. if (ret) { uassertStatusOK(wtRCToStatus(ret, c()->session)); } @@ -219,7 +218,6 @@ void WiredTigerColumnStore::WriteCursor::update(PathView path, RowId rid, CellVi auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx); metricsCollector.incrementOneIdxEntryWritten(c()->uri, keyItem.size); - // TODO: SERVER-65978, may want to handle WT_NOTFOUND specially. if (ret != 0) return uassertStatusOK(wtRCToStatus(ret, c()->session)); } |
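During the drain phase these buffered entries are replayed against the column store by the new ColumnStoreAccessMethod::applyColumnDataSideWrite(). A condensed sketch of that dispatch is below, assuming a ColumnStore::WriteCursor as in the patch; the function name is hypothetical, and the real implementation additionally registers rollback handlers for the key counters and asserts that the RecordId is not a string.

```cpp
#include "mongo/bson/bsonobj.h"
#include "mongo/db/record_id.h"
#include "mongo/db/storage/column_store.h"

namespace mongo {
// Hypothetical condensation of applyColumnDataSideWrite(): decode one buffered
// side-write entry and apply it to the column store through a write cursor.
void replayColumnSideWrite(ColumnStore::WriteCursor* cursor, const BSONObj& entry) {
    const RecordId rid = RecordId::deserializeToken(entry["rid"]);
    const StringData op = entry.getStringField("op");
    const StringData path = entry.getStringField("path");
    const StringData cell = entry.getStringField("cell");

    if (op == "i"_sd) {
        cursor->insert(path, rid.getLong(), cell);
    } else if (op == "u"_sd) {
        cursor->update(path, rid.getLong(), cell);
    } else {  // "d"
        cursor->remove(path, rid.getLong());
    }
}
}  // namespace mongo
```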