summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWill Buerger <will.buerger@mongodb.com>2022-09-30 20:59:42 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-30 22:02:57 +0000
commit88203efd43517720245e920a5d23bf4f0aeb35b0 (patch)
tree676fa01fb5ad55ec1b654dacf1e3d0548e7b1c01
parent29c5c67ac1e47a716717c9a0e5e398865c789873 (diff)
downloadmongo-88203efd43517720245e920a5d23bf4f0aeb35b0.tar.gz
SERVER-65978: Online column store index builds
Co-authored-by: Erin Zhu <erin.zhu@mongodb.com>
-rw-r--r--jstests/noPassthrough/libs/index_build.js14
-rw-r--r--jstests/noPassthrough/restart_index_build_if_resume_fails.js70
-rw-r--r--jstests/noPassthrough/restart_index_build_if_resume_interrupted_by_shutdown.js24
-rw-r--r--jstests/noPassthrough/resumable_index_build_drain_writes_phase.js18
-rw-r--r--jstests/noPassthrough/resumable_index_build_drain_writes_phase_primary.js82
-rw-r--r--jstests/noPassthrough/resumable_index_build_drain_writes_phase_secondary.js83
-rw-r--r--jstests/noPassthrough/resumable_index_build_mixed_phases.js65
-rw-r--r--src/mongo/db/index/SConscript1
-rw-r--r--src/mongo/db/index/columns_access_method.cpp241
-rw-r--r--src/mongo/db/index/columns_access_method.h12
-rw-r--r--src/mongo/db/index/index_access_method.cpp61
-rw-r--r--src/mongo/db/index/index_access_method.h26
-rw-r--r--src/mongo/db/index/index_build_interceptor.cpp142
-rw-r--r--src/mongo/db/index/index_build_interceptor.h20
-rw-r--r--src/mongo/db/index/index_build_interceptor_test.cpp234
-rw-r--r--src/mongo/db/storage/column_store.h2
-rw-r--r--src/mongo/db/storage/execution_context.h5
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp2
18 files changed, 889 insertions, 213 deletions
diff --git a/jstests/noPassthrough/libs/index_build.js b/jstests/noPassthrough/libs/index_build.js
index 4e4775cb64d..6577df94016 100644
--- a/jstests/noPassthrough/libs/index_build.js
+++ b/jstests/noPassthrough/libs/index_build.js
@@ -358,8 +358,8 @@ const ResumableIndexBuildTest = class {
}
/**
- * Runs createIndexFn in a parellel shell to create indexes, inserting the documents specified
- * by sideWrites into the side writes table.
+ * Runs createIndexFn in a parellel shell to create indexes, modifying the collection with the
+ * side writes table.
*
* 'createIndexFn' should take three parameters: collection name, index specifications, and
* index names.
@@ -368,6 +368,10 @@ const ResumableIndexBuildTest = class {
* is [[{a: 1}, {b: 1}], [{c: 1}]], a valid indexNames would look like
* [["index_1", "index_2"], ["index_3"]].
*
+ * 'sideWrites' can be an array specifying documents to be inserted into the side writes table,
+ * or a function that performs any series of operations (inserts, deletes, or updates) with the
+ * side writes table
+ *
* If {hangBeforeBuildingIndex: true}, returns with the hangBeforeBuildingIndex failpoint
* enabled and the index builds hanging at this point.
*/
@@ -411,7 +415,11 @@ const ResumableIndexBuildTest = class {
});
}
- assert.commandWorked(coll.insert(sideWrites));
+ if (Array.isArray(sideWrites)) {
+ assert.commandWorked(coll.insert(sideWrites));
+ } else {
+ sideWrites(coll);
+ }
// Before building the index, wait for the the last op to be committed so that establishing
// the majority read cursor does not race with step down.
diff --git a/jstests/noPassthrough/restart_index_build_if_resume_fails.js b/jstests/noPassthrough/restart_index_build_if_resume_fails.js
index e166465d2c6..0cee7035e63 100644
--- a/jstests/noPassthrough/restart_index_build_if_resume_fails.js
+++ b/jstests/noPassthrough/restart_index_build_if_resume_fails.js
@@ -55,33 +55,55 @@ ResumableIndexBuildTest.runFailToResume(rst,
[{a: 10}, {a: 11}],
[{a: 12}, {a: 13}]);
-// TODO (SERVER-65978): Add side writes to these test cases once they are supported by column store
-// index builds.
if (columnstoreEnabled) {
- ResumableIndexBuildTest.runFailToResume(rst,
- dbName,
- collName,
- {"$**": "columnstore"},
- {failPointAfterStartup: "failToParseResumeIndexInfo"},
- [],
- [{a: 4}, {a: 5}],
- true /* failWhileParsing */);
+ ResumableIndexBuildTest.runFailToResume(
+ rst,
+ dbName,
+ collName,
+ {"$**": "columnstore"},
+ {failPointAfterStartup: "failToParseResumeIndexInfo"},
+ (function(collection) {
+ assert.commandWorked(collection.insert([{a: [{b: 14}]}, {a: 15}]));
+ assert.commandWorked(collection.update({a: 1}, {a: 2}));
+ assert.commandWorked(collection.remove({"a.b": 14}));
+ assert.commandWorked(collection.insert({a: 1}));
+ assert.commandWorked(collection.remove({a: 1}));
+ assert.commandWorked(collection.update({a: 15}, {a: 1}));
+ }),
+ [{a: 16}, {a: 17}],
+ true /* failWhileParsing */);
- ResumableIndexBuildTest.runFailToResume(rst,
- dbName,
- collName,
- {"$**": "columnstore"},
- {failPointAfterStartup: "failSetUpResumeIndexBuild"},
- [],
- [{a: 8}, {a: 9}]);
+ ResumableIndexBuildTest.runFailToResume(
+ rst,
+ dbName,
+ collName,
+ {"$**": "columnstore"},
+ {failPointAfterStartup: "failSetUpResumeIndexBuild"},
+ (function(collection) {
+ assert.commandWorked(collection.insert([{a: [{b: 18}]}, {a: 19}]));
+ assert.commandWorked(collection.update({a: 1}, {a: 2}));
+ assert.commandWorked(collection.remove({"a.b": 18}));
+ assert.commandWorked(collection.insert({a: 1}));
+ assert.commandWorked(collection.remove({a: 1}));
+ assert.commandWorked(collection.update({a: 19}, {a: 1}));
+ }),
+ [{a: 20}, {a: 21}]);
- ResumableIndexBuildTest.runFailToResume(rst,
- dbName,
- collName,
- {"$**": "columnstore"},
- {removeTempFilesBeforeStartup: true},
- [],
- [{a: 12}, {a: 13}]);
+ ResumableIndexBuildTest.runFailToResume(
+ rst,
+ dbName,
+ collName,
+ {"$**": "columnstore"},
+ {removeTempFilesBeforeStartup: true},
+ (function(collection) {
+ assert.commandWorked(collection.insert([{a: [{b: 22}]}, {a: 23}]));
+ assert.commandWorked(collection.update({a: 1}, {a: 2}));
+ assert.commandWorked(collection.remove({"a.b": 22}));
+ assert.commandWorked(collection.insert({a: 1}));
+ assert.commandWorked(collection.remove({a: 1}));
+ assert.commandWorked(collection.update({a: 23}, {a: 1}));
+ }),
+ [{a: 24}, {a: 25}]);
}
rst.stopSet();
diff --git a/jstests/noPassthrough/restart_index_build_if_resume_interrupted_by_shutdown.js b/jstests/noPassthrough/restart_index_build_if_resume_interrupted_by_shutdown.js
index a33f279e9a4..3a4a84880f6 100644
--- a/jstests/noPassthrough/restart_index_build_if_resume_interrupted_by_shutdown.js
+++ b/jstests/noPassthrough/restart_index_build_if_resume_interrupted_by_shutdown.js
@@ -51,8 +51,6 @@ ResumableIndexBuildTest.runResumeInterruptedByShutdown(
[{a: 77}, {a: 88}],
[{a: 99}, {a: 100}]);
-// TODO (SERVER-65978): Add side writes to these test cases once they are supported by column store
-// index builds.
if (columnstoreEnabled) {
ResumableIndexBuildTest.runResumeInterruptedByShutdown(
rst,
@@ -63,8 +61,15 @@ if (columnstoreEnabled) {
{name: "hangIndexBuildDuringCollectionScanPhaseBeforeInsertion", logIdWithBuildUUID: 20386},
"collection scan",
{a: 1}, // initial doc
- [],
- [{a: 4}, {a: 5}]);
+ (function(collection) {
+ assert.commandWorked(collection.insert([{a: [{b: 14}]}, {a: 15}]));
+ assert.commandWorked(collection.update({a: 1}, {a: 2}));
+ assert.commandWorked(collection.remove({"a.b": 14}));
+ assert.commandWorked(collection.insert({a: 1}));
+ assert.commandWorked(collection.remove({a: 2}));
+ assert.commandWorked(collection.update({a: 15}, {a: 2}));
+ }),
+ [{a: 16}, {a: 17}]);
ResumableIndexBuildTest.runResumeInterruptedByShutdown(
rst,
@@ -74,8 +79,15 @@ if (columnstoreEnabled) {
"resumable_index_build4", // index name
{name: "hangIndexBuildDuringBulkLoadPhase", logIdWithIndexName: 4924400},
"bulk load",
- {a: [11, 22, 33]}, // initial doc
- [],
+ {a: [44, 55, 66]}, // initial doc
+ (function(collection) {
+ assert.commandWorked(collection.insert([{a: [{b: 77}]}, {a: 88}]));
+ assert.commandWorked(collection.update({a: [44, 55, 66]}, {a: [55, 66]}));
+ assert.commandWorked(collection.remove({"a.b": 77}));
+ assert.commandWorked(collection.insert({a: 99}));
+ assert.commandWorked(collection.remove({a: [55, 66]}));
+ assert.commandWorked(collection.update({a: 99}, {a: 1}));
+ }),
[{a: 99}, {a: 100}]);
}
rst.stopSet();
diff --git a/jstests/noPassthrough/resumable_index_build_drain_writes_phase.js b/jstests/noPassthrough/resumable_index_build_drain_writes_phase.js
index 088759a3a81..bf70ffffb1c 100644
--- a/jstests/noPassthrough/resumable_index_build_drain_writes_phase.js
+++ b/jstests/noPassthrough/resumable_index_build_drain_writes_phase.js
@@ -12,6 +12,7 @@
"use strict";
load("jstests/noPassthrough/libs/index_build.js");
+load("jstests/libs/sbe_util.js"); // For checkSBEEnabled.
const dbName = "test";
@@ -19,6 +20,9 @@ const rst = new ReplSetTest({nodes: 1});
rst.startSet();
rst.initiate();
+const columnstoreEnabled = checkSBEEnabled(
+ rst.getPrimary().getDB(dbName), ["featureFlagColumnstoreIndexes", "featureFlagSbeFull"], true);
+
const runTests = function(docs, indexSpecsFlat, sideWrites, collNameSuffix) {
const coll = rst.getPrimary().getDB(dbName).getCollection(jsTestName() + collNameSuffix);
assert.commandWorked(coll.insert(docs));
@@ -53,6 +57,18 @@ runTests({a: 1},
[{"$**": 1}, {h: 1}],
[{a: [1, 2], b: {c: [3, 4]}, d: ""}, {e: "", f: [[]], g: null, h: 8}],
"_wildcard");
-
+if (columnstoreEnabled) {
+ runTests({a: 1},
+ [{"$**": "columnstore"}, {h: 1}],
+ (function(collection) {
+ assert.commandWorked(collection.insert([{a: [{c: 2}], b: 2}, {a: 3, b: 3}]));
+ assert.commandWorked(collection.update({a: 3}, {a: 4, b: 3}));
+ assert.commandWorked(collection.remove({"a.c": 2}));
+ assert.commandWorked(collection.insert({a: 4, b: 4}));
+ assert.commandWorked(collection.remove({b: 3}));
+ assert.commandWorked(collection.update({a: 4}, {a: 2}));
+ }),
+ "_columnstore");
+}
rst.stopSet();
})(); \ No newline at end of file
diff --git a/jstests/noPassthrough/resumable_index_build_drain_writes_phase_primary.js b/jstests/noPassthrough/resumable_index_build_drain_writes_phase_primary.js
index 8bc4c431246..8e6f529c8c4 100644
--- a/jstests/noPassthrough/resumable_index_build_drain_writes_phase_primary.js
+++ b/jstests/noPassthrough/resumable_index_build_drain_writes_phase_primary.js
@@ -13,6 +13,7 @@
"use strict";
load("jstests/noPassthrough/libs/index_build.js");
+load("jstests/libs/sbe_util.js"); // For checkSBEEnabled.
const dbName = "test";
const collName = jsTestName();
@@ -24,6 +25,9 @@ rst.initiate();
const primary = rst.getPrimary();
const coll = primary.getDB(dbName).getCollection(collName);
+const columnstoreEnabled = checkSBEEnabled(
+ primary.getDB(dbName), ["featureFlagColumnstoreIndexes", "featureFlagSbeFull"], true);
+
assert.commandWorked(coll.insert({a: 1}));
jsTestLog("Testing when primary shuts down in the middle of the first drain");
@@ -76,5 +80,83 @@ ResumableIndexBuildTest.runOnPrimaryToTestCommitQuorum(
[{a: 14}, {a: 15}],
[{a: 16}, {a: 17}]);
+if (columnstoreEnabled) {
+ ResumableIndexBuildTest.run(
+ rst,
+ dbName,
+ collName,
+ [[{"$**": "columnstore"}]],
+ [{name: "hangIndexBuildDuringDrainWritesPhase", logIdWithIndexName: 4841800}],
+ 0,
+ ["drain writes"],
+ [{skippedPhaseLogID: 20392}],
+ (function(collection) {
+ assert.commandWorked(collection.insert([{a: [{b: 10}]}, {a: 11}]));
+ assert.commandWorked(collection.update({a: 1}, {a: 2}));
+ assert.commandWorked(collection.remove({"a.b": 10}));
+ assert.commandWorked(collection.insert({a: 1}));
+ assert.commandWorked(collection.remove({a: 1}));
+ assert.commandWorked(collection.update({a: 11}, {a: 1}));
+ }),
+ [{a: 12}, {a: 13}]);
+ ResumableIndexBuildTest.run(
+ rst,
+ dbName,
+ collName,
+ [[{"$**": "columnstore"}]],
+ [{name: "hangIndexBuildDuringDrainWritesPhase", logIdWithIndexName: 4841800}],
+ 1,
+ ["drain writes"],
+ [{skippedPhaseLogID: 20392}],
+ (function(collection) {
+ assert.commandWorked(collection.insert([{a: [{b: 14}]}, {a: 15}]));
+ assert.commandWorked(collection.update({a: 1}, {a: 2}));
+ assert.commandWorked(collection.remove({"a.b": 14}));
+ assert.commandWorked(collection.insert({a: 1}));
+ assert.commandWorked(collection.remove({a: 1}));
+ assert.commandWorked(collection.update({a: 15}, {a: 1}));
+ }),
+ [{a: 16}, {a: 17}]);
+
+ jsTestLog("Testing when primary shuts down after voting, but before commit quorum satisfied");
+
+ ResumableIndexBuildTest.runOnPrimaryToTestCommitQuorum(
+ rst,
+ dbName,
+ collName,
+ {"$**": "columnstore"},
+ "hangIndexBuildAfterSignalPrimaryForCommitReadiness",
+ "hangAfterIndexBuildFirstDrain",
+ (function(collection) {
+ assert.commandWorked(collection.insert([{a: [{b: 22}]}, {a: 23}]));
+ assert.commandWorked(collection.update({a: 1}, {a: 2}));
+ assert.commandWorked(collection.remove({"a.b": 22}));
+ assert.commandWorked(collection.insert({a: 1}));
+ assert.commandWorked(collection.remove({a: 1}));
+ assert.commandWorked(collection.update({a: 23}, {a: 1}));
+ }),
+ [{a: 24}, {a: 25}]);
+
+ jsTestLog(
+ "Testing when primary shuts down after commit quorum satisfied, but before commitIndexBuild oplog entry written");
+
+ ResumableIndexBuildTest.runOnPrimaryToTestCommitQuorum(
+ rst,
+ dbName,
+ collName,
+ {"$**": "columnstore"},
+ "hangIndexBuildAfterSignalPrimaryForCommitReadiness",
+ "hangIndexBuildAfterSignalPrimaryForCommitReadiness",
+ (function(collection) {
+ assert.commandWorked(collection.insert([{a: [{b: 30}]}, {a: 31}]));
+ assert.commandWorked(collection.update({a: 1}, {a: 2}));
+ assert.commandWorked(collection.remove({"a.b": 30}));
+ assert.commandWorked(collection.insert({a: 1}));
+ assert.commandWorked(collection.remove({a: 1}));
+ assert.commandWorked(collection.update({a: 31}, {a: 1}));
+ }),
+ [{a: 32}, {a: 33}]);
+}
+
rst.stopSet();
})(); \ No newline at end of file
diff --git a/jstests/noPassthrough/resumable_index_build_drain_writes_phase_secondary.js b/jstests/noPassthrough/resumable_index_build_drain_writes_phase_secondary.js
index 1163d0664c4..04bc57ad2a8 100644
--- a/jstests/noPassthrough/resumable_index_build_drain_writes_phase_secondary.js
+++ b/jstests/noPassthrough/resumable_index_build_drain_writes_phase_secondary.js
@@ -16,6 +16,7 @@
"use strict";
load("jstests/noPassthrough/libs/index_build.js");
+load("jstests/libs/sbe_util.js"); // For checkSBEEnabled.
const dbName = "test";
const collName = jsTestName();
@@ -32,6 +33,9 @@ rst.initiate();
let primary = rst.getPrimary();
let coll = primary.getDB(dbName).getCollection(collName);
+const columnstoreEnabled = checkSBEEnabled(
+ primary.getDB(dbName), ["featureFlagColumnstoreIndexes", "featureFlagSbeFull"], true);
+
assert.commandWorked(coll.insert({a: 1}));
jsTestLog("Testing when secondary shuts down in the middle of the first drain");
@@ -80,5 +84,84 @@ ResumableIndexBuildTest.runOnSecondary(rst,
[{a: 14}, {a: 15}],
[{a: 16}, {a: 17}]);
+if (columnstoreEnabled) {
+ jsTestLog("Testing when secondary shuts down in the middle of the first drain");
+ ResumableIndexBuildTest.runOnSecondary(
+ rst,
+ dbName,
+ collName,
+ {"$**": "columnstore"},
+ "hangIndexBuildDuringDrainWritesPhase",
+ 0,
+ undefined, /* primaryFailPointName */
+ (function(collection) {
+ assert.commandWorked(collection.insert([{a: [{b: 10}]}, {a: 11}]));
+ assert.commandWorked(collection.update({a: 1}, {a: 2}));
+ assert.commandWorked(collection.remove({"a.b": 10}));
+ assert.commandWorked(collection.insert({a: 1}));
+ assert.commandWorked(collection.remove({a: 1}));
+ assert.commandWorked(collection.update({a: 11}, {a: 1}));
+ }),
+ [{a: 12}, {a: 13}]);
+ ResumableIndexBuildTest.runOnSecondary(
+ rst,
+ dbName,
+ collName,
+ {"$**": "columnstore"},
+ "hangIndexBuildDuringDrainWritesPhase",
+ 1,
+ undefined, /* primaryFailPointName */
+ (function(collection) {
+ assert.commandWorked(collection.insert([{a: [{b: 14}]}, {a: 15}]));
+ assert.commandWorked(collection.update({a: 1}, {a: 2}));
+ assert.commandWorked(collection.remove({"a.b": 14}));
+ assert.commandWorked(collection.insert({a: 1}));
+ assert.commandWorked(collection.remove({a: 1}));
+ assert.commandWorked(collection.update({a: 15}, {a: 1}));
+ }),
+ [{a: 16}, {a: 17}]);
+
+ jsTestLog("Testing when secondary shuts down before voting");
+
+ ResumableIndexBuildTest.runOnSecondary(
+ rst,
+ dbName,
+ collName,
+ {"$**": "columnstore"},
+ "hangAfterIndexBuildFirstDrain",
+ {},
+ undefined, /* primaryFailPointName */
+ (function(collection) {
+ assert.commandWorked(collection.insert([{a: [{b: 18}]}, {a: 19}]));
+ assert.commandWorked(collection.update({a: 1}, {a: 2}));
+ assert.commandWorked(collection.remove({"a.b": 18}));
+ assert.commandWorked(collection.insert({a: 1}));
+ assert.commandWorked(collection.remove({a: 1}));
+ assert.commandWorked(collection.update({a: 19}, {a: 1}));
+ }),
+ [{a: 20}, {a: 21}]);
+
+ jsTestLog(
+ "Testing when secondary shuts down after commit quorum satisfied, but before replicating commitIndexBuild oplog entry");
+
+ ResumableIndexBuildTest.runOnSecondary(
+ rst,
+ dbName,
+ collName,
+ {"$**": "columnstore"},
+ "hangIndexBuildAfterSignalPrimaryForCommitReadiness",
+ {},
+ "hangIndexBuildBeforeCommit",
+ (function(collection) {
+ assert.commandWorked(collection.insert([{a: [{b: 26}]}, {a: 27}]));
+ assert.commandWorked(collection.update({a: 1}, {a: 2}));
+ assert.commandWorked(collection.remove({"a.b": 26}));
+ assert.commandWorked(collection.insert({a: 1}));
+ assert.commandWorked(collection.remove({a: 1}));
+ assert.commandWorked(collection.update({a: 27}, {a: 1}));
+ }),
+ [{a: 28}, {a: 29}]);
+}
+
rst.stopSet();
})(); \ No newline at end of file
diff --git a/jstests/noPassthrough/resumable_index_build_mixed_phases.js b/jstests/noPassthrough/resumable_index_build_mixed_phases.js
index b62da7449e6..0e165510e85 100644
--- a/jstests/noPassthrough/resumable_index_build_mixed_phases.js
+++ b/jstests/noPassthrough/resumable_index_build_mixed_phases.js
@@ -61,6 +61,15 @@ const runTests = function(failPoints, resumePhases, resumeChecks) {
resumePhases,
resumeChecks,
"_wildcard");
+
+ if (columnstoreEnabled) {
+ runTest([{a: 1, b: 1}, {a: 2, b: 2}, {a: 3, b: 3}],
+ [[{"$**": "columnstore"}], [{b: 1}]],
+ failPoints,
+ resumePhases,
+ resumeChecks,
+ "_columnstore");
+ }
};
runTests(
@@ -105,59 +114,5 @@ runTests(
],
["bulk load", "drain writes"],
[{skippedPhaseLogID: 20391}, {skippedPhaseLogID: 20392}]);
-
-// TODO (SERVER-65978): Add sidewrites to tests and combine columnTests with normal runTests once
-// side writes are implemented as the numbers for numScannedAfterResume will match
-if (columnstoreEnabled) {
- const runColumnTests = function(failPoints, resumePhases, resumeChecks) {
- const docs = [{a: 1, b: 1}, {a: 2, b: 2}, {a: 3, b: 3}];
- const coll = rst.getPrimary().getDB(dbName).getCollection(
- jsTestName() + "_" + resumePhases[0].replace(" ", "_") + "_" +
- resumePhases[1].replace(" ", "_") + "_columnstore");
- assert.commandWorked(coll.insert(docs));
-
- ResumableIndexBuildTest.run(rst,
- dbName,
- coll.getName(),
- [[{b: 1}], [{"$**": "columnstore"}]],
- failPoints,
- 1,
- resumePhases,
- resumeChecks,
- [],
- [{a: 7, b: 7}, {a: 8, b: 8}, {a: 9, b: 9}]);
- };
-
- runColumnTests(
- [
- {name: "hangIndexBuildBeforeWaitingUntilMajorityOpTime", logIdWithBuildUUID: 4940901},
- {
- name: "hangIndexBuildDuringCollectionScanPhaseBeforeInsertion",
- logIdWithBuildUUID: 20386
- }
- ],
- ["initialized", "collection scan"],
- [{numScannedAfterResume: 3}, {numScannedAfterResume: 2}]);
-
- runColumnTests(
- [
- {name: "hangIndexBuildBeforeWaitingUntilMajorityOpTime", logIdWithBuildUUID: 4940901},
- {name: "hangIndexBuildDuringBulkLoadPhase", logIdWithIndexName: 4924400}
-
- ],
- ["initialized", "bulk load"],
- [{numScannedAfterResume: 3}, {skippedPhaseLogID: 20391}]);
-
- runColumnTests(
- [
- {
- name: "hangIndexBuildDuringCollectionScanPhaseBeforeInsertion",
- logIdWithBuildUUID: 20386
- },
- {name: "hangIndexBuildDuringBulkLoadPhase", logIdWithIndexName: 4924400}
- ],
- ["collection scan", "bulk load"],
- [{numScannedAfterResume: 2}, {skippedPhaseLogID: 20391}]);
-}
rst.stopSet();
-})();
+})(); \ No newline at end of file
diff --git a/src/mongo/db/index/SConscript b/src/mongo/db/index/SConscript
index 32a10b4eee7..d4fc11138c6 100644
--- a/src/mongo/db/index/SConscript
+++ b/src/mongo/db/index/SConscript
@@ -154,6 +154,7 @@ env.Library(
'$BUILD_DIR/mongo/db/fts/base_fts',
'$BUILD_DIR/mongo/db/resumable_index_builds_idl',
'$BUILD_DIR/mongo/db/server_base',
+ '$BUILD_DIR/mongo/db/storage/execution_context',
'column_store_index',
'expression_params',
'key_generator',
diff --git a/src/mongo/db/index/columns_access_method.cpp b/src/mongo/db/index/columns_access_method.cpp
index ef03980b917..55e6c9ad03a 100644
--- a/src/mongo/db/index/columns_access_method.cpp
+++ b/src/mongo/db/index/columns_access_method.cpp
@@ -41,7 +41,9 @@
#include "mongo/db/index/column_cell.h"
#include "mongo/db/index/column_key_generator.h"
#include "mongo/db/index/column_store_sorter.h"
+#include "mongo/db/index/index_build_interceptor.h"
#include "mongo/db/index/index_descriptor.h"
+#include "mongo/db/storage/execution_context.h"
#include "mongo/logv2/log.h"
#include "mongo/util/progress_meter.h"
@@ -54,6 +56,11 @@ inline void inc(int64_t* counter) {
if (counter)
++*counter;
};
+
+inline void dec(int64_t* counter) {
+ if (counter)
+ --*counter;
+};
} // namespace
ColumnStoreAccessMethod::ColumnStoreAccessMethod(IndexCatalogEntry* ice,
@@ -260,6 +267,26 @@ Status ColumnStoreAccessMethod::BulkBuilder::keyCommitted(
return Status::OK();
}
+
+void ColumnStoreAccessMethod::_visitCellsForIndexInsert(
+ OperationContext* opCtx,
+ PooledFragmentBuilder& buf,
+ const std::vector<BsonRecord>& bsonRecords,
+ function_ref<void(StringData, const BsonRecord&)> cb) const {
+ _keyGen.visitCellsForInsert(
+ bsonRecords,
+ [&](StringData path, const BsonRecord& rec, const column_keygen::UnencodedCellView& cell) {
+ if (!rec.ts.isNull()) {
+ uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(rec.ts));
+ }
+ buf.reset();
+ column_keygen::writeEncodedCell(cell, &buf);
+ tassert(
+ 6597800, "RecordID cannot be a string for column store indexes", !rec.id.isStr());
+ cb(path, rec);
+ });
+}
+
Status ColumnStoreAccessMethod::insert(OperationContext* opCtx,
SharedBufferFragmentBuilder& pooledBufferBuilder,
const CollectionPtr& coll,
@@ -268,24 +295,33 @@ Status ColumnStoreAccessMethod::insert(OperationContext* opCtx,
int64_t* keysInsertedOut) {
try {
PooledFragmentBuilder buf(pooledBufferBuilder);
- auto cursor = _store->newWriteCursor(opCtx);
- _keyGen.visitCellsForInsert(
- bsonRecords,
- [&](StringData path,
- const BsonRecord& rec,
- const column_keygen::UnencodedCellView& cell) {
- if (!rec.ts.isNull()) {
- uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(rec.ts));
+ // We cannot write to the index during its initial build phase, so we defer this insert as a
+ // "side write" to be applied after the build completes.
+ if (_indexCatalogEntry->isHybridBuilding()) {
+ auto columnKeys = StorageExecutionContext::get(opCtx).columnKeys();
+ _visitCellsForIndexInsert(
+ opCtx, buf, bsonRecords, [&](StringData path, const BsonRecord& rec) {
+ columnKeys->emplace_back(
+ path.toString(), CellView{buf.buf(), size_t(buf.len())}.toString(), rec.id);
+ });
+ int64_t inserted = 0;
+ ON_BLOCK_EXIT([keysInsertedOut, inserted] {
+ if (keysInsertedOut) {
+ *keysInsertedOut += inserted;
}
-
- buf.reset();
- column_keygen::writeEncodedCell(cell, &buf);
- invariant(!rec.id.isStr());
- cursor->insert(path, rec.id.getLong(), CellView{buf.buf(), size_t(buf.len())});
-
- inc(keysInsertedOut);
});
- return Status::OK();
+ uassertStatusOK(_indexCatalogEntry->indexBuildInterceptor()->sideWrite(
+ opCtx, *columnKeys, IndexBuildInterceptor::Op::kInsert, &inserted));
+ return Status::OK();
+ } else {
+ auto cursor = _store->newWriteCursor(opCtx);
+ _visitCellsForIndexInsert(
+ opCtx, buf, bsonRecords, [&](StringData path, const BsonRecord& rec) {
+ cursor->insert(path, rec.id.getLong(), CellView{buf.buf(), size_t(buf.len())});
+ inc(keysInsertedOut);
+ });
+ return Status::OK();
+ }
} catch (const AssertionException& ex) {
return ex.toStatus();
}
@@ -300,12 +336,26 @@ void ColumnStoreAccessMethod::remove(OperationContext* opCtx,
const InsertDeleteOptions& options,
int64_t* keysDeletedOut,
CheckRecordId checkRecordId) {
- auto cursor = _store->newWriteCursor(opCtx);
- _keyGen.visitPathsForDelete(obj, [&](StringData path) {
- tassert(6762301, "RecordID cannot be a string for column store indexes", !rid.isStr());
- cursor->remove(path, rid.getLong());
- inc(keysDeletedOut);
- });
+ if (_indexCatalogEntry->isHybridBuilding()) {
+ auto columnKeys = StorageExecutionContext::get(opCtx).columnKeys();
+ _keyGen.visitPathsForDelete(obj, [&](StringData path) {
+ columnKeys->emplace_back(std::make_tuple(path.toString(), "", rid));
+ });
+ int64_t removed = 0;
+ fassert(6597801,
+ _indexCatalogEntry->indexBuildInterceptor()->sideWrite(
+ opCtx, *columnKeys, IndexBuildInterceptor::Op::kDelete, &removed));
+ if (keysDeletedOut) {
+ *keysDeletedOut += removed;
+ }
+ } else {
+ auto cursor = _store->newWriteCursor(opCtx);
+ _keyGen.visitPathsForDelete(obj, [&](PathView path) {
+ tassert(6762301, "RecordID cannot be a string for column store indexes", !rid.isStr());
+ cursor->remove(path, rid.getLong());
+ inc(keysDeletedOut);
+ });
+ }
}
Status ColumnStoreAccessMethod::update(OperationContext* opCtx,
@@ -318,37 +368,88 @@ Status ColumnStoreAccessMethod::update(OperationContext* opCtx,
int64_t* keysInsertedOut,
int64_t* keysDeletedOut) {
PooledFragmentBuilder buf(pooledBufferBuilder);
- auto cursor = _store->newWriteCursor(opCtx);
- _keyGen.visitDiffForUpdate(
- oldDoc,
- newDoc,
- [&](column_keygen::ColumnKeyGenerator::DiffAction diffAction,
- StringData path,
- const column_keygen::UnencodedCellView* cell) {
- if (diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kDelete) {
- tassert(
- 6762302, "RecordID cannot be a string for column store indexes", !rid.isStr());
- cursor->remove(path, rid.getLong());
- inc(keysDeletedOut);
- return;
- }
- // kInsert and kUpdate are handled almost identically. If we switch to using
- // `overwrite=true` cursors in WT, we could consider making them the same, although that
- // might disadvantage other implementations of the storage engine API.
- buf.reset();
- column_keygen::writeEncodedCell(*cell, &buf);
+ if (_indexCatalogEntry->isHybridBuilding()) {
+ auto columnKeys = StorageExecutionContext::get(opCtx).columnKeys();
+ _keyGen.visitDiffForUpdate(
+ oldDoc,
+ newDoc,
+ [&](column_keygen::ColumnKeyGenerator::DiffAction diffAction,
+ StringData path,
+ const column_keygen::UnencodedCellView* cell) {
+ if (diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kDelete) {
+ columnKeys->emplace_back(std::make_tuple(path.toString(), "", rid));
+ int64_t removed = 0;
+ fassert(6597802,
+ _indexCatalogEntry->indexBuildInterceptor()->sideWrite(
+ opCtx, *columnKeys, IndexBuildInterceptor::Op::kDelete, &removed));
+
+ if (keysDeletedOut) {
+ *keysDeletedOut += removed;
+ }
+ return;
+ }
+
+ // kInsert and kUpdate are handled almost identically. If we switch to using
+ // `overwrite=true` cursors in WT, we could consider making them the same,
+ // although that might disadvantage other implementations of the storage engine
+ // API.
+ buf.reset();
+ column_keygen::writeEncodedCell(*cell, &buf);
- const auto method = diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kInsert
- ? &ColumnStore::WriteCursor::insert
- : &ColumnStore::WriteCursor::update;
- tassert(6762303, "RecordID cannot be a string for column store indexes", !rid.isStr());
- (cursor.get()->*method)(path, rid.getLong(), CellView{buf.buf(), size_t(buf.len())});
+ const auto method =
+ diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kInsert
+ ? IndexBuildInterceptor::Op::kInsert
+ : IndexBuildInterceptor::Op::kUpdate;
- inc(keysInsertedOut);
- });
+ columnKeys->emplace_back(std::make_tuple(
+ path.toString(), CellView{buf.buf(), size_t(buf.len())}.toString(), rid));
+
+ int64_t inserted = 0;
+ Status status = _indexCatalogEntry->indexBuildInterceptor()->sideWrite(
+ opCtx, *columnKeys, method, &inserted);
+ if (keysInsertedOut) {
+ *keysInsertedOut += inserted;
+ }
+ });
+
+ } else {
+ auto cursor = _store->newWriteCursor(opCtx);
+ _keyGen.visitDiffForUpdate(
+ oldDoc,
+ newDoc,
+ [&](column_keygen::ColumnKeyGenerator::DiffAction diffAction,
+ StringData path,
+ const column_keygen::UnencodedCellView* cell) {
+ if (diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kDelete) {
+ tassert(6762302,
+ "RecordID cannot be a string for column store indexes",
+ !rid.isStr());
+ cursor->remove(path, rid.getLong());
+ inc(keysDeletedOut);
+ return;
+ }
+
+ // kInsert and kUpdate are handled almost identically. If we switch to using
+ // `overwrite=true` cursors in WT, we could consider making them the same, although
+ // that might disadvantage other implementations of the storage engine API.
+ buf.reset();
+ column_keygen::writeEncodedCell(*cell, &buf);
+
+ const auto method =
+ diffAction == column_keygen::ColumnKeyGenerator::DiffAction::kInsert
+ ? &ColumnStore::WriteCursor::insert
+ : &ColumnStore::WriteCursor::update;
+ tassert(
+ 6762303, "RecordID cannot be a string for column store indexes", !rid.isStr());
+ (cursor.get()->*method)(
+ path, rid.getLong(), CellView{buf.buf(), size_t(buf.len())});
+
+ inc(keysInsertedOut);
+ });
+ }
return Status::OK();
-}
+} // namespace mongo
Status ColumnStoreAccessMethod::initializeAsEmpty(OperationContext* opCtx) {
return Status::OK();
@@ -383,8 +484,9 @@ std::unique_ptr<IndexAccessMethod::BulkBuilder> ColumnStoreAccessMethod::initiat
size_t maxMemoryUsageBytes,
const boost::optional<IndexStateInfo>& stateInfo,
StringData dbName) {
- return stateInfo ? std::make_unique<BulkBuilder>(this, maxMemoryUsageBytes, *stateInfo, dbName)
- : std::make_unique<BulkBuilder>(this, maxMemoryUsageBytes, dbName);
+ return (stateInfo && stateInfo->getFileName())
+ ? std::make_unique<BulkBuilder>(this, maxMemoryUsageBytes, *stateInfo, dbName)
+ : std::make_unique<BulkBuilder>(this, maxMemoryUsageBytes, dbName);
}
std::shared_ptr<Ident> ColumnStoreAccessMethod::getSharedIdent() const {
@@ -395,4 +497,41 @@ void ColumnStoreAccessMethod::setIdent(std::shared_ptr<Ident> ident) {
_store->setIdent(std::move(ident));
}
+void ColumnStoreAccessMethod::applyColumnDataSideWrite(OperationContext* opCtx,
+ const CollectionPtr& coll,
+ const BSONObj& operation,
+ int64_t* keysInserted,
+ int64_t* keysDeleted) {
+ const IndexBuildInterceptor::Op opType = operation.getStringField("op") == "i"_sd
+ ? IndexBuildInterceptor::Op::kInsert
+ : operation.getStringField("op") == "d"_sd ? IndexBuildInterceptor::Op::kDelete
+ : IndexBuildInterceptor::Op::kUpdate;
+
+ RecordId rid = RecordId::deserializeToken(operation.getField("rid"));
+
+ CellView cell = operation.getStringField("cell");
+ PathView path = operation.getStringField("path");
+
+ auto cursor = _store->newWriteCursor(opCtx);
+
+ tassert(6597803, "RecordID cannot be a string for column store indexes", !rid.isStr());
+ switch (opType) {
+ case IndexBuildInterceptor::Op::kInsert:
+ cursor->insert(path, rid.getLong(), cell);
+ inc(keysInserted);
+ opCtx->recoveryUnit()->onRollback([keysInserted] { dec(keysInserted); });
+ break;
+ case IndexBuildInterceptor::Op::kDelete:
+ cursor->remove(path, rid.getLong());
+ inc(keysDeleted);
+ opCtx->recoveryUnit()->onRollback([keysDeleted] { dec(keysDeleted); });
+ break;
+ case IndexBuildInterceptor::Op::kUpdate:
+ cursor->update(path, rid.getLong(), cell);
+ inc(keysInserted);
+ opCtx->recoveryUnit()->onRollback([keysInserted] { dec(keysInserted); });
+ break;
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/db/index/columns_access_method.h b/src/mongo/db/index/columns_access_method.h
index fefa5240468..44848fa114c 100644
--- a/src/mongo/db/index/columns_access_method.h
+++ b/src/mongo/db/index/columns_access_method.h
@@ -75,6 +75,7 @@ public:
const InsertDeleteOptions& options,
int64_t* keysDeletedOut,
CheckRecordId checkRecordId) final;
+
Status update(OperationContext* opCtx,
SharedBufferFragmentBuilder& pooledBufferBuilder,
const BSONObj& oldDoc,
@@ -85,6 +86,12 @@ public:
int64_t* keysInsertedOut,
int64_t* keysDeletedOut) final;
+ void applyColumnDataSideWrite(OperationContext* opCtx,
+ const CollectionPtr& coll,
+ const BSONObj& operation,
+ int64_t* keysInserted,
+ int64_t* keysDeleted) final;
+
Status initializeAsEmpty(OperationContext* opCtx) final;
void validate(OperationContext* opCtx,
@@ -117,6 +124,11 @@ public:
class BulkBuilder;
private:
+ void _visitCellsForIndexInsert(OperationContext* opCtx,
+ PooledFragmentBuilder& pooledFragmentBuilder,
+ const std::vector<BsonRecord>& bsonRecords,
+ function_ref<void(StringData, const BsonRecord&)> cb) const;
+
const std::unique_ptr<ColumnStore> _store;
IndexCatalogEntry* const _indexCatalogEntry; // owned by IndexCatalog
const IndexDescriptor* const _descriptor;
diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp
index db5a1974aa1..7e7bc6a7c89 100644
--- a/src/mongo/db/index/index_access_method.cpp
+++ b/src/mongo/db/index/index_access_method.cpp
@@ -620,6 +620,66 @@ void SortedDataIndexAccessMethod::setIdent(std::shared_ptr<Ident> newIdent) {
this->_newInterface->setIdent(std::move(newIdent));
}
+Status SortedDataIndexAccessMethod::applySortedDataSideWrite(OperationContext* opCtx,
+ const CollectionPtr& coll,
+ const BSONObj& operation,
+ const InsertDeleteOptions& options,
+ KeyHandlerFn&& onDuplicateKey,
+ int64_t* const keysInserted,
+ int64_t* const keysDeleted) {
+ auto opType = [&operation] {
+ switch (operation.getStringField("op")[0]) {
+ case 'i':
+ return IndexBuildInterceptor::Op::kInsert;
+ case 'd':
+ return IndexBuildInterceptor::Op::kDelete;
+ case 'u':
+ return IndexBuildInterceptor::Op::kUpdate;
+ default:
+ MONGO_UNREACHABLE;
+ }
+ }();
+
+ // Deserialize the encoded KeyString::Value.
+ int keyLen;
+ const char* binKey = operation["key"].binData(keyLen);
+ BufReader reader(binKey, keyLen);
+ const KeyString::Value keyString =
+ KeyString::Value::deserialize(reader, getSortedDataInterface()->getKeyStringVersion());
+
+ const KeyStringSet keySet{keyString};
+ if (opType == IndexBuildInterceptor::Op::kInsert) {
+ int64_t numInserted;
+ auto status = insertKeysAndUpdateMultikeyPaths(opCtx,
+ coll,
+ {keySet.begin(), keySet.end()},
+ {},
+ MultikeyPaths{},
+ options,
+ std::move(onDuplicateKey),
+ &numInserted);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ *keysInserted += numInserted;
+ opCtx->recoveryUnit()->onRollback(
+ [keysInserted, numInserted] { *keysInserted -= numInserted; });
+ } else {
+ invariant(opType == IndexBuildInterceptor::Op::kDelete);
+ int64_t numDeleted;
+ Status s = removeKeys(opCtx, {keySet.begin(), keySet.end()}, options, &numDeleted);
+ if (!s.isOK()) {
+ return s;
+ }
+
+ *keysDeleted += numDeleted;
+ opCtx->recoveryUnit()->onRollback(
+ [keysDeleted, numDeleted] { *keysDeleted -= numDeleted; });
+ }
+ return Status::OK();
+}
+
void IndexAccessMethod::BulkBuilder::countNewBuildInStats() {
indexBulkBuilderSSS.count.addAndFetch(1);
}
@@ -1206,7 +1266,6 @@ void SortedDataIndexAccessMethod::_unindexKeysOrWriteToSideTable(
if (!status.isOK()) {
LOGV2(20362,
- "Couldn't unindex record {obj} from collection {namespace}: {error}",
"Couldn't unindex record",
"record"_attr = redact(obj),
"namespace"_attr = ns,
diff --git a/src/mongo/db/index/index_access_method.h b/src/mongo/db/index/index_access_method.h
index 8c565152b67..40e012aa238 100644
--- a/src/mongo/db/index/index_access_method.h
+++ b/src/mongo/db/index/index_access_method.h
@@ -174,6 +174,24 @@ public:
*/
virtual void setIdent(std::shared_ptr<Ident> newIdent) = 0;
+ virtual Status applySortedDataSideWrite(OperationContext* opCtx,
+ const CollectionPtr& coll,
+ const BSONObj& operation,
+ const InsertDeleteOptions& options,
+ KeyHandlerFn&& onDuplicateKey,
+ int64_t* const keysInserted,
+ int64_t* const keysDeleted) {
+ MONGO_UNREACHABLE;
+ };
+
+ virtual void applyColumnDataSideWrite(OperationContext* opCtx,
+ const CollectionPtr& coll,
+ const BSONObj& operation,
+ int64_t* keysInserted,
+ int64_t* keysDeleted) {
+ MONGO_UNREACHABLE;
+ };
+
//
// Bulk operations support
//
@@ -561,6 +579,14 @@ public:
void setIdent(std::shared_ptr<Ident> newIdent) final;
+ Status applySortedDataSideWrite(OperationContext* opCtx,
+ const CollectionPtr& coll,
+ const BSONObj& operation,
+ const InsertDeleteOptions& options,
+ KeyHandlerFn&& onDuplicateKey,
+ int64_t* keysInserted,
+ int64_t* keysDeleted) final;
+
std::unique_ptr<BulkBuilder> initiateBulk(size_t maxMemoryUsageBytes,
const boost::optional<IndexStateInfo>& stateInfo,
StringData dbName) final;
diff --git a/src/mongo/db/index/index_build_interceptor.cpp b/src/mongo/db/index/index_build_interceptor.cpp
index 36072c25bf5..872c5301d85 100644
--- a/src/mongo/db/index/index_build_interceptor.cpp
+++ b/src/mongo/db/index/index_build_interceptor.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/concurrency/exception_util.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
+#include "mongo/db/index/columns_access_method.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/index/index_build_interceptor_gen.h"
#include "mongo/db/multi_key_path_tracker.h"
@@ -291,56 +292,25 @@ Status IndexBuildInterceptor::_applyWrite(OperationContext* opCtx,
TrackDuplicates trackDups,
int64_t* const keysInserted,
int64_t* const keysDeleted) {
- // Deserialize the encoded KeyString::Value.
- int keyLen;
- const char* binKey = operation["key"].binData(keyLen);
- BufReader reader(binKey, keyLen);
- auto accessMethod = _indexCatalogEntry->accessMethod()->asSortedData();
- const KeyString::Value keyString = KeyString::Value::deserialize(
- reader, accessMethod->getSortedDataInterface()->getKeyStringVersion());
-
- const Op opType = operation.getStringField("op") == "i"_sd ? Op::kInsert : Op::kDelete;
-
- const KeyStringSet keySet{keyString};
- if (opType == Op::kInsert) {
- int64_t numInserted;
- auto status = accessMethod->insertKeysAndUpdateMultikeyPaths(
+ // Check field for "key" to determine if collection is sorted data or column store.
+ if (operation.hasField("key")) {
+ return _indexCatalogEntry->accessMethod()->applySortedDataSideWrite(
opCtx,
coll,
- {keySet.begin(), keySet.end()},
- {},
- MultikeyPaths{},
+ operation,
options,
[=](const KeyString::Value& duplicateKey) {
return trackDups == TrackDuplicates::kTrack
? recordDuplicateKey(opCtx, duplicateKey)
: Status::OK();
},
- &numInserted);
- if (!status.isOK()) {
- return status;
- }
-
- *keysInserted += numInserted;
- opCtx->recoveryUnit()->onRollback(
- [keysInserted, numInserted] { *keysInserted -= numInserted; });
+ keysInserted,
+ keysDeleted);
} else {
- invariant(opType == Op::kDelete);
- if (kDebugBuild)
- invariant(operation.getStringField("op") == "d"_sd);
-
- int64_t numDeleted;
- Status s =
- accessMethod->removeKeys(opCtx, {keySet.begin(), keySet.end()}, options, &numDeleted);
- if (!s.isOK()) {
- return s;
- }
-
- *keysDeleted += numDeleted;
- opCtx->recoveryUnit()->onRollback(
- [keysDeleted, numDeleted] { *keysDeleted -= numDeleted; });
+ _indexCatalogEntry->accessMethod()->applyColumnDataSideWrite(
+ opCtx, coll, operation, keysInserted, keysDeleted);
+ return Status::OK();
}
- return Status::OK();
}
void IndexBuildInterceptor::_yield(OperationContext* opCtx, const Yieldable* yieldable) {
@@ -422,6 +392,35 @@ boost::optional<MultikeyPaths> IndexBuildInterceptor::getMultikeyPaths() const {
return _multikeyPaths;
}
+Status IndexBuildInterceptor::_finishSideWrite(OperationContext* opCtx,
+ const std::vector<BSONObj>& toInsert) {
+ _sideWritesCounter->fetchAndAdd(toInsert.size());
+ // This insert may roll back, but not necessarily from inserting into this table. If other write
+ // operations outside this table and in the same transaction are rolled back, this counter also
+ // needs to be rolled back.
+ opCtx->recoveryUnit()->onRollback([sharedCounter = _sideWritesCounter, size = toInsert.size()] {
+ sharedCounter->fetchAndSubtract(size);
+ });
+
+ std::vector<Record> records;
+ for (auto& doc : toInsert) {
+ records.emplace_back(Record{RecordId(), // The storage engine will assign its own RecordId
+ // when we pass one that is null.
+ RecordData(doc.objdata(), doc.objsize())});
+ }
+
+ LOGV2_DEBUG(20691,
+ 2,
+ "Recording side write keys on index",
+ "numRecords"_attr = records.size(),
+ "index"_attr = _indexCatalogEntry->descriptor()->indexName());
+
+ // By passing a vector of null timestamps, these inserts are not timestamped individually, but
+ // rather with the timestamp of the owning operation.
+ std::vector<Timestamp> timestamps(records.size());
+ return _sideWritesTable->rs()->insertRecords(opCtx, &records, timestamps);
+}
+
Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx,
const KeyStringSet& keys,
const KeyStringSet& multikeyMetadataKeys,
@@ -429,6 +428,7 @@ Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx,
Op op,
int64_t* const numKeysOut) {
invariant(opCtx->lockState()->inAWriteUnitOfWork());
+ invariant(op != IndexBuildInterceptor::Op::kUpdate);
// Maintain parity with IndexAccessMethods handling of key counting. Only include
// `multikeyMetadataKeys` when inserting.
@@ -478,9 +478,9 @@ Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx,
}
if (op == Op::kInsert) {
- // Wildcard indexes write multikey path information, typically part of the catalog
- // document, to the index itself. Multikey information is never deleted, so we only need
- // to add this data on the insert path.
+ // Wildcard indexes write multikey path information, typically part of the catalog document,
+ // to the index itself. Multikey information is never deleted, so we only need to add this
+ // data on the insert path.
for (const auto& keyString : multikeyMetadataKeys) {
builder.reset();
keyString.serialize(builder);
@@ -491,33 +491,41 @@ Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx,
}
}
- _sideWritesCounter->fetchAndAdd(toInsert.size());
- // This insert may roll back, but not necessarily from inserting into this table. If other write
- // operations outside this table and in the same transaction are rolled back, this counter also
- // needs to be rolled back.
- opCtx->recoveryUnit()->onRollback([sharedCounter = _sideWritesCounter, size = toInsert.size()] {
- sharedCounter->fetchAndSubtract(size);
- });
+ return _finishSideWrite(opCtx, std::move(toInsert));
+}
- std::vector<Record> records;
- for (auto& doc : toInsert) {
- records.emplace_back(Record{RecordId(), // The storage engine will assign its own RecordId
- // when we pass one that is null.
- RecordData(doc.objdata(), doc.objsize())});
- }
+Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx,
+ const PathCellSet& keys,
+ Op op,
+ int64_t* const numKeysOut) {
+ invariant(opCtx->lockState()->inAWriteUnitOfWork());
- LOGV2_DEBUG(20691,
- 2,
- "recording {records_size} side write keys on index "
- "'{indexCatalogEntry_descriptor_indexName}'",
- "records_size"_attr = records.size(),
- "indexCatalogEntry_descriptor_indexName"_attr =
- _indexCatalogEntry->descriptor()->indexName());
+ *numKeysOut = keys.size();
- // By passing a vector of null timestamps, these inserts are not timestamped individually, but
- // rather with the timestamp of the owning operation.
- std::vector<Timestamp> timestamps(records.size());
- return _sideWritesTable->rs()->insertRecords(opCtx, &records, timestamps);
+ std::vector<BSONObj> toInsert;
+ toInsert.reserve(keys.size());
+ for (const auto& [path, cell, rid] : keys) {
+
+ BSONObjBuilder builder;
+ rid.serializeToken("rid", &builder);
+ builder.append("op", [op] {
+ switch (op) {
+ case Op::kInsert:
+ return "i";
+ case Op::kDelete:
+ return "d";
+ case Op::kUpdate:
+ return "u";
+ }
+ MONGO_UNREACHABLE;
+ }());
+ builder.append("path", path);
+ builder.append("cell", cell);
+
+ toInsert.push_back(builder.obj());
+ }
+
+ return _finishSideWrite(opCtx, std::move(toInsert));
}
Status IndexBuildInterceptor::retrySkippedRecords(OperationContext* opCtx,
diff --git a/src/mongo/db/index/index_build_interceptor.h b/src/mongo/db/index/index_build_interceptor.h
index 46c4f5e6e8b..b1888d46f76 100644
--- a/src/mongo/db/index/index_build_interceptor.h
+++ b/src/mongo/db/index/index_build_interceptor.h
@@ -31,11 +31,13 @@
#include <memory>
+#include "mongo/db/index/columns_access_method.h"
#include "mongo/db/index/duplicate_key_tracker.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/index/multikey_paths.h"
#include "mongo/db/index/skipped_record_tracker.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/storage/column_store.h"
#include "mongo/db/storage/temporary_record_store.h"
#include "mongo/db/yieldable.h"
#include "mongo/platform/atomic_word.h"
@@ -53,7 +55,7 @@ public:
*/
enum class DrainYieldPolicy { kNoYield, kYield };
- enum class Op { kInsert, kDelete };
+ enum class Op { kInsert, kDelete, kUpdate };
/**
* Indicates whether to record duplicate keys that have been inserted into the index. When set
@@ -101,6 +103,18 @@ public:
int64_t* numKeysOut);
/**
+ * Client writes that are concurrent with a column store index build will have their index
+ * updates written to a temporary table. After the index table scan is complete, these updates
+ * will be applied to the underlying index table.
+ *
+ * On success, `numKeysOut` if non-null will contain the number of keys added or removed.
+ */
+ Status sideWrite(OperationContext* opCtx,
+ const PathCellSet& columnstoreKeys,
+ Op op,
+ int64_t* numKeysOut);
+
+ /**
* Given a duplicate key, record the key for later verification by a call to
* checkDuplicateKeyConstraints();
*/
@@ -173,7 +187,6 @@ public:
private:
using SideWriteRecord = std::pair<RecordId, BSONObj>;
-
Status _applyWrite(OperationContext* opCtx,
const CollectionPtr& coll,
const BSONObj& doc,
@@ -193,6 +206,8 @@ private:
FailPoint* fp,
long long iteration) const;
+ Status _finishSideWrite(OperationContext* opCtx, const std::vector<BSONObj>& toInsert);
+
// The entry for the index that is being built.
const IndexCatalogEntry* _indexCatalogEntry;
@@ -224,5 +239,4 @@ private:
MONGO_MAKE_LATCH("IndexBuildInterceptor::_multikeyPathMutex");
boost::optional<MultikeyPaths> _multikeyPaths;
};
-
} // namespace mongo
diff --git a/src/mongo/db/index/index_build_interceptor_test.cpp b/src/mongo/db/index/index_build_interceptor_test.cpp
index 30f996d69d2..e0f9f86c98a 100644
--- a/src/mongo/db/index/index_build_interceptor_test.cpp
+++ b/src/mongo/db/index/index_build_interceptor_test.cpp
@@ -30,6 +30,7 @@
#include "mongo/db/catalog/catalog_test_fixture.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/index/index_build_interceptor.h"
+#include "mongo/idl/server_parameter_test_util.h"
namespace mongo {
namespace {
@@ -118,5 +119,238 @@ TEST_F(IndexBuilderInterceptorTest, SingleInsertIsSavedToSideWritesTable) {
<< "key" << serializedKeyString),
sideWrites[0]);
}
+
+
+TEST_F(IndexBuilderInterceptorTest, SingleColumnInsertIsSavedToSideWritesTable) {
+ RAIIServerParameterControllerForTest controller("featureFlagColumnstoreIndexes", true);
+ auto interceptor = createIndexBuildInterceptor(
+ fromjson("{v: 2, name: 'columnstore', key: {'$**': 'columnstore'}}"));
+
+ PathCellSet columnKeys;
+ columnKeys.emplace_back(std::make_tuple("path", "cell", RecordId(1)));
+
+ WriteUnitOfWork wuow(operationContext());
+ int64_t numKeys = 0;
+ ASSERT_OK(interceptor->sideWrite(
+ operationContext(), columnKeys, IndexBuildInterceptor::Op::kInsert, &numKeys));
+ ASSERT_EQ(1, numKeys);
+ wuow.commit();
+
+ BSONObjBuilder builder;
+ RecordId(1).serializeToken("rid", &builder);
+ BSONObj obj = builder.obj();
+ BSONElement elem = obj["rid"];
+
+ auto sideWrites = getSideWritesTableContents(std::move(interceptor));
+ ASSERT_EQ(1, sideWrites.size());
+ ASSERT_BSONOBJ_EQ(BSON("rid" << elem << "op"
+ << "i"
+ << "path"
+ << "path"
+ << "cell"
+ << "cell"),
+ sideWrites[0]);
+}
+
+TEST_F(IndexBuilderInterceptorTest, SingleColumnDeleteIsSavedToSideWritesTable) {
+ RAIIServerParameterControllerForTest controller("featureFlagColumnstoreIndexes", true);
+ auto interceptor = createIndexBuildInterceptor(
+ fromjson("{v: 2, name: 'columnstore', key: {'$**': 'columnstore'}}"));
+
+ PathCellSet columnKeys;
+ columnKeys.emplace_back(std::make_tuple("path", "", RecordId(1)));
+
+ WriteUnitOfWork wuow(operationContext());
+ int64_t numKeys = 0;
+ ASSERT_OK(interceptor->sideWrite(
+ operationContext(), columnKeys, IndexBuildInterceptor::Op::kDelete, &numKeys));
+ ASSERT_EQ(1, numKeys);
+ wuow.commit();
+
+ BSONObjBuilder builder;
+ RecordId(1).serializeToken("rid", &builder);
+ BSONObj obj = builder.obj();
+ BSONElement elem = obj["rid"];
+
+ auto sideWrites = getSideWritesTableContents(std::move(interceptor));
+ ASSERT_EQ(1, sideWrites.size());
+ ASSERT_BSONOBJ_EQ(BSON("rid" << elem << "op"
+ << "d"
+ << "path"
+ << "path"
+ << "cell"
+ << ""),
+ sideWrites[0]);
+}
+
+TEST_F(IndexBuilderInterceptorTest, SingleColumnUpdateIsSavedToSideWritesTable) {
+ RAIIServerParameterControllerForTest controller("featureFlagColumnstoreIndexes", true);
+ auto interceptor = createIndexBuildInterceptor(
+ fromjson("{v: 2, name: 'columnstore', key: {'$**': 'columnstore'}}"));
+
+ // create path + cell + rid
+ PathCellSet columnKeys;
+ columnKeys.emplace_back(std::make_tuple("path", "cell", RecordId(1)));
+
+ WriteUnitOfWork wuow(operationContext());
+ int64_t numKeys = 0;
+ ASSERT_OK(interceptor->sideWrite(
+ operationContext(), columnKeys, IndexBuildInterceptor::Op::kUpdate, &numKeys));
+ ASSERT_EQ(1, numKeys);
+ wuow.commit();
+
+ BSONObjBuilder builder;
+ RecordId(1).serializeToken("rid", &builder);
+ BSONObj obj = builder.obj();
+ BSONElement elem = obj["rid"];
+
+ auto sideWrites = getSideWritesTableContents(std::move(interceptor));
+ ASSERT_EQ(1, sideWrites.size());
+ ASSERT_BSONOBJ_EQ(BSON("rid" << elem << "op"
+ << "u"
+ << "path"
+ << "path"
+ << "cell"
+ << "cell"),
+ sideWrites[0]);
+}
+
+TEST_F(IndexBuilderInterceptorTest, MultipleColumnInsertsAreSavedToSideWritesTable) {
+ RAIIServerParameterControllerForTest controller("featureFlagColumnstoreIndexes", true);
+ auto interceptor = createIndexBuildInterceptor(
+ fromjson("{v: 2, name: 'columnstore', key: {'$**': 'columnstore'}}"));
+
+ PathCellSet columnKeys;
+ columnKeys.emplace_back(std::make_tuple("path", "cell", RecordId(1)));
+ columnKeys.emplace_back(std::make_tuple("path1", "cell1", RecordId(1)));
+ columnKeys.emplace_back(std::make_tuple("path2", "cell2", RecordId(2)));
+ columnKeys.emplace_back(std::make_tuple("path3", "cell3", RecordId(2)));
+
+ WriteUnitOfWork wuow(operationContext());
+ int64_t numKeys = 0;
+
+ ASSERT_OK(interceptor->sideWrite(
+ operationContext(), columnKeys, IndexBuildInterceptor::Op::kInsert, &numKeys));
+ ASSERT_EQ(4, numKeys);
+ wuow.commit();
+
+ BSONObjBuilder builder;
+ RecordId(1).serializeToken("rid", &builder);
+ BSONObj obj = builder.obj();
+ BSONElement elem1 = obj["rid"];
+
+ BSONObjBuilder builder2;
+ RecordId(2).serializeToken("rid", &builder2);
+ BSONObj obj2 = builder2.obj();
+ BSONElement elem2 = obj2["rid"];
+
+ auto sideWrites = getSideWritesTableContents(std::move(interceptor));
+ ASSERT_EQ(4, sideWrites.size());
+ ASSERT_BSONOBJ_EQ(BSON("rid" << elem1 << "op"
+ << "i"
+ << "path"
+ << "path"
+ << "cell"
+ << "cell"),
+ sideWrites[0]);
+ ASSERT_BSONOBJ_EQ(BSON("rid" << elem1 << "op"
+ << "i"
+ << "path"
+ << "path1"
+ << "cell"
+ << "cell1"),
+ sideWrites[1]);
+ ASSERT_BSONOBJ_EQ(BSON("rid" << elem2 << "op"
+ << "i"
+ << "path"
+ << "path2"
+ << "cell"
+ << "cell2"),
+ sideWrites[2]);
+ ASSERT_BSONOBJ_EQ(BSON("rid" << elem2 << "op"
+ << "i"
+ << "path"
+ << "path3"
+ << "cell"
+ << "cell3"),
+ sideWrites[3]);
+}
+
+TEST_F(IndexBuilderInterceptorTest, MultipleColumnSideWritesAreSavedToSideWritesTable) {
+ RAIIServerParameterControllerForTest controller("featureFlagColumnstoreIndexes", true);
+ auto interceptor = createIndexBuildInterceptor(
+ fromjson("{v: 2, name: 'columnstore', key: {'$**': 'columnstore'}}"));
+
+ WriteUnitOfWork wuow(operationContext());
+ int64_t numKeys = 0;
+
+ PathCellSet columnKeys;
+ columnKeys.emplace_back(std::make_tuple("path", "cell", RecordId(1)));
+ ASSERT_OK(interceptor->sideWrite(
+ operationContext(), columnKeys, IndexBuildInterceptor::Op::kInsert, &numKeys));
+ ASSERT_EQ(1, numKeys);
+
+ PathCellSet columnKeys2;
+ columnKeys2.emplace_back(std::make_tuple("path", "", RecordId(1)));
+ ASSERT_OK(interceptor->sideWrite(
+ operationContext(), columnKeys2, IndexBuildInterceptor::Op::kDelete, &numKeys));
+ ASSERT_EQ(1, numKeys);
+
+
+ PathCellSet columnKeys3;
+ columnKeys3.emplace_back(std::make_tuple("path1", "cell1", RecordId(2)));
+ ASSERT_OK(interceptor->sideWrite(
+ operationContext(), columnKeys3, IndexBuildInterceptor::Op::kUpdate, &numKeys));
+ ASSERT_EQ(1, numKeys);
+
+ PathCellSet columnKeys4;
+ columnKeys4.emplace_back(std::make_tuple("path2", "cell2", RecordId(2)));
+ ASSERT_OK(interceptor->sideWrite(
+ operationContext(), columnKeys4, IndexBuildInterceptor::Op::kInsert, &numKeys));
+ ASSERT_EQ(1, numKeys);
+ wuow.commit();
+
+ BSONObjBuilder builder;
+ RecordId(1).serializeToken("rid", &builder);
+ BSONObj obj = builder.obj();
+ BSONElement elem1 = obj["rid"];
+
+ BSONObjBuilder builder2;
+ RecordId(2).serializeToken("rid", &builder2);
+ BSONObj obj2 = builder2.obj();
+ BSONElement elem2 = obj2["rid"];
+
+ auto sideWrites = getSideWritesTableContents(std::move(interceptor));
+ ASSERT_EQ(4, sideWrites.size());
+ ASSERT_BSONOBJ_EQ(BSON("rid" << elem1 << "op"
+ << "i"
+ << "path"
+ << "path"
+ << "cell"
+ << "cell"),
+ sideWrites[0]);
+ ASSERT_BSONOBJ_EQ(BSON("rid" << elem1 << "op"
+ << "d"
+ << "path"
+ << "path"
+ << "cell"
+ << ""),
+ sideWrites[1]);
+ ASSERT_BSONOBJ_EQ(BSON("rid" << elem2 << "op"
+ << "u"
+ << "path"
+ << "path1"
+ << "cell"
+ << "cell1"),
+ sideWrites[2]);
+ ASSERT_BSONOBJ_EQ(BSON("rid" << elem2 << "op"
+ << "i"
+ << "path"
+ << "path2"
+ << "cell"
+ << "cell2"),
+ sideWrites[3]);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/storage/column_store.h b/src/mongo/db/storage/column_store.h
index 9db8ca7d1bb..30b21cbe340 100644
--- a/src/mongo/db/storage/column_store.h
+++ b/src/mongo/db/storage/column_store.h
@@ -786,4 +786,6 @@ struct SplitCellView {
}
}
};
+
+using PathCellSet = std::vector<std::tuple<std::string, std::string, RecordId>>;
} // namespace mongo
diff --git a/src/mongo/db/storage/execution_context.h b/src/mongo/db/storage/execution_context.h
index 28479f833b9..4ceed7ec4a3 100644
--- a/src/mongo/db/storage/execution_context.h
+++ b/src/mongo/db/storage/execution_context.h
@@ -31,6 +31,7 @@
#include "mongo/db/index/multikey_paths.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/storage/column_store.h"
#include "mongo/db/storage/key_string.h"
#include "mongo/util/auto_clear_ptr.h"
@@ -61,11 +62,15 @@ public:
AutoClearPtr<MultikeyPaths> multikeyPaths() {
return makeAutoClearPtr(&_multikeyPaths);
}
+ AutoClearPtr<PathCellSet> columnKeys() {
+ return makeAutoClearPtr(&_columnKeys);
+ }
private:
KeyStringSet _keys;
KeyStringSet _multikeyMetadataKeys;
MultikeyPaths _multikeyPaths;
+ PathCellSet _columnKeys;
};
} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp
index 1caf0b67158..9f0561911b8 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_column_store.cpp
@@ -175,7 +175,6 @@ void WiredTigerColumnStore::WriteCursor::insert(PathView path, RowId rid, CellVi
auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx);
metricsCollector.incrementOneIdxEntryWritten(c()->uri, keyItem.size);
- // TODO: SERVER-65978, we may have to specially handle WT_DUPLICATE_KEY error here.
if (ret) {
uassertStatusOK(wtRCToStatus(ret, c()->session));
}
@@ -219,7 +218,6 @@ void WiredTigerColumnStore::WriteCursor::update(PathView path, RowId rid, CellVi
auto& metricsCollector = ResourceConsumption::MetricsCollector::get(_opCtx);
metricsCollector.incrementOneIdxEntryWritten(c()->uri, keyItem.size);
- // TODO: SERVER-65978, may want to handle WT_NOTFOUND specially.
if (ret != 0)
return uassertStatusOK(wtRCToStatus(ret, c()->session));
}