 jstests/noPassthrough/hybrid_unique_index_with_updates.js | 157
 jstests/noPassthrough/indexbg2.js                         |  38
 src/mongo/db/catalog/index_build_block.cpp                |  17
 src/mongo/db/catalog/multi_index_block.h                  |  15
 src/mongo/db/catalog/multi_index_block_impl.cpp           |  56
 src/mongo/db/catalog/multi_index_block_impl.h             |   8
 src/mongo/db/catalog/multi_index_block_test.cpp           |  12
 src/mongo/db/index/SConscript                             |  17
 src/mongo/db/index/duplicate_key_tracker.cpp              |  86
 src/mongo/db/index/duplicate_key_tracker.h                |  37
 src/mongo/db/index/duplicate_key_tracker_test.cpp         | 334
 src/mongo/db/index/index_build_interceptor.cpp            |  74
 src/mongo/db/index/index_build_interceptor.h              |  28
 src/mongo/dbtests/indexupdatetests.cpp                    |  12
 14 files changed, 358 insertions(+), 533 deletions(-)
diff --git a/jstests/noPassthrough/hybrid_unique_index_with_updates.js b/jstests/noPassthrough/hybrid_unique_index_with_updates.js
new file mode 100644
index 00000000000..b3dc94ca4fd
--- /dev/null
+++ b/jstests/noPassthrough/hybrid_unique_index_with_updates.js
@@ -0,0 +1,157 @@
+/**
+ * Tests that write operations are accepted and result in correct indexing behavior for each phase
+ * of hybrid unique index builds. This test inserts a duplicate document at different phases of an
+ * index build to confirm that the build fails unless the duplicate is resolved before it completes.
+ *
+ * @tags: [requires_document_locking]
+ */
+(function() {
+ "use strict";
+
+ load("jstests/libs/check_log.js");
+
+ let conn = MongoRunner.runMongod();
+ let testDB = conn.getDB('test');
+
+ // Run 'func' while the given failpoint is enabled.
+ let doDuringFailpoint = function(failPointName, logMessage, func, i) {
+ clearRawMongoProgramOutput();
+ assert.commandWorked(testDB.adminCommand(
+ {configureFailPoint: failPointName, mode: "alwaysOn", data: {"i": i}}));
+
+ assert.soon(() => rawMongoProgramOutput().indexOf(logMessage) >= 0);
+
+ func();
+
+ assert.commandWorked(testDB.adminCommand({configureFailPoint: failPointName, mode: "off"}));
+ };
+
+ const docsToInsert = 1000;
+ let setUp = function(coll) {
+ coll.drop();
+
+ let bulk = coll.initializeUnorderedBulkOp();
+ for (let i = 0; i < docsToInsert; i++) {
+ bulk.insert({i: i});
+ }
+ assert.commandWorked(bulk.execute());
+ };
+
+ let buildIndexInBackground = function(expectDuplicateKeyError) {
+ if (expectDuplicateKeyError) {
+ return startParallelShell(function() {
+ assert.commandFailedWithCode(
+ db.hybrid.createIndex({i: 1}, {background: true, unique: true}),
+ ErrorCodes.DuplicateKey);
+ }, conn.port);
+ }
+ return startParallelShell(function() {
+ assert.commandWorked(db.hybrid.createIndex({i: 1}, {background: true, unique: true}));
+ }, conn.port);
+ };
+
+ /**
+ * Run a background index build on a unique index under different configurations. Introduce
+ * duplicate keys on the index that may cause it to fail or succeed, depending on the following
+ * optional parameters:
+ * {
+ * // The operation used to introduce a duplicate key.
+ * operation {string}: "insert", "update"
+ *
+ * // Whether or not to resolve the duplicate key before completing the build.
+ * resolve {bool}
+ *
+ * // The phase of the index build in which to introduce the duplicate key.
+ * phase {number}: 0-4
+ * }
+ */
+ let runTest = function(config) {
+ jsTestLog("running test with config: " + tojson(config));
+
+ setUp(testDB.hybrid);
+
+ // Expect the build to fail with a duplicate key error if we insert a duplicate key and
+ // don't resolve it.
+ let expectDuplicate = config.resolve === false;
+ let awaitBuild = buildIndexInBackground(expectDuplicate);
+
+ // Introduce a duplicate key, either from an insert or an update. Optionally, follow up with an
+ // operation that will resolve the duplicate by removing it or updating it.
+ const dup = {i: 0};
+ let doOperation = function() {
+ if ("insert" == config.operation) {
+ assert.commandWorked(testDB.hybrid.insert(dup));
+ if (config.resolve) {
+ assert.commandWorked(testDB.hybrid.deleteOne(dup));
+ }
+ } else if ("update" == config.operation) {
+ assert.commandWorked(testDB.hybrid.update(dup, {i: 1}));
+ if (config.resolve) {
+ assert.commandWorked(testDB.hybrid.update({i: 1}, dup));
+ }
+ }
+ };
+
+ const stopKey = 0;
+ switch (config.phase) {
+ // Don't hang the build.
+ case undefined:
+ break;
+ // Hang before scanning the first document.
+ case 0:
+ doDuringFailpoint("hangBeforeIndexBuildOf",
+ "Hanging before index build of i=" + stopKey,
+ doOperation,
+ stopKey);
+ break;
+ // Hang after scanning the first document.
+ case 1:
+ doDuringFailpoint("hangAfterIndexBuildOf",
+ "Hanging after index build of i=" + stopKey,
+ doOperation,
+ stopKey);
+ break;
+ // Hang before the first drain and after dumping the keys from the external sorter into
+ // the index.
+ case 2:
+ doDuringFailpoint("hangAfterIndexBuildDumpsInsertsFromBulk",
+ "Hanging after dumping inserts from bulk builder",
+ doOperation);
+ break;
+ // Hang before the second drain.
+ case 3:
+ doDuringFailpoint("hangAfterIndexBuildFirstDrain",
+ "Hanging after index build first drain",
+ doOperation);
+ break;
+ // Hang before the final drain and commit.
+ case 4:
+ doDuringFailpoint("hangAfterIndexBuildSecondDrain",
+ "Hanging after index build second drain",
+ doOperation);
+ break;
+ default:
+ assert(false, "Invalid phase: " + config.phase);
+ }
+
+ awaitBuild();
+
+ let expectedDocs = docsToInsert;
+ expectedDocs += (config.operation == "insert" && config.resolve === false) ? 1 : 0;
+
+ assert.eq(expectedDocs, testDB.hybrid.count());
+ assert.eq(expectedDocs, testDB.hybrid.find().itcount());
+ assert.commandWorked(testDB.hybrid.validate({full: true}));
+ };
+
+ runTest({});
+
+ for (let i = 0; i <= 4; i++) {
+ runTest({operation: "insert", resolve: true, phase: i});
+ runTest({operation: "insert", resolve: false, phase: i});
+ runTest({operation: "update", resolve: true, phase: i});
+ runTest({operation: "update", resolve: false, phase: i});
+ }
+
+ MongoRunner.stopMongod(conn);
+})();
diff --git a/jstests/noPassthrough/indexbg2.js b/jstests/noPassthrough/indexbg2.js
index 7b16e143807..913fac1d1a2 100644
--- a/jstests/noPassthrough/indexbg2.js
+++ b/jstests/noPassthrough/indexbg2.js
@@ -96,11 +96,9 @@
};
// Unique background index build succeeds:
- // 1) when a document is inserted with a key that has already been indexed
- // (with the insert failing on duplicate key error).
- // 2) when a document with a key not present in the initial set is inserted twice
- // (with the initial insert succeeding and the second failing on duplicate key error).
- let succeedWithWriteErrors = function(coll, newKey) {
+ // 1) when a document is inserted and removed with a key that has already been indexed
+ // 2) when a document with a key not present in the initial set is inserted and removed
+ let succeedWithoutWriteErrors = function(coll, newKey) {
let duplicateKey = 3;
turnFailPointOn("hangAfterIndexBuildOf", duplicateKey);
@@ -112,32 +110,16 @@
jsTestLog("Waiting to hang after index build of i=" + duplicateKey);
checkLog.contains(conn, "Hanging after index build of i=" + duplicateKey);
- assert.writeError(coll.save({i: duplicateKey, n: true}));
+ assert.commandWorked(coll.insert({i: duplicateKey, n: true}));
- // First insert on key not present in initial set
- assert.writeOK(coll.save({i: newKey, n: true}));
- } catch (e) {
- turnFailPointOff("hangAfterIndexBuildOf");
- throw e;
- }
+ // First insert on key not present in initial set.
+ assert.commandWorked(coll.insert({i: newKey, n: true}));
- try {
- // We are currently hanging after indexing document with {i: duplicateKey}.
- // To perform next check, we need to hang after indexing document with {i: newKey}.
- // Add a hang before indexing document {i: newKey}, then turn off current hang
- // so we are always in a known state and don't skip over the indexing of {i: newKey}.
- turnFailPointOn("hangBeforeIndexBuildOf", newKey);
- turnFailPointOff("hangAfterIndexBuildOf");
- turnFailPointOn("hangAfterIndexBuildOf", newKey);
- turnFailPointOff("hangBeforeIndexBuildOf");
-
- // Second insert on key not present in intial set fails with duplicate key error
- jsTestLog("Waiting to hang after index build of i=" + newKey);
- checkLog.contains(conn, "Hanging after index build of i=" + newKey);
+ // Remove duplicates before completing the index build.
+ assert.commandWorked(coll.deleteOne({i: duplicateKey, n: true}));
+ assert.commandWorked(coll.deleteOne({i: newKey, n: true}));
- assert.writeError(coll.save({i: newKey, n: true}));
} finally {
- turnFailPointOff("hangBeforeIndexBuildOf");
turnFailPointOff("hangAfterIndexBuildOf");
}
@@ -164,7 +146,7 @@
failOnInsertedDuplicateValue(coll);
assert.eq(size, coll.count());
- succeedWithWriteErrors(coll, size);
+ succeedWithoutWriteErrors(coll, size);
waitParallel();
};
diff --git a/src/mongo/db/catalog/index_build_block.cpp b/src/mongo/db/catalog/index_build_block.cpp
index 5f8ef4e21d3..16d76b1de28 100644
--- a/src/mongo/db/catalog/index_build_block.cpp
+++ b/src/mongo/db/catalog/index_build_block.cpp
@@ -91,10 +91,10 @@ Status IndexCatalogImpl::IndexBuildBlock::init() {
_opCtx, std::move(descriptor), initFromDisk, isReadyIndex);
// Hybrid indexes are only enabled for background, non-unique indexes.
- // TODO: Remove when SERVER-38036 and SERVER-37270 are complete.
- const bool useHybrid = isBackgroundIndex && !descriptorPtr->unique();
+ // TODO: Remove when SERVER-37270 is complete.
+ const bool useHybrid = isBackgroundIndex;
if (useHybrid) {
- _indexBuildInterceptor = stdx::make_unique<IndexBuildInterceptor>(_opCtx);
+ _indexBuildInterceptor = stdx::make_unique<IndexBuildInterceptor>(_opCtx, _entry);
_entry->setIndexBuildInterceptor(_indexBuildInterceptor.get());
_opCtx->recoveryUnit()->onCommit(
@@ -146,8 +146,15 @@ void IndexCatalogImpl::IndexBuildBlock::success() {
NamespaceString ns(_indexNamespace);
invariant(_opCtx->lockState()->isDbLockedForMode(ns.db(), MODE_X));
- // An index build should never be completed with writes remaining in the interceptor.
- invariant(!_indexBuildInterceptor || _indexBuildInterceptor->areAllWritesApplied(_opCtx));
+ if (_indexBuildInterceptor) {
+ // An index build should never be completed with writes remaining in the interceptor.
+ invariant(_indexBuildInterceptor->areAllWritesApplied(_opCtx));
+
+ // Hybrid indexes must check for any outstanding duplicate key constraint violations when
+ // they finish.
+ uassertStatusOK(_indexBuildInterceptor->checkDuplicateKeyConstraints(_opCtx));
+ }
+
LOG(2) << "marking index " << _indexName << " as ready in snapshot id "
<< _opCtx->recoveryUnit()->getSnapshotId();
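
Note: the success() change above distinguishes two failure classes. Unapplied side writes are a programming error, so they are checked with invariant(); unresolved duplicates are an expected, user-facing condition, so they surface through uassertStatusOK() as a DuplicateKey error. A minimal self-contained sketch of that commit-time gate, using toy stand-in types rather than the server's:

```cpp
#include <cassert>
#include <stdexcept>
#include <vector>

// Illustrative model only: names mirror the interceptor's API, but the
// types and bodies are hypothetical stand-ins, not server code.
struct InterceptorModel {
    std::vector<int> pendingSideWrites;
    std::vector<int> unresolvedDuplicates;

    bool areAllWritesApplied() const { return pendingSideWrites.empty(); }
    bool hasNoDuplicates() const { return unresolvedDuplicates.empty(); }
};

void commitIndexBuild(const InterceptorModel& interceptor) {
    // An index build should never be completed with writes remaining:
    // a violation here is a logic bug, hence the hard assertion.
    assert(interceptor.areAllWritesApplied());
    // An unresolved duplicate, by contrast, fails the build cleanly with
    // a recoverable error rather than crashing the process.
    if (!interceptor.hasNoDuplicates())
        throw std::runtime_error("DuplicateKey");
}

int main() {
    commitIndexBuild(InterceptorModel{});  // empty state commits fine
}
```
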
diff --git a/src/mongo/db/catalog/multi_index_block.h b/src/mongo/db/catalog/multi_index_block.h
index 70283a83fa5..b9c4e7a5e65 100644
--- a/src/mongo/db/catalog/multi_index_block.h
+++ b/src/mongo/db/catalog/multi_index_block.h
@@ -86,7 +86,8 @@ public:
* If this is called before init(), we will ignore unique violations. This has no effect if
* no specs are unique.
*
- * If this is called, any dupsOut sets passed in will never be filled.
+ * If this is called, any 'dupRecords' set passed to dumpInsertsFromBulk() will never be
+ * filled.
*/
virtual void ignoreUniqueConstraint() = 0;
@@ -123,16 +124,13 @@ public:
virtual Status insertAllDocumentsInCollection() = 0;
/**
- * Call this after init() for each document in the collection. Any duplicate keys inserted will
- * be appended to 'dupKeysInserted' if it is not null.
+ * Call this after init() for each document in the collection.
*
* Do not call if you called insertAllDocumentsInCollection();
*
* Should be called inside of a WriteUnitOfWork.
*/
- virtual Status insert(const BSONObj& wholeDocument,
- const RecordId& loc,
- std::vector<BSONObj>* const dupKeysInserted = nullptr) = 0;
+ virtual Status insert(const BSONObj& wholeDocument, const RecordId& loc) = 0;
/**
* Call this after the last insert(). This gives the index builder a chance to do any
@@ -145,15 +143,10 @@ public:
* indexed, so callers MUST either fail this index build or delete the documents from the
* collection.
*
- * If 'dupKeysInserted' is passed as non-NULL and duplicates are allowed for the unique index,
- * violators of uniqueness constraints will still be indexed, and the keys will be appended to
- * the vector. No DuplicateKey errors will be returned.
- *
* Should not be called inside of a WriteUnitOfWork.
*/
virtual Status dumpInsertsFromBulk() = 0;
virtual Status dumpInsertsFromBulk(std::set<RecordId>* const dupRecords) = 0;
- virtual Status dumpInsertsFromBulk(std::vector<BSONObj>* const dupKeysInserted) = 0;
/**
* For background indexes using an IndexBuildInterceptor to capture inserts during a build,
diff --git a/src/mongo/db/catalog/multi_index_block_impl.cpp b/src/mongo/db/catalog/multi_index_block_impl.cpp
index 586ab43cb89..dd32d49bd43 100644
--- a/src/mongo/db/catalog/multi_index_block_impl.cpp
+++ b/src/mongo/db/catalog/multi_index_block_impl.cpp
@@ -302,11 +302,14 @@ StatusWith<std::vector<BSONObj>> MultiIndexBlockImpl::init(const std::vector<BSO
_collection->getIndexCatalog()->prepareInsertDeleteOptions(
_opCtx, descriptor, &index.options);
- index.options.dupsAllowed = index.options.dupsAllowed || _ignoreUnique;
- index.options.fromIndexBuilder = true;
+ // Allow duplicates when explicitly allowed or an interceptor is installed, which will
+ // perform duplicate checking itself.
+ index.options.dupsAllowed |=
+ _ignoreUnique || index.block->getEntry()->indexBuildInterceptor();
if (_ignoreUnique) {
index.options.getKeysMode = IndexAccessMethod::GetKeysMode::kRelaxConstraints;
}
+ index.options.fromIndexBuilder = true;
log() << "build index on: " << ns << " properties: " << descriptor->toString();
if (index.bulk)
@@ -514,9 +517,7 @@ Status MultiIndexBlockImpl::insertAllDocumentsInCollection() {
return Status::OK();
}
-Status MultiIndexBlockImpl::insert(const BSONObj& doc,
- const RecordId& loc,
- std::vector<BSONObj>* const dupKeysInserted) {
+Status MultiIndexBlockImpl::insert(const BSONObj& doc, const RecordId& loc) {
if (State::kAborted == _getState()) {
return {ErrorCodes::IndexBuildAborted,
str::stream() << "Index build aborted: " << _abortReason
@@ -543,29 +544,15 @@ Status MultiIndexBlockImpl::insert(const BSONObj& doc,
if (!idxStatus.isOK())
return idxStatus;
-
- if (dupKeysInserted) {
- dupKeysInserted->insert(
- dupKeysInserted->end(), result.dupsInserted.begin(), result.dupsInserted.end());
- }
}
return Status::OK();
}
Status MultiIndexBlockImpl::dumpInsertsFromBulk() {
- return _dumpInsertsFromBulk(nullptr, nullptr);
+ return dumpInsertsFromBulk(nullptr);
}
Status MultiIndexBlockImpl::dumpInsertsFromBulk(std::set<RecordId>* dupRecords) {
- return _dumpInsertsFromBulk(dupRecords, nullptr);
-}
-
-Status MultiIndexBlockImpl::dumpInsertsFromBulk(std::vector<BSONObj>* dupKeysInserted) {
- return _dumpInsertsFromBulk(nullptr, dupKeysInserted);
-}
-
-Status MultiIndexBlockImpl::_dumpInsertsFromBulk(std::set<RecordId>* dupRecords,
- std::vector<BSONObj>* dupKeysInserted) {
if (State::kAborted == _getState()) {
return {ErrorCodes::IndexBuildAborted,
str::stream() << "Index build aborted: " << _abortReason
@@ -580,17 +567,37 @@ Status MultiIndexBlockImpl::_dumpInsertsFromBulk(std::set<RecordId>* dupRecords,
for (size_t i = 0; i < _indexes.size(); i++) {
if (_indexes[i].bulk == NULL)
continue;
+
+ // If 'dupRecords' is provided, it will be used to store all records that would result in
+ // duplicate key errors. Only pass 'dupKeysInserted', which stores inserted duplicate keys,
+ // when 'dupRecords' is not used, because the two outputs are mutually exclusive.
+ std::vector<BSONObj> dupKeysInserted;
+
+ IndexCatalogEntry* entry = _indexes[i].block->getEntry();
LOG(1) << "\t dumping from external sorter into index: "
- << _indexes[i].block->getEntry()->descriptor()->indexName();
+ << entry->descriptor()->indexName();
Status status = _indexes[i].real->commitBulk(_opCtx,
_indexes[i].bulk.get(),
_allowInterruption,
_indexes[i].options.dupsAllowed,
dupRecords,
- dupKeysInserted);
+ (dupRecords) ? nullptr : &dupKeysInserted);
if (!status.isOK()) {
return status;
}
+
+ auto interceptor = entry->indexBuildInterceptor();
+ if (!interceptor || _ignoreUnique) {
+ continue;
+ }
+
+ // Record duplicate key insertions for later verification.
+ if (dupKeysInserted.size()) {
+ status = interceptor->recordDuplicateKeys(_opCtx, dupKeysInserted);
+ if (!status.isOK()) {
+ return status;
+ }
+ }
}
_setState(State::kPreCommit);
@@ -624,10 +631,7 @@ Status MultiIndexBlockImpl::drainBackgroundWritesIfNeeded() {
LOG(1) << "draining background writes on collection " << _collection->ns()
<< " into index: " << _indexes[i].block->getEntry()->descriptor()->indexName();
- auto status = interceptor->drainWritesIntoIndex(_opCtx,
- _indexes[i].real,
- _indexes[i].block->getEntry()->descriptor(),
- _indexes[i].options);
+ auto status = interceptor->drainWritesIntoIndex(_opCtx, _indexes[i].options);
if (!status.isOK()) {
return status;
}
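
Note: the dump path above relies on commitBulk() reporting, rather than rejecting, duplicate keys when 'dupsAllowed' is set, so the interceptor can record them for the commit-time check. A toy stand-in for that contract (std::multiset in place of the real sorted index; the function name is illustrative):

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

// Inserts every key, even uniqueness violators, and reports the violating
// keys back -- the moral equivalent of
// commitBulk(..., dupsAllowed, /*dupRecords=*/nullptr, &dupKeysInserted).
std::vector<std::string> bulkInsertAllowingDups(std::multiset<std::string>& index,
                                                const std::vector<std::string>& keys) {
    std::vector<std::string> dupKeysInserted;
    for (const auto& k : keys) {
        if (index.count(k) > 0)
            dupKeysInserted.push_back(k);  // constraint deferred, not enforced
        index.insert(k);
    }
    return dupKeysInserted;
}

int main() {
    std::multiset<std::string> index;
    auto dups = bulkInsertAllowingDups(index, {"a", "b", "a"});
    assert(dups.size() == 1 && dups[0] == "a");
}
```
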
diff --git a/src/mongo/db/catalog/multi_index_block_impl.h b/src/mongo/db/catalog/multi_index_block_impl.h
index 23fc2ab03b6..25902359202 100644
--- a/src/mongo/db/catalog/multi_index_block_impl.h
+++ b/src/mongo/db/catalog/multi_index_block_impl.h
@@ -78,13 +78,10 @@ public:
Status insertAllDocumentsInCollection() override;
- Status insert(const BSONObj& doc,
- const RecordId& loc,
- std::vector<BSONObj>* const dupKeysInserted = nullptr) override;
+ Status insert(const BSONObj& doc, const RecordId& loc) override;
Status dumpInsertsFromBulk() override;
Status dumpInsertsFromBulk(std::set<RecordId>* dupRecords) override;
- Status dumpInsertsFromBulk(std::vector<BSONObj>* dupKeysInserted) override;
/**
* See MultiIndexBlock::drainBackgroundWritesIfNeeded()
@@ -139,8 +136,7 @@ private:
InsertDeleteOptions options;
};
- Status _dumpInsertsFromBulk(std::set<RecordId>* dupRecords,
- std::vector<BSONObj>* dupKeysInserted);
+ Status _dumpInsertsFromBulk(std::set<RecordId>* dupRecords);
/**
* Returns the current state.
diff --git a/src/mongo/db/catalog/multi_index_block_test.cpp b/src/mongo/db/catalog/multi_index_block_test.cpp
index 562bdd518fb..1afec1af1fa 100644
--- a/src/mongo/db/catalog/multi_index_block_test.cpp
+++ b/src/mongo/db/catalog/multi_index_block_test.cpp
@@ -133,7 +133,7 @@ TEST_F(MultiIndexBlockTest, CommitAfterInsertingSingleDocument) {
ASSERT_EQUALS(0U, specs.size());
ASSERT_EQUALS(MultiIndexBlockImpl::State::kRunning, indexer->getState_forTest());
- ASSERT_OK(indexer->insert({}, {}, nullptr));
+ ASSERT_OK(indexer->insert({}, {}));
ASSERT_OK(indexer->dumpInsertsFromBulk());
ASSERT_EQUALS(MultiIndexBlockImpl::State::kPreCommit, indexer->getState_forTest());
@@ -154,7 +154,7 @@ TEST_F(MultiIndexBlockTest, AbortWithoutCleanupAfterInsertingSingleDocument) {
auto indexer = getIndexer();
auto specs = unittest::assertGet(indexer->init(std::vector<BSONObj>()));
ASSERT_EQUALS(0U, specs.size());
- ASSERT_OK(indexer->insert({}, {}, nullptr));
+ ASSERT_OK(indexer->insert({}, {}));
indexer->abortWithoutCleanup();
ASSERT_EQUALS(MultiIndexBlockImpl::State::kAborted, indexer->getState_forTest());
@@ -186,13 +186,13 @@ TEST_F(MultiIndexBlockTest, InsertingSingleDocumentFailsAfterAbort) {
ASSERT_EQUALS(MultiIndexBlockImpl::State::kAborted, indexer->getState_forTest());
ASSERT_EQUALS(ErrorCodes::IndexBuildAborted,
- indexer->insert(BSON("_id" << 123 << "a" << 456), {}, nullptr));
+ indexer->insert(BSON("_id" << 123 << "a" << 456), {}));
ASSERT_EQUALS(MultiIndexBlockImpl::State::kAborted, indexer->getState_forTest());
ASSERT_FALSE(indexer->isCommitted());
}
-TEST_F(MultiIndexBlockTest, dumpInsertsFromBulkFailsAfterAbort) {
+TEST_F(MultiIndexBlockTest, DumpInsertsFromBulkFailsAfterAbort) {
auto indexer = getIndexer();
ASSERT_EQUALS(MultiIndexBlockImpl::State::kUninitialized, indexer->getState_forTest());
@@ -200,7 +200,7 @@ TEST_F(MultiIndexBlockTest, dumpInsertsFromBulkFailsAfterAbort) {
ASSERT_EQUALS(0U, specs.size());
ASSERT_EQUALS(MultiIndexBlockImpl::State::kRunning, indexer->getState_forTest());
- ASSERT_OK(indexer->insert(BSON("_id" << 123 << "a" << 456), {}, nullptr));
+ ASSERT_OK(indexer->insert(BSON("_id" << 123 << "a" << 456), {}));
ASSERT_EQUALS(MultiIndexBlockImpl::State::kRunning, indexer->getState_forTest());
indexer->abort("test"_sd);
@@ -220,7 +220,7 @@ TEST_F(MultiIndexBlockTest, CommitFailsAfterAbort) {
ASSERT_EQUALS(0U, specs.size());
ASSERT_EQUALS(MultiIndexBlockImpl::State::kRunning, indexer->getState_forTest());
- ASSERT_OK(indexer->insert(BSON("_id" << 123 << "a" << 456), {}, nullptr));
+ ASSERT_OK(indexer->insert(BSON("_id" << 123 << "a" << 456), {}));
ASSERT_EQUALS(MultiIndexBlockImpl::State::kRunning, indexer->getState_forTest());
ASSERT_OK(indexer->dumpInsertsFromBulk());
diff --git a/src/mongo/db/index/SConscript b/src/mongo/db/index/SConscript
index 87f54e29f47..0f8e825cb90 100644
--- a/src/mongo/db/index/SConscript
+++ b/src/mongo/db/index/SConscript
@@ -26,26 +26,10 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/base',
- '$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/storage/index_entry_comparison',
],
)
-env.CppUnitTest(
- target='duplicate_key_tracker_test',
- source=[
- 'duplicate_key_tracker_test.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/db/auth/authmocks',
- '$BUILD_DIR/mongo/db/catalog/multi_index_block',
- '$BUILD_DIR/mongo/db/catalog_raii',
- '$BUILD_DIR/mongo/db/repl/replmocks',
- '$BUILD_DIR/mongo/db/service_context_d_test_fixture',
- '$BUILD_DIR/mongo/db/storage/ephemeral_for_test/storage_ephemeral_for_test_core',
- 'duplicate_key_tracker',
- ],
-)
env.Library(
target='key_generator',
source=[
@@ -154,6 +138,7 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
+ 'duplicate_key_tracker',
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/multi_key_path_tracker',
diff --git a/src/mongo/db/index/duplicate_key_tracker.cpp b/src/mongo/db/index/duplicate_key_tracker.cpp
index 066ef969f05..7dd47ded1fe 100644
--- a/src/mongo/db/index/duplicate_key_tracker.cpp
+++ b/src/mongo/db/index/duplicate_key_tracker.cpp
@@ -34,12 +34,8 @@
#include "mongo/db/index/duplicate_key_tracker.h"
#include "mongo/db/catalog/index_catalog_entry.h"
-#include "mongo/db/catalog_raii.h"
-#include "mongo/db/dbhelpers.h"
-#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/keypattern.h"
-#include "mongo/db/query/internal_plans.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
@@ -49,60 +45,63 @@ namespace {
static constexpr StringData kKeyField = "key"_sd;
}
-DuplicateKeyTracker::DuplicateKeyTracker(const IndexCatalogEntry* entry, const NamespaceString& nss)
- : _idCounter(0), _indexCatalogEntry(entry), _nss(nss) {
+DuplicateKeyTracker::DuplicateKeyTracker(OperationContext* opCtx, const IndexCatalogEntry* entry)
+ : _indexCatalogEntry(entry),
+ _keyConstraintsTable(
+ opCtx->getServiceContext()->getStorageEngine()->makeTemporaryRecordStore(opCtx)) {
invariant(_indexCatalogEntry->descriptor()->unique());
}
-DuplicateKeyTracker::~DuplicateKeyTracker() {}
-
-NamespaceString DuplicateKeyTracker::makeTempNamespace() {
- return NamespaceString("local.system.indexBuildConstraints-" + UUID::gen().toString());
-}
-
-Status DuplicateKeyTracker::recordDuplicates(OperationContext* opCtx,
- Collection* tempCollection,
- const std::vector<BSONObj>& keys) {
- invariant(tempCollection->ns() == nss());
- invariant(opCtx->lockState()->inAWriteUnitOfWork());
- invariant(opCtx->lockState()->isCollectionLockedForMode(tempCollection->ns().ns(), MODE_IX));
+Status DuplicateKeyTracker::recordKeys(OperationContext* opCtx, const std::vector<BSONObj>& keys) {
+ if (keys.size() == 0)
+ return Status::OK();
+ std::vector<BSONObj> toInsert;
+ toInsert.reserve(keys.size());
for (auto&& key : keys) {
BSONObjBuilder builder;
- builder.append("_id", _idCounter++);
builder.append(kKeyField, key);
BSONObj obj = builder.obj();
- LOG(2) << "Recording conflict for DuplicateKeyTracker: " << obj.toString();
- Status s = tempCollection->insertDocument(opCtx, InsertStatement(obj), nullptr, false);
- if (!s.isOK())
- return s;
+ toInsert.emplace_back(std::move(obj));
}
- return Status::OK();
-}
-Status DuplicateKeyTracker::constraintsSatisfiedForIndex(OperationContext* opCtx,
- Collection* tempCollection) const {
- invariant(tempCollection->ns() == nss());
- invariant(opCtx->lockState()->isCollectionLockedForMode(tempCollection->ns().ns(), MODE_IS));
- invariant(!opCtx->lockState()->inAWriteUnitOfWork());
+ std::vector<Record> records;
+ records.reserve(keys.size());
+ for (auto&& obj : toInsert) {
+ records.emplace_back(Record{RecordId(), RecordData(obj.objdata(), obj.objsize())});
+ }
- auto collScan = InternalPlanner::collectionScan(
- opCtx, tempCollection->ns().ns(), tempCollection, PlanExecutor::YieldPolicy::YIELD_AUTO);
+ LOG(2) << "recording " << records.size() << " duplicate key conflicts for unique index: "
+ << _indexCatalogEntry->descriptor()->indexName();
- BSONObj conflict;
- PlanExecutor::ExecState state;
- while (PlanExecutor::ExecState::ADVANCED == (state = collScan->getNext(&conflict, nullptr))) {
+ WriteUnitOfWork wuow(opCtx);
+ std::vector<Timestamp> timestamps(records.size());
+ Status s = _keyConstraintsTable->rs()->insertRecords(opCtx, &records, timestamps);
+ if (!s.isOK())
+ return s;
- LOG(2) << "Resolving conflict for DuplicateKeyTracker: " << conflict.toString();
+ wuow.commit();
+ return Status::OK();
+}
+
+Status DuplicateKeyTracker::checkConstraints(OperationContext* opCtx) const {
+ auto constraintsCursor = _keyConstraintsTable->rs()->getCursor(opCtx);
+ auto record = constraintsCursor->next();
+
+ auto indexCursor =
+ _indexCatalogEntry->accessMethod()->getSortedDataInterface()->newCursor(opCtx);
+
+ int count = 0;
+ while (record) {
+ count++;
+ BSONObj conflict = record->data.toBson();
BSONObj keyObj = conflict[kKeyField].Obj();
- auto cursor =
- _indexCatalogEntry->accessMethod()->getSortedDataInterface()->newCursor(opCtx);
- auto entry = cursor->seekExact(keyObj);
+ auto entry = indexCursor->seekExact(keyObj);
// If there is not an exact match, there is no duplicate.
if (!entry) {
@@ -110,18 +109,19 @@ Status DuplicateKeyTracker::constraintsSatisfiedForIndex(OperationContext* opCtx
}
// If the following entry has the same key, this is a duplicate.
- entry = cursor->next();
+ entry = indexCursor->next();
if (entry && entry->key.woCompare(keyObj, BSONObj(), /*considerFieldNames*/ false) == 0) {
return buildDupKeyErrorStatus(keyObj,
_indexCatalogEntry->descriptor()->parentNS(),
_indexCatalogEntry->descriptor()->indexName(),
_indexCatalogEntry->descriptor()->keyPattern());
}
- }
- if (PlanExecutor::IS_EOF != state) {
- return WorkingSetCommon::getMemberObjectStatus(conflict);
+ record = constraintsCursor->next();
}
+
+ LOG(1) << "resolved " << count << " duplicate key conflicts for unique index: "
+ << _indexCatalogEntry->descriptor()->indexName();
return Status::OK();
}
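
Note: checkConstraints() above resolves each recorded key by seeking exactly to it in the index and testing whether the immediately following entry carries the same key; if so, the duplicate is still unresolved. A self-contained model of that check, with std::multiset standing in for the sorted index:

```cpp
#include <cassert>
#include <iterator>
#include <set>
#include <string>
#include <vector>

// Illustrative model: lower_bound() plays seekExact(), std::next() plays
// indexCursor->next(). Two adjacent equal entries mean a duplicate remains.
bool hasUnresolvedDuplicate(const std::multiset<std::string>& index,
                            const std::vector<std::string>& recordedKeys) {
    for (const auto& key : recordedKeys) {
        auto it = index.lower_bound(key);  // first entry >= key
        if (it == index.end() || *it != key)
            continue;                      // key no longer present: resolved
        auto next = std::next(it);
        if (next != index.end() && *next == key)
            return true;                   // two equal entries: conflict remains
    }
    return false;
}

int main() {
    std::multiset<std::string> index{"a", "b", "b"};
    assert(hasUnresolvedDuplicate(index, {"b"}));
    index.erase(index.find("b"));          // resolve the conflict
    assert(!hasUnresolvedDuplicate(index, {"b"}));
}
```
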
diff --git a/src/mongo/db/index/duplicate_key_tracker.h b/src/mongo/db/index/duplicate_key_tracker.h
index f5ea6dbe584..8cbf6515145 100644
--- a/src/mongo/db/index/duplicate_key_tracker.h
+++ b/src/mongo/db/index/duplicate_key_tracker.h
@@ -34,56 +34,39 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/bson/bsonobj.h"
-#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/index_catalog_entry.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/storage/temporary_record_store.h"
namespace mongo {
/**
* Records keys that have violated duplicate key constraints on unique indexes. The keys are backed
- * by a temporary collection that the caller is responsible for creating, destroying, and holding
- * locks while passing into mutating functions.
+ * by a temporary table that is created and destroyed by this tracker.
*/
class DuplicateKeyTracker {
MONGO_DISALLOW_COPYING(DuplicateKeyTracker);
public:
- DuplicateKeyTracker(const IndexCatalogEntry* indexCatalogEntry, const NamespaceString& tempNss);
- ~DuplicateKeyTracker();
+ DuplicateKeyTracker(OperationContext* opCtx, const IndexCatalogEntry* indexCatalogEntry);
/**
- * Generates a unique namespace that should be used to construct the temporary backing
- * Collection for this tracker.
+ * Given a set of duplicate keys, insert them into the key constraint table.
*/
- static NamespaceString makeTempNamespace();
-
- /**
- * Given a set of duplicate keys, insert them into tempCollection.
- *
- * The caller must hold locks for 'tempCollection' and be in a WriteUnitOfWork.
- */
- Status recordDuplicates(OperationContext* opCtx,
- Collection* tempCollection,
- const std::vector<BSONObj>& keys);
+ Status recordKeys(OperationContext* opCtx, const std::vector<BSONObj>& keys);
/**
* Returns Status::OK if all previously recorded duplicate key constraint violations have been
* resolved for the index. Returns a DuplicateKey error if there are still duplicate key
* constraint violations on the index.
- *
- * The caller must hold locks for 'tempCollection'.
*/
- Status constraintsSatisfiedForIndex(OperationContext* opCtx, Collection* tempCollection) const;
-
- const NamespaceString& nss() const {
- return _nss;
- }
+ Status checkConstraints(OperationContext* opCtx) const;
private:
- std::int64_t _idCounter;
-
const IndexCatalogEntry* _indexCatalogEntry;
- const NamespaceString _nss;
+
+ // This temporary record store is owned by the duplicate key tracker and dropped along with it.
+ std::unique_ptr<TemporaryRecordStore> _keyConstraintsTable;
};
} // namespace mongo
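
Note: the reworked header hands ownership of the backing table to the tracker itself, replacing the old caller-managed temp collection (and its lock/drop obligations) with plain RAII. A sketch of that shape, with purely illustrative names:

```cpp
#include <iostream>
#include <memory>

// Hypothetical stand-ins; not the server's TemporaryRecordStore API.
struct TemporaryTable {
    ~TemporaryTable() { std::cout << "temporary table dropped\n"; }
};

class TrackerModel {
public:
    TrackerModel() : _table(std::make_unique<TemporaryTable>()) {}

private:
    // Owned outright: the table's lifetime is tied to the tracker, so no
    // caller has to create, lock, or drop it.
    std::unique_ptr<TemporaryTable> _table;
};

int main() {
    TrackerModel tracker;  // table is dropped when 'tracker' goes out of scope
}
```
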
diff --git a/src/mongo/db/index/duplicate_key_tracker_test.cpp b/src/mongo/db/index/duplicate_key_tracker_test.cpp
deleted file mode 100644
index b1abac80d95..00000000000
--- a/src/mongo/db/index/duplicate_key_tracker_test.cpp
+++ /dev/null
@@ -1,334 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/catalog/collection_catalog_entry.h"
-#include "mongo/db/catalog/database_catalog_entry.h"
-#include "mongo/db/catalog/multi_index_block_impl.h"
-#include "mongo/db/catalog_raii.h"
-#include "mongo/db/concurrency/d_concurrency.h"
-#include "mongo/db/curop.h"
-#include "mongo/db/dbhelpers.h"
-#include "mongo/db/index/duplicate_key_tracker.h"
-#include "mongo/db/operation_context_noop.h"
-#include "mongo/db/repl/replication_coordinator_mock.h"
-#include "mongo/db/service_context_d_test_fixture.h"
-#include "mongo/unittest/unittest.h"
-
-namespace mongo {
-namespace {
-
-static const StringData kConstraintNsPrefix = "local.system.indexBuildConstraints"_sd;
-
-class DuplicateKeyTrackerTest : public ServiceContextMongoDTest {
-public:
- DuplicateKeyTrackerTest()
- : ServiceContextMongoDTest("ephemeralForTest"), _opCtx(cc().makeOperationContext()) {
- repl::ReplicationCoordinator::set(
- getServiceContext(),
- stdx::make_unique<repl::ReplicationCoordinatorMock>(getServiceContext()));
- }
-
- std::unique_ptr<DuplicateKeyTracker> makeTracker(OperationContext* opCtx,
- const IndexCatalogEntry* entry) {
- NamespaceString tempNss = DuplicateKeyTracker::makeTempNamespace();
- makeTempCollection(opCtx, tempNss);
-
- auto tracker = std::make_unique<DuplicateKeyTracker>(entry, tempNss);
-
- AutoGetCollection autoColl(opCtx, tracker->nss(), MODE_IS);
- ASSERT(autoColl.getCollection());
-
- return tracker;
- };
-
- OperationContext* opCtx() {
- return _opCtx.get();
- }
-
- void tearDown() {}
-
- void makeTempCollection(OperationContext* opCtx, const NamespaceString& nss) {
- WriteUnitOfWork wuow(opCtx);
-
- AutoGetOrCreateDb autoDb(opCtx, nss.db(), MODE_X);
- auto db = autoDb.getDb();
- invariant(db);
-
- ASSERT(!db->getCollection(opCtx, nss));
-
- CollectionOptions options;
- options.setNoIdIndex();
-
- // Create the temp collection
- auto coll = db->createCollection(opCtx, nss.ns(), options);
- invariant(coll);
- wuow.commit();
- }
-
- void destroyTempCollection(OperationContext* opCtx, const NamespaceString& nss) {
- WriteUnitOfWork wuow(opCtx);
-
- AutoGetDb autoDb(opCtx, nss.db(), MODE_X);
- auto db = autoDb.getDb();
- invariant(db);
-
- ASSERT_OK(db->dropCollectionEvenIfSystem(opCtx, nss));
- wuow.commit();
- }
-
-private:
- int _numIndexesCreated = 0;
- ServiceContext::UniqueOperationContext _opCtx;
-};
-
-TEST_F(DuplicateKeyTrackerTest, IndexBuild) {
- std::unique_ptr<DuplicateKeyTracker> tracker;
-
- const NamespaceString collNs("test.myCollection");
- const BSONObj doc1 = BSON("_id" << 1 << "a" << 1);
- const BSONObj doc2 = BSON("_id" << 2 << "a" << 1);
-
- // Create the collection with a two documents that have the same key for 'a'.
- {
- AutoGetOrCreateDb dbRaii(opCtx(), collNs.db(), LockMode::MODE_X);
- WriteUnitOfWork wunit(opCtx());
- CollectionOptions options;
- options.uuid = UUID::gen();
- auto coll = dbRaii.getDb()->createCollection(opCtx(), collNs.ns(), options);
- ASSERT(coll);
-
- ASSERT_OK(coll->insertDocument(opCtx(), InsertStatement(doc1), nullptr));
- ASSERT_OK(coll->insertDocument(opCtx(), InsertStatement(doc2), nullptr));
- wunit.commit();
- }
-
- const std::string indexName = "a_1";
- const BSONObj spec =
- BSON("ns" << collNs.ns() << "v" << 2 << "name" << indexName << "key" << BSON("a" << 1)
- << "unique"
- << true
- << "background"
- << true);
-
- // Create the index build block. Insert two different documents, but each with the same value
- // for 'a'. This will cause the index insert to allow inserting a duplicate key.
- const IndexCatalogEntry* entry;
-
- boost::optional<Record> record1;
- boost::optional<Record> record2;
- {
- AutoGetCollection autoColl(opCtx(), collNs, MODE_X);
- auto coll = autoColl.getCollection();
-
- MultiIndexBlockImpl indexer(opCtx(), coll);
- // Don't use the bulk builder, which does not insert directly into the IAM for the index.
- indexer.allowBackgroundBuilding();
- // Allow duplicates.
- indexer.ignoreUniqueConstraint();
-
- ASSERT_OK(indexer.init(spec).getStatus());
-
- IndexDescriptor* desc = coll->getIndexCatalog()->findIndexByName(
- opCtx(), indexName, true /* includeUnfinished */);
- ASSERT(desc);
- entry = coll->getIndexCatalog()->getEntry(desc);
-
- // Construct the tracker.
- tracker = makeTracker(opCtx(), entry);
-
- // Index the documents.
- WriteUnitOfWork wunit(opCtx());
- auto cursor = coll->getCursor(opCtx());
-
- record1 = cursor->next();
- ASSERT(record1);
-
- std::vector<BSONObj> dupsInserted;
-
- // The insert of the first document should return no duplicates.
- ASSERT_OK(indexer.insert(record1->data.releaseToBson(), record1->id, &dupsInserted));
- ASSERT_EQ(0u, dupsInserted.size());
-
- // The insert of the second document should return that a duplicate key was inserted.
- record2 = cursor->next();
- ASSERT(record2);
- ASSERT_OK(indexer.insert(record2->data.releaseToBson(), record2->id, &dupsInserted));
- ASSERT_EQ(1u, dupsInserted.size());
-
- // Record that duplicates were inserted.
- AutoGetCollection tempColl(opCtx(), tracker->nss(), MODE_IX);
- ASSERT_OK(tracker->recordDuplicates(opCtx(), tempColl.getCollection(), dupsInserted));
-
- ASSERT_OK(indexer.commit());
- wunit.commit();
- }
-
- // Confirm that the keys + RecordId of the duplicate are recorded.
- {
- AutoGetCollection tempColl(opCtx(), tracker->nss(), MODE_IS);
- Status s = tracker->constraintsSatisfiedForIndex(opCtx(), tempColl.getCollection());
- ASSERT_EQ(ErrorCodes::DuplicateKey, s.code());
- }
-
- // Now remove the document and index key to confirm the conflicts are resolved.
- {
- AutoGetCollection autoColl(opCtx(), collNs, MODE_IX);
- auto coll = autoColl.getCollection();
-
- WriteUnitOfWork wunit(opCtx());
-
- OpDebug opDebug;
- coll->deleteDocument(opCtx(), kUninitializedStmtId, record2->id, &opDebug);
- wunit.commit();
-
- // One key deleted for each index (includes _id)
- ASSERT_EQ(2u, *opDebug.additiveMetrics.keysDeleted);
-
- AutoGetCollection tempColl(opCtx(), tracker->nss(), MODE_IS);
- Status s = tracker->constraintsSatisfiedForIndex(opCtx(), tempColl.getCollection());
- ASSERT_OK(s);
- }
-
- destroyTempCollection(opCtx(), tracker->nss());
-}
-
-TEST_F(DuplicateKeyTrackerTest, BulkIndexBuild) {
- std::unique_ptr<DuplicateKeyTracker> tracker;
-
- const NamespaceString collNs("test.myCollection");
- const BSONObj doc1 = BSON("_id" << 1 << "a" << 1);
- const BSONObj doc2 = BSON("_id" << 2 << "a" << 1);
-
- // Create the collection with a two documents that have the same key for 'a'.
- {
- AutoGetOrCreateDb dbRaii(opCtx(), collNs.db(), LockMode::MODE_X);
- WriteUnitOfWork wunit(opCtx());
- CollectionOptions options;
- options.uuid = UUID::gen();
- auto coll = dbRaii.getDb()->createCollection(opCtx(), collNs.ns(), options);
- ASSERT(coll);
-
- ASSERT_OK(coll->insertDocument(opCtx(), InsertStatement(doc1), nullptr));
- ASSERT_OK(coll->insertDocument(opCtx(), InsertStatement(doc2), nullptr));
- wunit.commit();
- }
-
- const std::string indexName = "a_1";
- const BSONObj spec =
- BSON("ns" << collNs.ns() << "v" << 2 << "name" << indexName << "key" << BSON("a" << 1)
- << "unique"
- << true
- << "background"
- << true);
-
- // Create the bulk build block. Insert two different documents, but each with the same value
- // for 'a'. This will cause the index insert to allow inserting a duplicate key.
- const IndexCatalogEntry* entry;
-
- boost::optional<Record> record1;
- boost::optional<Record> record2;
- {
- AutoGetCollection autoColl(opCtx(), collNs, MODE_X);
- auto coll = autoColl.getCollection();
-
- MultiIndexBlockImpl indexer(opCtx(), coll);
- // Allow duplicates.
- indexer.ignoreUniqueConstraint();
-
- ASSERT_OK(indexer.init(spec).getStatus());
-
- IndexDescriptor* desc = coll->getIndexCatalog()->findIndexByName(
- opCtx(), indexName, true /* includeUnfinished */);
- entry = coll->getIndexCatalog()->getEntry(desc);
-
- // Construct the tracker.
- tracker = makeTracker(opCtx(), entry);
-
- auto cursor = coll->getCursor(opCtx());
-
- record1 = cursor->next();
- ASSERT(record1);
-
- std::vector<BSONObj> dupsInserted;
-
- // Neither of these inserts will recognize duplicates because the bulk inserter does not
- // detect them until dumpInsertsFromBulk() is called.
- ASSERT_OK(indexer.insert(record1->data.releaseToBson(), record1->id, &dupsInserted));
- ASSERT_EQ(0u, dupsInserted.size());
-
- record2 = cursor->next();
- ASSERT(record2);
- ASSERT_OK(indexer.insert(record2->data.releaseToBson(), record2->id, &dupsInserted));
- ASSERT_EQ(0u, dupsInserted.size());
-
- ASSERT_OK(indexer.dumpInsertsFromBulk(&dupsInserted));
- ASSERT_EQ(1u, dupsInserted.size());
-
- // Record that duplicates were inserted.
- WriteUnitOfWork wunit(opCtx());
-
- AutoGetCollection tempColl(opCtx(), tracker->nss(), MODE_IX);
- ASSERT_OK(tracker->recordDuplicates(opCtx(), tempColl.getCollection(), dupsInserted));
-
- ASSERT_OK(indexer.commit());
- wunit.commit();
- }
-
- // Confirm that the keys + RecordId of the duplicate are recorded.
- {
- AutoGetCollection tempColl(opCtx(), tracker->nss(), MODE_IS);
- Status s = tracker->constraintsSatisfiedForIndex(opCtx(), tempColl.getCollection());
- ASSERT_EQ(ErrorCodes::DuplicateKey, s.code());
- }
-
- // Now remove the document and index key to confirm the conflicts are resolved.
- {
- AutoGetCollection autoColl(opCtx(), collNs, MODE_IX);
- auto coll = autoColl.getCollection();
-
- WriteUnitOfWork wunit(opCtx());
-
- OpDebug opDebug;
- coll->deleteDocument(opCtx(), kUninitializedStmtId, record2->id, &opDebug);
- wunit.commit();
-
- // One key deleted for each index (includes _id)
- ASSERT_EQ(2u, *opDebug.additiveMetrics.keysDeleted);
-
- AutoGetCollection tempColl(opCtx(), tracker->nss(), MODE_IS);
- Status s = tracker->constraintsSatisfiedForIndex(opCtx(), tempColl.getCollection());
- ASSERT_OK(s);
- }
-
- destroyTempCollection(opCtx(), tracker->nss());
-}
-} // namespace
-} // namespace mongo
diff --git a/src/mongo/db/index/index_build_interceptor.cpp b/src/mongo/db/index/index_build_interceptor.cpp
index 6b74def0c44..b2975218c6e 100644
--- a/src/mongo/db/index/index_build_interceptor.cpp
+++ b/src/mongo/db/index/index_build_interceptor.cpp
@@ -47,13 +47,30 @@
namespace mongo {
-IndexBuildInterceptor::IndexBuildInterceptor(OperationContext* opCtx)
- : _sideWritesTable(
- opCtx->getServiceContext()->getStorageEngine()->makeTemporaryRecordStore(opCtx)){};
+IndexBuildInterceptor::IndexBuildInterceptor(OperationContext* opCtx, IndexCatalogEntry* entry)
+ : _indexCatalogEntry(entry),
+ _sideWritesTable(
+ opCtx->getServiceContext()->getStorageEngine()->makeTemporaryRecordStore(opCtx)) {
+
+ if (entry->descriptor()->unique()) {
+ _duplicateKeyTracker = std::make_unique<DuplicateKeyTracker>(opCtx, entry);
+ }
+}
+
+Status IndexBuildInterceptor::recordDuplicateKeys(OperationContext* opCtx,
+ const std::vector<BSONObj>& keys) {
+ invariant(_indexCatalogEntry->descriptor()->unique());
+ return _duplicateKeyTracker->recordKeys(opCtx, keys);
+}
+
+Status IndexBuildInterceptor::checkDuplicateKeyConstraints(OperationContext* opCtx) const {
+ if (!_duplicateKeyTracker) {
+ return Status::OK();
+ }
+ return _duplicateKeyTracker->checkConstraints(opCtx);
+}
Status IndexBuildInterceptor::drainWritesIntoIndex(OperationContext* opCtx,
- IndexAccessMethod* indexAccessMethod,
- const IndexDescriptor* indexDescriptor,
const InsertDeleteOptions& options) {
invariant(!opCtx->lockState()->inAWriteUnitOfWork());
@@ -136,8 +153,8 @@ Status IndexBuildInterceptor::drainWritesIntoIndex(OperationContext* opCtx,
// writes table.
WriteUnitOfWork wuow(opCtx);
for (auto& operation : batch) {
- auto status = _applyWrite(
- opCtx, indexAccessMethod, operation.second, options, &totalInserted, &totalDeleted);
+ auto status =
+ _applyWrite(opCtx, operation.second, options, &totalInserted, &totalDeleted);
if (!status.isOK()) {
return status;
}
@@ -159,15 +176,15 @@ Status IndexBuildInterceptor::drainWritesIntoIndex(OperationContext* opCtx,
progress->finished();
- log() << "index build for " << indexDescriptor->indexName() << ": drain applied "
- << (_numApplied - appliedAtStart) << " side writes. i: " << totalInserted
- << ", d: " << totalDeleted << ", total: " << _numApplied;
+ log() << "index build for " << _indexCatalogEntry->descriptor()->indexName()
+ << ": drain applied " << (_numApplied - appliedAtStart)
+ << " side writes. i: " << totalInserted << ", d: " << totalDeleted
+ << ", total: " << _numApplied;
return Status::OK();
}
Status IndexBuildInterceptor::_applyWrite(OperationContext* opCtx,
- IndexAccessMethod* indexAccessMethod,
const BSONObj& operation,
const InsertDeleteOptions& options,
int64_t* const keysInserted,
@@ -178,28 +195,34 @@ Status IndexBuildInterceptor::_applyWrite(OperationContext* opCtx,
(strcmp(operation.getStringField("op"), "i") == 0) ? Op::kInsert : Op::kDelete;
const BSONObjSet keySet = SimpleBSONObjComparator::kInstance.makeBSONObjSet({key});
+ auto accessMethod = _indexCatalogEntry->accessMethod();
if (opType == Op::kInsert) {
InsertResult result;
- Status s =
- indexAccessMethod->insertKeys(opCtx,
- keySet,
- SimpleBSONObjComparator::kInstance.makeBSONObjSet(),
- MultikeyPaths{},
- opRecordId,
- options,
- &result);
- if (!s.isOK()) {
- return s;
+ auto status = accessMethod->insertKeys(opCtx,
+ keySet,
+ SimpleBSONObjComparator::kInstance.makeBSONObjSet(),
+ MultikeyPaths{},
+ opRecordId,
+ options,
+ &result);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ if (result.dupsInserted.size()) {
+ status = recordDuplicateKeys(opCtx, result.dupsInserted);
+ if (!status.isOK()) {
+ return status;
+ }
}
- invariant(!result.dupsInserted.size());
*keysInserted += result.numInserted;
} else {
invariant(opType == Op::kDelete);
DEV invariant(strcmp(operation.getStringField("op"), "d") == 0);
int64_t numDeleted;
- Status s = indexAccessMethod->removeKeys(opCtx, keySet, opRecordId, options, &numDeleted);
+ Status s = accessMethod->removeKeys(opCtx, keySet, opRecordId, options, &numDeleted);
if (!s.isOK()) {
return s;
}
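
Note: taken together, drainWritesIntoIndex() and _applyWrite() above implement a resumable scan: each pass applies only the buffered side writes past the last applied RecordId, so a later pass picks up where the previous one stopped. A self-contained model of that resumability (names and types are illustrative, not the server's):

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <utility>

enum class Op { kInsert, kDelete };

struct InterceptorModel {
    std::map<std::int64_t, std::pair<Op, std::string>> sideWrites;  // RecordId -> op
    std::int64_t nextId = 1;
    std::int64_t lastApplied = 0;  // resume point between drain passes

    void sideWrite(Op op, std::string key) {
        sideWrites.emplace(nextId++, std::pair<Op, std::string>(op, std::move(key)));
    }

    void drainWritesIntoIndex(std::multiset<std::string>& index) {
        // Apply only entries past 'lastApplied'; later calls resume there.
        for (auto it = sideWrites.upper_bound(lastApplied); it != sideWrites.end(); ++it) {
            if (it->second.first == Op::kInsert)
                index.insert(it->second.second);
            else
                index.erase(index.find(it->second.second));  // assumes key exists
            lastApplied = it->first;
        }
    }

    bool areAllWritesApplied() const {
        return sideWrites.empty() || sideWrites.rbegin()->first == lastApplied;
    }
};

int main() {
    InterceptorModel interceptor;
    std::multiset<std::string> index;
    interceptor.sideWrite(Op::kInsert, "a");
    interceptor.drainWritesIntoIndex(index);  // first drain applies the insert
    interceptor.sideWrite(Op::kDelete, "a");
    interceptor.drainWritesIntoIndex(index);  // second drain resumes after it
    assert(index.empty() && interceptor.areAllWritesApplied());
}
```
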
@@ -298,7 +321,9 @@ Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx,
std::vector<Record> records;
for (auto& doc : toInsert) {
- records.emplace_back(Record{RecordId(), RecordData(doc.objdata(), doc.objsize())});
+ records.emplace_back(Record{RecordId(), // The storage engine will assign its own RecordId
+ // when we pass one that is null.
+ RecordData(doc.objdata(), doc.objsize())});
}
// By passing a vector of null timestamps, these inserts are not timestamped individually, but
@@ -306,4 +331,5 @@ Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx,
std::vector<Timestamp> timestamps(records.size());
return _sideWritesTable->rs()->insertRecords(opCtx, &records, timestamps);
}
+
} // namespace mongo
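
Note: sideWrite() above leans on the insertRecords() contract spelled out in the new comment: a record passed with a null RecordId gets one assigned by the store, and a vector of null timestamps leaves the batch to be timestamped by the enclosing WriteUnitOfWork. A toy model of the id-assignment half (hypothetical types, not the server's RecordStore API):

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

struct RecordModel {
    std::int64_t id = 0;  // 0 plays the role of a null RecordId
    std::string data;
};

class TempStoreModel {
public:
    // Mirrors the pointer-to-batch shape of insertRecords(): records whose
    // id is null have one assigned in place by the store.
    void insertRecords(std::vector<RecordModel>* records) {
        for (auto& r : *records) {
            if (r.id == 0)
                r.id = _nextId++;
            _records[r.id] = r.data;
        }
    }

private:
    std::int64_t _nextId = 1;
    std::map<std::int64_t, std::string> _records;
};

int main() {
    TempStoreModel store;
    std::vector<RecordModel> batch{{0, "op1"}, {0, "op2"}};
    store.insertRecords(&batch);
    assert(batch[0].id == 1 && batch[1].id == 2);  // ids assigned by the store
}
```
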
diff --git a/src/mongo/db/index/index_build_interceptor.h b/src/mongo/db/index/index_build_interceptor.h
index b25fbf53c71..6988395906c 100644
--- a/src/mongo/db/index/index_build_interceptor.h
+++ b/src/mongo/db/index/index_build_interceptor.h
@@ -29,6 +29,7 @@
#pragma once
+#include "mongo/db/index/duplicate_key_tracker.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/index/multikey_paths.h"
#include "mongo/db/namespace_string.h"
@@ -50,7 +51,7 @@ public:
* intercept side writes. This interceptor must not exist longer than the operation context used
* to construct it, as the underlying TemporaryRecordStore needs it to destroy itself.
*/
- IndexBuildInterceptor(OperationContext* opCtx);
+ IndexBuildInterceptor(OperationContext* opCtx, IndexCatalogEntry* entry);
/**
* Client writes that are concurrent with an index build will have their index updates written
@@ -67,6 +68,19 @@ public:
int64_t* const numKeysOut);
/**
+ * Given a set of duplicate keys, record the keys for later verification by a call to
+ * checkDuplicateKeyConstraints().
+ */
+ Status recordDuplicateKeys(OperationContext* opCtx, const std::vector<BSONObj>& keys);
+
+ /**
+ * Returns Status::OK if all previously recorded duplicate key constraint violations have been
+ * resolved for the index. Returns a DuplicateKey error if there are still duplicate key
+ * constraint violations on the index.
+ */
+ Status checkDuplicateKeyConstraints(OperationContext* opCtx) const;
+
+ /**
* Performs a resumable scan on the side writes table, and either inserts or removes each key
* from the underlying IndexAccessMethod. This will only insert as many records as are visible
* in the current snapshot.
@@ -74,10 +88,8 @@ public:
* This is resumable, so subsequent calls will start the scan at the record immediately
* following the last inserted record from a previous call to drainWritesIntoIndex.
*/
- Status drainWritesIntoIndex(OperationContext* opCtx,
- IndexAccessMethod* indexAccessMethod,
- const IndexDescriptor* indexDescriptor,
- const InsertDeleteOptions& options);
+ Status drainWritesIntoIndex(OperationContext* opCtx, const InsertDeleteOptions& options);
+
/**
* Returns 'true' if there are no visible records remaining to be applied from the side writes
@@ -95,15 +107,19 @@ private:
using SideWriteRecord = std::pair<RecordId, BSONObj>;
Status _applyWrite(OperationContext* opCtx,
- IndexAccessMethod* indexAccessMethod,
const BSONObj& doc,
const InsertDeleteOptions& options,
int64_t* const keysInserted,
int64_t* const keysDeleted);
+ // The entry for the index that is being built.
+ IndexCatalogEntry* _indexCatalogEntry;
+
// This temporary record store is owned by the interceptor and dropped along with it.
std::unique_ptr<TemporaryRecordStore> _sideWritesTable;
+ std::unique_ptr<DuplicateKeyTracker> _duplicateKeyTracker;
+
int64_t _numApplied{0};
AtomicInt64 _sideWritesCounter{0};
diff --git a/src/mongo/dbtests/indexupdatetests.cpp b/src/mongo/dbtests/indexupdatetests.cpp
index 9c44428ef6d..0244ff0122e 100644
--- a/src/mongo/dbtests/indexupdatetests.cpp
+++ b/src/mongo/dbtests/indexupdatetests.cpp
@@ -206,7 +206,17 @@ public:
ASSERT_OK(indexer.init(spec).getStatus());
const Status status = indexer.insertAllDocumentsInCollection();
- ASSERT_EQUALS(status.code(), ErrorCodes::DuplicateKey);
+ if (!background) {
+ ASSERT_EQUALS(status.code(), ErrorCodes::DuplicateKey);
+ return;
+ }
+
+ // Background builds do not detect duplicates until they commit.
+ ASSERT_OK(status);
+
+ WriteUnitOfWork wunit(&_opCtx);
+ ASSERT_THROWS_CODE(indexer.commit(), AssertionException, ErrorCodes::DuplicateKey);
+ wunit.commit();
}
};