Diffstat:
-rw-r--r--  jstests/noPassthrough/hybrid_unique_index_with_updates.js | 157
-rw-r--r--  jstests/noPassthrough/indexbg2.js                         |  38
-rw-r--r--  src/mongo/db/catalog/index_build_block.cpp                |  17
-rw-r--r--  src/mongo/db/catalog/multi_index_block.h                  |  15
-rw-r--r--  src/mongo/db/catalog/multi_index_block_impl.cpp           |  56
-rw-r--r--  src/mongo/db/catalog/multi_index_block_impl.h             |   8
-rw-r--r--  src/mongo/db/catalog/multi_index_block_test.cpp           |  12
-rw-r--r--  src/mongo/db/index/SConscript                             |  17
-rw-r--r--  src/mongo/db/index/duplicate_key_tracker.cpp              |  86
-rw-r--r--  src/mongo/db/index/duplicate_key_tracker.h                |  37
-rw-r--r--  src/mongo/db/index/duplicate_key_tracker_test.cpp         | 334
-rw-r--r--  src/mongo/db/index/index_build_interceptor.cpp            |  74
-rw-r--r--  src/mongo/db/index/index_build_interceptor.h              |  28
-rw-r--r--  src/mongo/dbtests/indexupdatetests.cpp                    |  12
14 files changed, 358 insertions(+), 533 deletions(-)
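In brief, the change below extends hybrid (background) index builds to unique indexes. The build now runs with duplicates allowed: any duplicate keys inserted during the collection scan or the side-write drain are recorded by a DuplicateKeyTracker backed by a temporary record store, and the build fails with DuplicateKey at commit time only if a recorded conflict is still unresolved (see index_build_block.cpp and duplicate_key_tracker.cpp below). The following is a minimal, self-contained C++ simulation of that lifecycle; the containers and all names in it are illustrative stand-ins, not MongoDB's actual types.

    #include <iostream>
    #include <set>
    #include <string>
    #include <vector>

    // Illustrative stand-in for the patched build pipeline: a multiset plays
    // the index table, a vector plays the DuplicateKeyTracker's record store.
    struct HybridUniqueBuildSim {
        std::multiset<std::string> index;
        std::vector<std::string> recordedDups;

        // With dupsAllowed, a conflicting key is inserted anyway and the
        // conflict is recorded for the commit-time check.
        void insertKey(const std::string& key) {
            if (index.count(key) > 0)
                recordedDups.push_back(key);
            index.insert(key);
        }

        // A concurrent delete, as drained from the side-writes table.
        void removeKey(const std::string& key) {
            auto it = index.find(key);
            if (it != index.end())
                index.erase(it);
        }

        // Commit-time check: a recorded key fails the build only if the index
        // still holds more than one entry for it.
        bool commitSucceeds() const {
            for (const auto& key : recordedDups)
                if (index.count(key) > 1)
                    return false;
            return true;
        }
    };

    int main() {
        HybridUniqueBuildSim build;
        build.insertKey("i:0");  // collection scan indexes the original document
        build.insertKey("i:0");  // concurrent write introduces a duplicate
        std::cout << build.commitSucceeds() << '\n';  // 0: conflict unresolved
        build.removeKey("i:0");  // duplicate removed before the build commits
        std::cout << build.commitSucceeds() << '\n';  // 1: conflict resolved
    }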
diff --git a/jstests/noPassthrough/hybrid_unique_index_with_updates.js b/jstests/noPassthrough/hybrid_unique_index_with_updates.js
new file mode 100644
index 00000000000..b3dc94ca4fd
--- /dev/null
+++ b/jstests/noPassthrough/hybrid_unique_index_with_updates.js
@@ -0,0 +1,157 @@
+/**
+ * Tests that write operations are accepted and result in correct indexing behavior for each phase
+ * of hybrid unique index builds. This test inserts a duplicate document at different phases of an
+ * index build to confirm that the build fails when the duplicate is not resolved.
+ *
+ * @tags: [requires_document_locking]
+ */
+(function() {
+    "use strict";
+
+    load("jstests/libs/check_log.js");
+
+    let conn = MongoRunner.runMongod();
+    let testDB = conn.getDB('test');
+
+    // Run 'func' while the given failpoint is enabled.
+    let doDuringFailpoint = function(failPointName, logMessage, func, i) {
+        clearRawMongoProgramOutput();
+        assert.commandWorked(testDB.adminCommand(
+            {configureFailPoint: failPointName, mode: "alwaysOn", data: {"i": i}}));
+
+        assert.soon(() => rawMongoProgramOutput().indexOf(logMessage) >= 0);
+
+        func();
+
+        assert.commandWorked(testDB.adminCommand({configureFailPoint: failPointName, mode: "off"}));
+    };
+
+    const docsToInsert = 1000;
+    let setUp = function(coll) {
+        coll.drop();
+
+        let bulk = coll.initializeUnorderedBulkOp();
+        for (let i = 0; i < docsToInsert; i++) {
+            bulk.insert({i: i});
+        }
+        assert.commandWorked(bulk.execute());
+    };
+
+    let buildIndexInBackground = function(expectDuplicateKeyError) {
+        if (expectDuplicateKeyError) {
+            return startParallelShell(function() {
+                assert.commandFailedWithCode(
+                    db.hybrid.createIndex({i: 1}, {background: true, unique: true}),
+                    ErrorCodes.DuplicateKey);
+            }, conn.port);
+        }
+        return startParallelShell(function() {
+            assert.commandWorked(db.hybrid.createIndex({i: 1}, {background: true, unique: true}));
+        }, conn.port);
+    };
+
+    /**
+     * Run a background index build on a unique index under different configurations. Introduce
+     * duplicate keys on the index that may cause it to fail or succeed, depending on the following
+     * optional parameters:
+     * {
+     *   // Which operation is used to introduce a duplicate key.
+     *   operation {string}: "insert", "update"
+     *
+     *   // Whether or not to resolve the duplicate key before completing the build.
+     *   resolve {bool}
+     *
+     *   // Which phase of the index build to introduce the duplicate key in.
+     *   phase {number}: 0-4
+     * }
+     */
+    let runTest = function(config) {
+        jsTestLog("running test with config: " + tojson(config));
+
+        setUp(testDB.hybrid);
+
+        // Expect the build to fail with a duplicate key error if we insert a duplicate key and
+        // don't resolve it.
+        let expectDuplicate = config.resolve === false;
+        let awaitBuild = buildIndexInBackground(expectDuplicate);
+
+        // Introduce a duplicate key, either with an insert or an update. Optionally, follow up
+        // with an operation that resolves the duplicate by removing it or updating it.
+        const dup = {i: 0};
+        let doOperation = function() {
+            if ("insert" == config.operation) {
+                assert.commandWorked(testDB.hybrid.insert(dup));
+                if (config.resolve) {
+                    assert.commandWorked(testDB.hybrid.deleteOne(dup));
+                }
+            } else if ("update" == config.operation) {
+                assert.commandWorked(testDB.hybrid.update(dup, {i: 1}));
+                if (config.resolve) {
+                    assert.commandWorked(testDB.hybrid.update({i: 1}, dup));
+                }
+            }
+        };
+
+        const stopKey = 0;
+        switch (config.phase) {
+            // Don't hang the build.
+            case undefined:
+                break;
+            // Hang before scanning the first document.
+ case 0: + doDuringFailpoint("hangBeforeIndexBuildOf", + "Hanging before index build of i=" + stopKey, + doOperation, + stopKey); + break; + // Hang after scanning the first document. + case 1: + doDuringFailpoint("hangAfterIndexBuildOf", + "Hanging after index build of i=" + stopKey, + doOperation, + stopKey); + break; + // Hang before the first drain and after dumping the keys from the external sorter into + // the index. + case 2: + doDuringFailpoint("hangAfterIndexBuildDumpsInsertsFromBulk", + "Hanging after dumping inserts from bulk builder", + doOperation); + break; + // Hang before the second drain. + case 3: + doDuringFailpoint("hangAfterIndexBuildFirstDrain", + "Hanging after index build first drain", + doOperation); + break; + // Hang before the final drain and commit. + case 4: + doDuringFailpoint("hangAfterIndexBuildSecondDrain", + "Hanging after index build second drain", + doOperation); + break; + default: + assert(false, "Invalid phase: " + config.phase); + } + + awaitBuild(); + + let expectedDocs = docsToInsert; + expectedDocs += (config.operation == "insert" && config.resolve === false) ? 1 : 0; + + assert.eq(expectedDocs, testDB.hybrid.count()); + assert.eq(expectedDocs, testDB.hybrid.find().itcount()); + assert.commandWorked(testDB.hybrid.validate({full: true})); + }; + + runTest({}); + + for (let i = 0; i <= 4; i++) { + runTest({operation: "insert", resolve: true, phase: i}); + runTest({operation: "insert", resolve: false, phase: i}); + runTest({operation: "update", resolve: true, phase: i}); + runTest({operation: "update", resolve: false, phase: i}); + } + + MongoRunner.stopMongod(conn); +})(); diff --git a/jstests/noPassthrough/indexbg2.js b/jstests/noPassthrough/indexbg2.js index 7b16e143807..913fac1d1a2 100644 --- a/jstests/noPassthrough/indexbg2.js +++ b/jstests/noPassthrough/indexbg2.js @@ -96,11 +96,9 @@ }; // Unique background index build succeeds: - // 1) when a document is inserted with a key that has already been indexed - // (with the insert failing on duplicate key error). - // 2) when a document with a key not present in the initial set is inserted twice - // (with the initial insert succeeding and the second failing on duplicate key error). - let succeedWithWriteErrors = function(coll, newKey) { + // 1) when a document is inserted and removed with a key that has already been indexed + // 2) when a document with a key not present in the initial set is inserted and removed + let succeedWithoutWriteErrors = function(coll, newKey) { let duplicateKey = 3; turnFailPointOn("hangAfterIndexBuildOf", duplicateKey); @@ -112,32 +110,16 @@ jsTestLog("Waiting to hang after index build of i=" + duplicateKey); checkLog.contains(conn, "Hanging after index build of i=" + duplicateKey); - assert.writeError(coll.save({i: duplicateKey, n: true})); + assert.commandWorked(coll.insert({i: duplicateKey, n: true})); - // First insert on key not present in initial set - assert.writeOK(coll.save({i: newKey, n: true})); - } catch (e) { - turnFailPointOff("hangAfterIndexBuildOf"); - throw e; - } + // First insert on key not present in initial set. + assert.commandWorked(coll.insert({i: newKey, n: true})); - try { - // We are currently hanging after indexing document with {i: duplicateKey}. - // To perform next check, we need to hang after indexing document with {i: newKey}. - // Add a hang before indexing document {i: newKey}, then turn off current hang - // so we are always in a known state and don't skip over the indexing of {i: newKey}. 
- turnFailPointOn("hangBeforeIndexBuildOf", newKey); - turnFailPointOff("hangAfterIndexBuildOf"); - turnFailPointOn("hangAfterIndexBuildOf", newKey); - turnFailPointOff("hangBeforeIndexBuildOf"); - - // Second insert on key not present in intial set fails with duplicate key error - jsTestLog("Waiting to hang after index build of i=" + newKey); - checkLog.contains(conn, "Hanging after index build of i=" + newKey); + // Remove duplicates before completing the index build. + assert.commandWorked(coll.deleteOne({i: duplicateKey, n: true})); + assert.commandWorked(coll.deleteOne({i: newKey, n: true})); - assert.writeError(coll.save({i: newKey, n: true})); } finally { - turnFailPointOff("hangBeforeIndexBuildOf"); turnFailPointOff("hangAfterIndexBuildOf"); } @@ -164,7 +146,7 @@ failOnInsertedDuplicateValue(coll); assert.eq(size, coll.count()); - succeedWithWriteErrors(coll, size); + succeedWithoutWriteErrors(coll, size); waitParallel(); }; diff --git a/src/mongo/db/catalog/index_build_block.cpp b/src/mongo/db/catalog/index_build_block.cpp index 5f8ef4e21d3..16d76b1de28 100644 --- a/src/mongo/db/catalog/index_build_block.cpp +++ b/src/mongo/db/catalog/index_build_block.cpp @@ -91,10 +91,10 @@ Status IndexCatalogImpl::IndexBuildBlock::init() { _opCtx, std::move(descriptor), initFromDisk, isReadyIndex); // Hybrid indexes are only enabled for background, non-unique indexes. - // TODO: Remove when SERVER-38036 and SERVER-37270 are complete. - const bool useHybrid = isBackgroundIndex && !descriptorPtr->unique(); + // TODO: Remove when SERVER-37270 is complete. + const bool useHybrid = isBackgroundIndex; if (useHybrid) { - _indexBuildInterceptor = stdx::make_unique<IndexBuildInterceptor>(_opCtx); + _indexBuildInterceptor = stdx::make_unique<IndexBuildInterceptor>(_opCtx, _entry); _entry->setIndexBuildInterceptor(_indexBuildInterceptor.get()); _opCtx->recoveryUnit()->onCommit( @@ -146,8 +146,15 @@ void IndexCatalogImpl::IndexBuildBlock::success() { NamespaceString ns(_indexNamespace); invariant(_opCtx->lockState()->isDbLockedForMode(ns.db(), MODE_X)); - // An index build should never be completed with writes remaining in the interceptor. - invariant(!_indexBuildInterceptor || _indexBuildInterceptor->areAllWritesApplied(_opCtx)); + if (_indexBuildInterceptor) { + // An index build should never be completed with writes remaining in the interceptor. + invariant(_indexBuildInterceptor->areAllWritesApplied(_opCtx)); + + // Hybrid indexes must check for any outstanding duplicate key constraint violations when + // they finish. + uassertStatusOK(_indexBuildInterceptor->checkDuplicateKeyConstraints(_opCtx)); + } + LOG(2) << "marking index " << _indexName << " as ready in snapshot id " << _opCtx->recoveryUnit()->getSnapshotId(); diff --git a/src/mongo/db/catalog/multi_index_block.h b/src/mongo/db/catalog/multi_index_block.h index 70283a83fa5..b9c4e7a5e65 100644 --- a/src/mongo/db/catalog/multi_index_block.h +++ b/src/mongo/db/catalog/multi_index_block.h @@ -86,7 +86,8 @@ public: * If this is called before init(), we will ignore unique violations. This has no effect if * no specs are unique. * - * If this is called, any dupsOut sets passed in will never be filled. + * If this is called, any 'dupRecords' set passed to dumpInsertsFromBulk() will never be + * filled. */ virtual void ignoreUniqueConstraint() = 0; @@ -123,16 +124,13 @@ public: virtual Status insertAllDocumentsInCollection() = 0; /** - * Call this after init() for each document in the collection. 
Any duplicate keys inserted will - * be appended to 'dupKeysInserted' if it is not null. + * Call this after init() for each document in the collection. * * Do not call if you called insertAllDocumentsInCollection(); * * Should be called inside of a WriteUnitOfWork. */ - virtual Status insert(const BSONObj& wholeDocument, - const RecordId& loc, - std::vector<BSONObj>* const dupKeysInserted = nullptr) = 0; + virtual Status insert(const BSONObj& wholeDocument, const RecordId& loc) = 0; /** * Call this after the last insert(). This gives the index builder a chance to do any @@ -145,15 +143,10 @@ public: * indexed, so callers MUST either fail this index build or delete the documents from the * collection. * - * If 'dupKeysInserted' is passed as non-NULL and duplicates are allowed for the unique index, - * violators of uniqueness constraints will still be indexed, and the keys will be appended to - * the vector. No DuplicateKey errors will be returned. - * * Should not be called inside of a WriteUnitOfWork. */ virtual Status dumpInsertsFromBulk() = 0; virtual Status dumpInsertsFromBulk(std::set<RecordId>* const dupRecords) = 0; - virtual Status dumpInsertsFromBulk(std::vector<BSONObj>* const dupKeysInserted) = 0; /** * For background indexes using an IndexBuildInterceptor to capture inserts during a build, diff --git a/src/mongo/db/catalog/multi_index_block_impl.cpp b/src/mongo/db/catalog/multi_index_block_impl.cpp index 586ab43cb89..dd32d49bd43 100644 --- a/src/mongo/db/catalog/multi_index_block_impl.cpp +++ b/src/mongo/db/catalog/multi_index_block_impl.cpp @@ -302,11 +302,14 @@ StatusWith<std::vector<BSONObj>> MultiIndexBlockImpl::init(const std::vector<BSO _collection->getIndexCatalog()->prepareInsertDeleteOptions( _opCtx, descriptor, &index.options); - index.options.dupsAllowed = index.options.dupsAllowed || _ignoreUnique; - index.options.fromIndexBuilder = true; + // Allow duplicates when explicitly allowed or an interceptor is installed, which will + // perform duplicate checking itself. 
+ index.options.dupsAllowed |= + _ignoreUnique || index.block->getEntry()->indexBuildInterceptor(); if (_ignoreUnique) { index.options.getKeysMode = IndexAccessMethod::GetKeysMode::kRelaxConstraints; } + index.options.fromIndexBuilder = true; log() << "build index on: " << ns << " properties: " << descriptor->toString(); if (index.bulk) @@ -514,9 +517,7 @@ Status MultiIndexBlockImpl::insertAllDocumentsInCollection() { return Status::OK(); } -Status MultiIndexBlockImpl::insert(const BSONObj& doc, - const RecordId& loc, - std::vector<BSONObj>* const dupKeysInserted) { +Status MultiIndexBlockImpl::insert(const BSONObj& doc, const RecordId& loc) { if (State::kAborted == _getState()) { return {ErrorCodes::IndexBuildAborted, str::stream() << "Index build aborted: " << _abortReason @@ -543,29 +544,15 @@ Status MultiIndexBlockImpl::insert(const BSONObj& doc, if (!idxStatus.isOK()) return idxStatus; - - if (dupKeysInserted) { - dupKeysInserted->insert( - dupKeysInserted->end(), result.dupsInserted.begin(), result.dupsInserted.end()); - } } return Status::OK(); } Status MultiIndexBlockImpl::dumpInsertsFromBulk() { - return _dumpInsertsFromBulk(nullptr, nullptr); + return dumpInsertsFromBulk(nullptr); } Status MultiIndexBlockImpl::dumpInsertsFromBulk(std::set<RecordId>* dupRecords) { - return _dumpInsertsFromBulk(dupRecords, nullptr); -} - -Status MultiIndexBlockImpl::dumpInsertsFromBulk(std::vector<BSONObj>* dupKeysInserted) { - return _dumpInsertsFromBulk(nullptr, dupKeysInserted); -} - -Status MultiIndexBlockImpl::_dumpInsertsFromBulk(std::set<RecordId>* dupRecords, - std::vector<BSONObj>* dupKeysInserted) { if (State::kAborted == _getState()) { return {ErrorCodes::IndexBuildAborted, str::stream() << "Index build aborted: " << _abortReason @@ -580,17 +567,37 @@ Status MultiIndexBlockImpl::_dumpInsertsFromBulk(std::set<RecordId>* dupRecords, for (size_t i = 0; i < _indexes.size(); i++) { if (_indexes[i].bulk == NULL) continue; + + // If 'dupRecords' is provided, it will be used to store all records that would result in + // duplicate key errors. Only pass 'dupKeysInserted', which stores inserted duplicate keys, + // when 'dupRecords' is not used because these two vectors are mutually incompatible. + std::vector<BSONObj> dupKeysInserted; + + IndexCatalogEntry* entry = _indexes[i].block->getEntry(); LOG(1) << "\t dumping from external sorter into index: " - << _indexes[i].block->getEntry()->descriptor()->indexName(); + << entry->descriptor()->indexName(); Status status = _indexes[i].real->commitBulk(_opCtx, _indexes[i].bulk.get(), _allowInterruption, _indexes[i].options.dupsAllowed, dupRecords, - dupKeysInserted); + (dupRecords) ? nullptr : &dupKeysInserted); if (!status.isOK()) { return status; } + + auto interceptor = entry->indexBuildInterceptor(); + if (!interceptor || _ignoreUnique) { + continue; + } + + // Record duplicate key insertions for later verification. 
+ if (dupKeysInserted.size()) { + status = interceptor->recordDuplicateKeys(_opCtx, dupKeysInserted); + if (!status.isOK()) { + return status; + } + } } _setState(State::kPreCommit); @@ -624,10 +631,7 @@ Status MultiIndexBlockImpl::drainBackgroundWritesIfNeeded() { LOG(1) << "draining background writes on collection " << _collection->ns() << " into index: " << _indexes[i].block->getEntry()->descriptor()->indexName(); - auto status = interceptor->drainWritesIntoIndex(_opCtx, - _indexes[i].real, - _indexes[i].block->getEntry()->descriptor(), - _indexes[i].options); + auto status = interceptor->drainWritesIntoIndex(_opCtx, _indexes[i].options); if (!status.isOK()) { return status; } diff --git a/src/mongo/db/catalog/multi_index_block_impl.h b/src/mongo/db/catalog/multi_index_block_impl.h index 23fc2ab03b6..25902359202 100644 --- a/src/mongo/db/catalog/multi_index_block_impl.h +++ b/src/mongo/db/catalog/multi_index_block_impl.h @@ -78,13 +78,10 @@ public: Status insertAllDocumentsInCollection() override; - Status insert(const BSONObj& doc, - const RecordId& loc, - std::vector<BSONObj>* const dupKeysInserted = nullptr) override; + Status insert(const BSONObj& doc, const RecordId& loc) override; Status dumpInsertsFromBulk() override; Status dumpInsertsFromBulk(std::set<RecordId>* dupRecords) override; - Status dumpInsertsFromBulk(std::vector<BSONObj>* dupKeysInserted) override; /** * See MultiIndexBlock::drainBackgroundWritesIfNeeded() @@ -139,8 +136,7 @@ private: InsertDeleteOptions options; }; - Status _dumpInsertsFromBulk(std::set<RecordId>* dupRecords, - std::vector<BSONObj>* dupKeysInserted); + Status _dumpInsertsFromBulk(std::set<RecordId>* dupRecords); /** * Returns the current state. diff --git a/src/mongo/db/catalog/multi_index_block_test.cpp b/src/mongo/db/catalog/multi_index_block_test.cpp index 562bdd518fb..1afec1af1fa 100644 --- a/src/mongo/db/catalog/multi_index_block_test.cpp +++ b/src/mongo/db/catalog/multi_index_block_test.cpp @@ -133,7 +133,7 @@ TEST_F(MultiIndexBlockTest, CommitAfterInsertingSingleDocument) { ASSERT_EQUALS(0U, specs.size()); ASSERT_EQUALS(MultiIndexBlockImpl::State::kRunning, indexer->getState_forTest()); - ASSERT_OK(indexer->insert({}, {}, nullptr)); + ASSERT_OK(indexer->insert({}, {})); ASSERT_OK(indexer->dumpInsertsFromBulk()); ASSERT_EQUALS(MultiIndexBlockImpl::State::kPreCommit, indexer->getState_forTest()); @@ -154,7 +154,7 @@ TEST_F(MultiIndexBlockTest, AbortWithoutCleanupAfterInsertingSingleDocument) { auto indexer = getIndexer(); auto specs = unittest::assertGet(indexer->init(std::vector<BSONObj>())); ASSERT_EQUALS(0U, specs.size()); - ASSERT_OK(indexer->insert({}, {}, nullptr)); + ASSERT_OK(indexer->insert({}, {})); indexer->abortWithoutCleanup(); ASSERT_EQUALS(MultiIndexBlockImpl::State::kAborted, indexer->getState_forTest()); @@ -186,13 +186,13 @@ TEST_F(MultiIndexBlockTest, InsertingSingleDocumentFailsAfterAbort) { ASSERT_EQUALS(MultiIndexBlockImpl::State::kAborted, indexer->getState_forTest()); ASSERT_EQUALS(ErrorCodes::IndexBuildAborted, - indexer->insert(BSON("_id" << 123 << "a" << 456), {}, nullptr)); + indexer->insert(BSON("_id" << 123 << "a" << 456), {})); ASSERT_EQUALS(MultiIndexBlockImpl::State::kAborted, indexer->getState_forTest()); ASSERT_FALSE(indexer->isCommitted()); } -TEST_F(MultiIndexBlockTest, dumpInsertsFromBulkFailsAfterAbort) { +TEST_F(MultiIndexBlockTest, DumpInsertsFromBulkFailsAfterAbort) { auto indexer = getIndexer(); ASSERT_EQUALS(MultiIndexBlockImpl::State::kUninitialized, indexer->getState_forTest()); @@ -200,7 +200,7 @@ 
TEST_F(MultiIndexBlockTest, dumpInsertsFromBulkFailsAfterAbort) { ASSERT_EQUALS(0U, specs.size()); ASSERT_EQUALS(MultiIndexBlockImpl::State::kRunning, indexer->getState_forTest()); - ASSERT_OK(indexer->insert(BSON("_id" << 123 << "a" << 456), {}, nullptr)); + ASSERT_OK(indexer->insert(BSON("_id" << 123 << "a" << 456), {})); ASSERT_EQUALS(MultiIndexBlockImpl::State::kRunning, indexer->getState_forTest()); indexer->abort("test"_sd); @@ -220,7 +220,7 @@ TEST_F(MultiIndexBlockTest, CommitFailsAfterAbort) { ASSERT_EQUALS(0U, specs.size()); ASSERT_EQUALS(MultiIndexBlockImpl::State::kRunning, indexer->getState_forTest()); - ASSERT_OK(indexer->insert(BSON("_id" << 123 << "a" << 456), {}, nullptr)); + ASSERT_OK(indexer->insert(BSON("_id" << 123 << "a" << 456), {})); ASSERT_EQUALS(MultiIndexBlockImpl::State::kRunning, indexer->getState_forTest()); ASSERT_OK(indexer->dumpInsertsFromBulk()); diff --git a/src/mongo/db/index/SConscript b/src/mongo/db/index/SConscript index 87f54e29f47..0f8e825cb90 100644 --- a/src/mongo/db/index/SConscript +++ b/src/mongo/db/index/SConscript @@ -26,26 +26,10 @@ env.Library( ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/base', - '$BUILD_DIR/mongo/db/dbhelpers', '$BUILD_DIR/mongo/db/storage/index_entry_comparison', ], ) -env.CppUnitTest( - target='duplicate_key_tracker_test', - source=[ - 'duplicate_key_tracker_test.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/db/auth/authmocks', - '$BUILD_DIR/mongo/db/catalog/multi_index_block', - '$BUILD_DIR/mongo/db/catalog_raii', - '$BUILD_DIR/mongo/db/repl/replmocks', - '$BUILD_DIR/mongo/db/service_context_d_test_fixture', - '$BUILD_DIR/mongo/db/storage/ephemeral_for_test/storage_ephemeral_for_test_core', - 'duplicate_key_tracker', - ], -) env.Library( target='key_generator', source=[ @@ -154,6 +138,7 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/base', + 'duplicate_key_tracker', ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/multi_key_path_tracker', diff --git a/src/mongo/db/index/duplicate_key_tracker.cpp b/src/mongo/db/index/duplicate_key_tracker.cpp index 066ef969f05..7dd47ded1fe 100644 --- a/src/mongo/db/index/duplicate_key_tracker.cpp +++ b/src/mongo/db/index/duplicate_key_tracker.cpp @@ -34,12 +34,8 @@ #include "mongo/db/index/duplicate_key_tracker.h" #include "mongo/db/catalog/index_catalog_entry.h" -#include "mongo/db/catalog_raii.h" -#include "mongo/db/dbhelpers.h" -#include "mongo/db/exec/working_set_common.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/keypattern.h" -#include "mongo/db/query/internal_plans.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" @@ -49,60 +45,63 @@ namespace { static constexpr StringData kKeyField = "key"_sd; } -DuplicateKeyTracker::DuplicateKeyTracker(const IndexCatalogEntry* entry, const NamespaceString& nss) - : _idCounter(0), _indexCatalogEntry(entry), _nss(nss) { +DuplicateKeyTracker::DuplicateKeyTracker(OperationContext* opCtx, const IndexCatalogEntry* entry) + : _indexCatalogEntry(entry), + _keyConstraintsTable( + opCtx->getServiceContext()->getStorageEngine()->makeTemporaryRecordStore(opCtx)) { invariant(_indexCatalogEntry->descriptor()->unique()); } -DuplicateKeyTracker::~DuplicateKeyTracker() {} - -NamespaceString DuplicateKeyTracker::makeTempNamespace() { - return NamespaceString("local.system.indexBuildConstraints-" + UUID::gen().toString()); -} - -Status DuplicateKeyTracker::recordDuplicates(OperationContext* opCtx, - Collection* tempCollection, - const std::vector<BSONObj>& keys) { - invariant(tempCollection->ns() == nss()); - 
invariant(opCtx->lockState()->inAWriteUnitOfWork()); - invariant(opCtx->lockState()->isCollectionLockedForMode(tempCollection->ns().ns(), MODE_IX)); +Status DuplicateKeyTracker::recordKeys(OperationContext* opCtx, const std::vector<BSONObj>& keys) { + if (keys.size() == 0) + return Status::OK(); + std::vector<BSONObj> toInsert; + toInsert.reserve(keys.size()); for (auto&& key : keys) { BSONObjBuilder builder; - builder.append("_id", _idCounter++); builder.append(kKeyField, key); BSONObj obj = builder.obj(); - LOG(2) << "Recording conflict for DuplicateKeyTracker: " << obj.toString(); - Status s = tempCollection->insertDocument(opCtx, InsertStatement(obj), nullptr, false); - if (!s.isOK()) - return s; + toInsert.emplace_back(std::move(obj)); } - return Status::OK(); -} -Status DuplicateKeyTracker::constraintsSatisfiedForIndex(OperationContext* opCtx, - Collection* tempCollection) const { - invariant(tempCollection->ns() == nss()); - invariant(opCtx->lockState()->isCollectionLockedForMode(tempCollection->ns().ns(), MODE_IS)); - invariant(!opCtx->lockState()->inAWriteUnitOfWork()); + std::vector<Record> records; + records.reserve(keys.size()); + for (auto&& obj : toInsert) { + records.emplace_back(Record{RecordId(), RecordData(obj.objdata(), obj.objsize())}); + } - auto collScan = InternalPlanner::collectionScan( - opCtx, tempCollection->ns().ns(), tempCollection, PlanExecutor::YieldPolicy::YIELD_AUTO); + LOG(2) << "recording " << records.size() << " duplicate key conflicts for unique index: " + << _indexCatalogEntry->descriptor()->indexName(); - BSONObj conflict; - PlanExecutor::ExecState state; - while (PlanExecutor::ExecState::ADVANCED == (state = collScan->getNext(&conflict, nullptr))) { + WriteUnitOfWork wuow(opCtx); + std::vector<Timestamp> timestamps(records.size()); + Status s = _keyConstraintsTable->rs()->insertRecords(opCtx, &records, timestamps); + if (!s.isOK()) + return s; - LOG(2) << "Resolving conflict for DuplicateKeyTracker: " << conflict.toString(); + wuow.commit(); + return Status::OK(); +} + +Status DuplicateKeyTracker::checkConstraints(OperationContext* opCtx) const { + auto constraintsCursor = _keyConstraintsTable->rs()->getCursor(opCtx); + auto record = constraintsCursor->next(); + + auto indexCursor = + _indexCatalogEntry->accessMethod()->getSortedDataInterface()->newCursor(opCtx); + + int count = 0; + while (record) { + count++; + BSONObj conflict = record->data.toBson(); BSONObj keyObj = conflict[kKeyField].Obj(); - auto cursor = - _indexCatalogEntry->accessMethod()->getSortedDataInterface()->newCursor(opCtx); - auto entry = cursor->seekExact(keyObj); + auto entry = indexCursor->seekExact(keyObj); // If there is not an exact match, there is no duplicate. if (!entry) { @@ -110,18 +109,19 @@ Status DuplicateKeyTracker::constraintsSatisfiedForIndex(OperationContext* opCtx } // If the following entry has the same key, this is a duplicate. 
- entry = cursor->next(); + entry = indexCursor->next(); if (entry && entry->key.woCompare(keyObj, BSONObj(), /*considerFieldNames*/ false) == 0) { return buildDupKeyErrorStatus(keyObj, _indexCatalogEntry->descriptor()->parentNS(), _indexCatalogEntry->descriptor()->indexName(), _indexCatalogEntry->descriptor()->keyPattern()); } - } - if (PlanExecutor::IS_EOF != state) { - return WorkingSetCommon::getMemberObjectStatus(conflict); + record = constraintsCursor->next(); } + + LOG(1) << "resolved " << count << " duplicate key conflicts for unique index: " + << _indexCatalogEntry->descriptor()->indexName(); return Status::OK(); } diff --git a/src/mongo/db/index/duplicate_key_tracker.h b/src/mongo/db/index/duplicate_key_tracker.h index f5ea6dbe584..8cbf6515145 100644 --- a/src/mongo/db/index/duplicate_key_tracker.h +++ b/src/mongo/db/index/duplicate_key_tracker.h @@ -34,56 +34,39 @@ #include "mongo/base/disallow_copying.h" #include "mongo/bson/bsonobj.h" -#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/index_catalog_entry.h" #include "mongo/db/operation_context.h" +#include "mongo/db/storage/temporary_record_store.h" namespace mongo { /** * Records keys that have violated duplicate key constraints on unique indexes. The keys are backed - * by a temporary collection that the caller is responsible for creating, destroying, and holding - * locks while passing into mutating functions. + * by a temporary table that is created and destroyed by this tracker. */ class DuplicateKeyTracker { MONGO_DISALLOW_COPYING(DuplicateKeyTracker); public: - DuplicateKeyTracker(const IndexCatalogEntry* indexCatalogEntry, const NamespaceString& tempNss); - ~DuplicateKeyTracker(); + DuplicateKeyTracker(OperationContext* opCtx, const IndexCatalogEntry* indexCatalogEntry); /** - * Generates a unique namespace that should be used to construct the temporary backing - * Collection for this tracker. + * Given a set of duplicate keys, insert them into the key constraint table. */ - static NamespaceString makeTempNamespace(); - - /** - * Given a set of duplicate keys, insert them into tempCollection. - * - * The caller must hold locks for 'tempCollection' and be in a WriteUnitOfWork. - */ - Status recordDuplicates(OperationContext* opCtx, - Collection* tempCollection, - const std::vector<BSONObj>& keys); + Status recordKeys(OperationContext* opCtx, const std::vector<BSONObj>& keys); /** * Returns Status::OK if all previously recorded duplicate key constraint violations have been * resolved for the index. Returns a DuplicateKey error if there are still duplicate key * constraint violations on the index. - * - * The caller must hold locks for 'tempCollection'. */ - Status constraintsSatisfiedForIndex(OperationContext* opCtx, Collection* tempCollection) const; - - const NamespaceString& nss() const { - return _nss; - } + Status checkConstraints(OperationContext* opCtx) const; private: - std::int64_t _idCounter; - const IndexCatalogEntry* _indexCatalogEntry; - const NamespaceString _nss; + + // This temporary record store is owned by the duplicate key tracker and dropped along with it. + std::unique_ptr<TemporaryRecordStore> _keyConstraintsTable; }; } // namespace mongo diff --git a/src/mongo/db/index/duplicate_key_tracker_test.cpp b/src/mongo/db/index/duplicate_key_tracker_test.cpp deleted file mode 100644 index b1abac80d95..00000000000 --- a/src/mongo/db/index/duplicate_key_tracker_test.cpp +++ /dev/null @@ -1,334 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. 
- * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/catalog/collection_catalog_entry.h" -#include "mongo/db/catalog/database_catalog_entry.h" -#include "mongo/db/catalog/multi_index_block_impl.h" -#include "mongo/db/catalog_raii.h" -#include "mongo/db/concurrency/d_concurrency.h" -#include "mongo/db/curop.h" -#include "mongo/db/dbhelpers.h" -#include "mongo/db/index/duplicate_key_tracker.h" -#include "mongo/db/operation_context_noop.h" -#include "mongo/db/repl/replication_coordinator_mock.h" -#include "mongo/db/service_context_d_test_fixture.h" -#include "mongo/unittest/unittest.h" - -namespace mongo { -namespace { - -static const StringData kConstraintNsPrefix = "local.system.indexBuildConstraints"_sd; - -class DuplicateKeyTrackerTest : public ServiceContextMongoDTest { -public: - DuplicateKeyTrackerTest() - : ServiceContextMongoDTest("ephemeralForTest"), _opCtx(cc().makeOperationContext()) { - repl::ReplicationCoordinator::set( - getServiceContext(), - stdx::make_unique<repl::ReplicationCoordinatorMock>(getServiceContext())); - } - - std::unique_ptr<DuplicateKeyTracker> makeTracker(OperationContext* opCtx, - const IndexCatalogEntry* entry) { - NamespaceString tempNss = DuplicateKeyTracker::makeTempNamespace(); - makeTempCollection(opCtx, tempNss); - - auto tracker = std::make_unique<DuplicateKeyTracker>(entry, tempNss); - - AutoGetCollection autoColl(opCtx, tracker->nss(), MODE_IS); - ASSERT(autoColl.getCollection()); - - return tracker; - }; - - OperationContext* opCtx() { - return _opCtx.get(); - } - - void tearDown() {} - - void makeTempCollection(OperationContext* opCtx, const NamespaceString& nss) { - WriteUnitOfWork wuow(opCtx); - - AutoGetOrCreateDb autoDb(opCtx, nss.db(), MODE_X); - auto db = autoDb.getDb(); - invariant(db); - - ASSERT(!db->getCollection(opCtx, nss)); - - CollectionOptions options; - options.setNoIdIndex(); - - // Create the temp collection - auto coll = db->createCollection(opCtx, nss.ns(), options); - invariant(coll); - wuow.commit(); - } - - void destroyTempCollection(OperationContext* opCtx, const NamespaceString& nss) { - WriteUnitOfWork wuow(opCtx); - - 
AutoGetDb autoDb(opCtx, nss.db(), MODE_X); - auto db = autoDb.getDb(); - invariant(db); - - ASSERT_OK(db->dropCollectionEvenIfSystem(opCtx, nss)); - wuow.commit(); - } - -private: - int _numIndexesCreated = 0; - ServiceContext::UniqueOperationContext _opCtx; -}; - -TEST_F(DuplicateKeyTrackerTest, IndexBuild) { - std::unique_ptr<DuplicateKeyTracker> tracker; - - const NamespaceString collNs("test.myCollection"); - const BSONObj doc1 = BSON("_id" << 1 << "a" << 1); - const BSONObj doc2 = BSON("_id" << 2 << "a" << 1); - - // Create the collection with a two documents that have the same key for 'a'. - { - AutoGetOrCreateDb dbRaii(opCtx(), collNs.db(), LockMode::MODE_X); - WriteUnitOfWork wunit(opCtx()); - CollectionOptions options; - options.uuid = UUID::gen(); - auto coll = dbRaii.getDb()->createCollection(opCtx(), collNs.ns(), options); - ASSERT(coll); - - ASSERT_OK(coll->insertDocument(opCtx(), InsertStatement(doc1), nullptr)); - ASSERT_OK(coll->insertDocument(opCtx(), InsertStatement(doc2), nullptr)); - wunit.commit(); - } - - const std::string indexName = "a_1"; - const BSONObj spec = - BSON("ns" << collNs.ns() << "v" << 2 << "name" << indexName << "key" << BSON("a" << 1) - << "unique" - << true - << "background" - << true); - - // Create the index build block. Insert two different documents, but each with the same value - // for 'a'. This will cause the index insert to allow inserting a duplicate key. - const IndexCatalogEntry* entry; - - boost::optional<Record> record1; - boost::optional<Record> record2; - { - AutoGetCollection autoColl(opCtx(), collNs, MODE_X); - auto coll = autoColl.getCollection(); - - MultiIndexBlockImpl indexer(opCtx(), coll); - // Don't use the bulk builder, which does not insert directly into the IAM for the index. - indexer.allowBackgroundBuilding(); - // Allow duplicates. - indexer.ignoreUniqueConstraint(); - - ASSERT_OK(indexer.init(spec).getStatus()); - - IndexDescriptor* desc = coll->getIndexCatalog()->findIndexByName( - opCtx(), indexName, true /* includeUnfinished */); - ASSERT(desc); - entry = coll->getIndexCatalog()->getEntry(desc); - - // Construct the tracker. - tracker = makeTracker(opCtx(), entry); - - // Index the documents. - WriteUnitOfWork wunit(opCtx()); - auto cursor = coll->getCursor(opCtx()); - - record1 = cursor->next(); - ASSERT(record1); - - std::vector<BSONObj> dupsInserted; - - // The insert of the first document should return no duplicates. - ASSERT_OK(indexer.insert(record1->data.releaseToBson(), record1->id, &dupsInserted)); - ASSERT_EQ(0u, dupsInserted.size()); - - // The insert of the second document should return that a duplicate key was inserted. - record2 = cursor->next(); - ASSERT(record2); - ASSERT_OK(indexer.insert(record2->data.releaseToBson(), record2->id, &dupsInserted)); - ASSERT_EQ(1u, dupsInserted.size()); - - // Record that duplicates were inserted. - AutoGetCollection tempColl(opCtx(), tracker->nss(), MODE_IX); - ASSERT_OK(tracker->recordDuplicates(opCtx(), tempColl.getCollection(), dupsInserted)); - - ASSERT_OK(indexer.commit()); - wunit.commit(); - } - - // Confirm that the keys + RecordId of the duplicate are recorded. - { - AutoGetCollection tempColl(opCtx(), tracker->nss(), MODE_IS); - Status s = tracker->constraintsSatisfiedForIndex(opCtx(), tempColl.getCollection()); - ASSERT_EQ(ErrorCodes::DuplicateKey, s.code()); - } - - // Now remove the document and index key to confirm the conflicts are resolved. 
- { - AutoGetCollection autoColl(opCtx(), collNs, MODE_IX); - auto coll = autoColl.getCollection(); - - WriteUnitOfWork wunit(opCtx()); - - OpDebug opDebug; - coll->deleteDocument(opCtx(), kUninitializedStmtId, record2->id, &opDebug); - wunit.commit(); - - // One key deleted for each index (includes _id) - ASSERT_EQ(2u, *opDebug.additiveMetrics.keysDeleted); - - AutoGetCollection tempColl(opCtx(), tracker->nss(), MODE_IS); - Status s = tracker->constraintsSatisfiedForIndex(opCtx(), tempColl.getCollection()); - ASSERT_OK(s); - } - - destroyTempCollection(opCtx(), tracker->nss()); -} - -TEST_F(DuplicateKeyTrackerTest, BulkIndexBuild) { - std::unique_ptr<DuplicateKeyTracker> tracker; - - const NamespaceString collNs("test.myCollection"); - const BSONObj doc1 = BSON("_id" << 1 << "a" << 1); - const BSONObj doc2 = BSON("_id" << 2 << "a" << 1); - - // Create the collection with a two documents that have the same key for 'a'. - { - AutoGetOrCreateDb dbRaii(opCtx(), collNs.db(), LockMode::MODE_X); - WriteUnitOfWork wunit(opCtx()); - CollectionOptions options; - options.uuid = UUID::gen(); - auto coll = dbRaii.getDb()->createCollection(opCtx(), collNs.ns(), options); - ASSERT(coll); - - ASSERT_OK(coll->insertDocument(opCtx(), InsertStatement(doc1), nullptr)); - ASSERT_OK(coll->insertDocument(opCtx(), InsertStatement(doc2), nullptr)); - wunit.commit(); - } - - const std::string indexName = "a_1"; - const BSONObj spec = - BSON("ns" << collNs.ns() << "v" << 2 << "name" << indexName << "key" << BSON("a" << 1) - << "unique" - << true - << "background" - << true); - - // Create the bulk build block. Insert two different documents, but each with the same value - // for 'a'. This will cause the index insert to allow inserting a duplicate key. - const IndexCatalogEntry* entry; - - boost::optional<Record> record1; - boost::optional<Record> record2; - { - AutoGetCollection autoColl(opCtx(), collNs, MODE_X); - auto coll = autoColl.getCollection(); - - MultiIndexBlockImpl indexer(opCtx(), coll); - // Allow duplicates. - indexer.ignoreUniqueConstraint(); - - ASSERT_OK(indexer.init(spec).getStatus()); - - IndexDescriptor* desc = coll->getIndexCatalog()->findIndexByName( - opCtx(), indexName, true /* includeUnfinished */); - entry = coll->getIndexCatalog()->getEntry(desc); - - // Construct the tracker. - tracker = makeTracker(opCtx(), entry); - - auto cursor = coll->getCursor(opCtx()); - - record1 = cursor->next(); - ASSERT(record1); - - std::vector<BSONObj> dupsInserted; - - // Neither of these inserts will recognize duplicates because the bulk inserter does not - // detect them until dumpInsertsFromBulk() is called. - ASSERT_OK(indexer.insert(record1->data.releaseToBson(), record1->id, &dupsInserted)); - ASSERT_EQ(0u, dupsInserted.size()); - - record2 = cursor->next(); - ASSERT(record2); - ASSERT_OK(indexer.insert(record2->data.releaseToBson(), record2->id, &dupsInserted)); - ASSERT_EQ(0u, dupsInserted.size()); - - ASSERT_OK(indexer.dumpInsertsFromBulk(&dupsInserted)); - ASSERT_EQ(1u, dupsInserted.size()); - - // Record that duplicates were inserted. - WriteUnitOfWork wunit(opCtx()); - - AutoGetCollection tempColl(opCtx(), tracker->nss(), MODE_IX); - ASSERT_OK(tracker->recordDuplicates(opCtx(), tempColl.getCollection(), dupsInserted)); - - ASSERT_OK(indexer.commit()); - wunit.commit(); - } - - // Confirm that the keys + RecordId of the duplicate are recorded. 
- { - AutoGetCollection tempColl(opCtx(), tracker->nss(), MODE_IS); - Status s = tracker->constraintsSatisfiedForIndex(opCtx(), tempColl.getCollection()); - ASSERT_EQ(ErrorCodes::DuplicateKey, s.code()); - } - - // Now remove the document and index key to confirm the conflicts are resolved. - { - AutoGetCollection autoColl(opCtx(), collNs, MODE_IX); - auto coll = autoColl.getCollection(); - - WriteUnitOfWork wunit(opCtx()); - - OpDebug opDebug; - coll->deleteDocument(opCtx(), kUninitializedStmtId, record2->id, &opDebug); - wunit.commit(); - - // One key deleted for each index (includes _id) - ASSERT_EQ(2u, *opDebug.additiveMetrics.keysDeleted); - - AutoGetCollection tempColl(opCtx(), tracker->nss(), MODE_IS); - Status s = tracker->constraintsSatisfiedForIndex(opCtx(), tempColl.getCollection()); - ASSERT_OK(s); - } - - destroyTempCollection(opCtx(), tracker->nss()); -} -} // namespace -} // namespace mongo diff --git a/src/mongo/db/index/index_build_interceptor.cpp b/src/mongo/db/index/index_build_interceptor.cpp index 6b74def0c44..b2975218c6e 100644 --- a/src/mongo/db/index/index_build_interceptor.cpp +++ b/src/mongo/db/index/index_build_interceptor.cpp @@ -47,13 +47,30 @@ namespace mongo { -IndexBuildInterceptor::IndexBuildInterceptor(OperationContext* opCtx) - : _sideWritesTable( - opCtx->getServiceContext()->getStorageEngine()->makeTemporaryRecordStore(opCtx)){}; +IndexBuildInterceptor::IndexBuildInterceptor(OperationContext* opCtx, IndexCatalogEntry* entry) + : _indexCatalogEntry(entry), + _sideWritesTable( + opCtx->getServiceContext()->getStorageEngine()->makeTemporaryRecordStore(opCtx)) { + + if (entry->descriptor()->unique()) { + _duplicateKeyTracker = std::make_unique<DuplicateKeyTracker>(opCtx, entry); + } +} + +Status IndexBuildInterceptor::recordDuplicateKeys(OperationContext* opCtx, + const std::vector<BSONObj>& keys) { + invariant(_indexCatalogEntry->descriptor()->unique()); + return _duplicateKeyTracker->recordKeys(opCtx, keys); +} + +Status IndexBuildInterceptor::checkDuplicateKeyConstraints(OperationContext* opCtx) const { + if (!_duplicateKeyTracker) { + return Status::OK(); + } + return _duplicateKeyTracker->checkConstraints(opCtx); +} Status IndexBuildInterceptor::drainWritesIntoIndex(OperationContext* opCtx, - IndexAccessMethod* indexAccessMethod, - const IndexDescriptor* indexDescriptor, const InsertDeleteOptions& options) { invariant(!opCtx->lockState()->inAWriteUnitOfWork()); @@ -136,8 +153,8 @@ Status IndexBuildInterceptor::drainWritesIntoIndex(OperationContext* opCtx, // writes table. WriteUnitOfWork wuow(opCtx); for (auto& operation : batch) { - auto status = _applyWrite( - opCtx, indexAccessMethod, operation.second, options, &totalInserted, &totalDeleted); + auto status = + _applyWrite(opCtx, operation.second, options, &totalInserted, &totalDeleted); if (!status.isOK()) { return status; } @@ -159,15 +176,15 @@ Status IndexBuildInterceptor::drainWritesIntoIndex(OperationContext* opCtx, progress->finished(); - log() << "index build for " << indexDescriptor->indexName() << ": drain applied " - << (_numApplied - appliedAtStart) << " side writes. i: " << totalInserted - << ", d: " << totalDeleted << ", total: " << _numApplied; + log() << "index build for " << _indexCatalogEntry->descriptor()->indexName() + << ": drain applied " << (_numApplied - appliedAtStart) + << " side writes. 
i: " << totalInserted << ", d: " << totalDeleted + << ", total: " << _numApplied; return Status::OK(); } Status IndexBuildInterceptor::_applyWrite(OperationContext* opCtx, - IndexAccessMethod* indexAccessMethod, const BSONObj& operation, const InsertDeleteOptions& options, int64_t* const keysInserted, @@ -178,28 +195,34 @@ Status IndexBuildInterceptor::_applyWrite(OperationContext* opCtx, (strcmp(operation.getStringField("op"), "i") == 0) ? Op::kInsert : Op::kDelete; const BSONObjSet keySet = SimpleBSONObjComparator::kInstance.makeBSONObjSet({key}); + auto accessMethod = _indexCatalogEntry->accessMethod(); if (opType == Op::kInsert) { InsertResult result; - Status s = - indexAccessMethod->insertKeys(opCtx, - keySet, - SimpleBSONObjComparator::kInstance.makeBSONObjSet(), - MultikeyPaths{}, - opRecordId, - options, - &result); - if (!s.isOK()) { - return s; + auto status = accessMethod->insertKeys(opCtx, + keySet, + SimpleBSONObjComparator::kInstance.makeBSONObjSet(), + MultikeyPaths{}, + opRecordId, + options, + &result); + if (!status.isOK()) { + return status; + } + + if (result.dupsInserted.size()) { + status = recordDuplicateKeys(opCtx, result.dupsInserted); + if (!status.isOK()) { + return status; + } } - invariant(!result.dupsInserted.size()); *keysInserted += result.numInserted; } else { invariant(opType == Op::kDelete); DEV invariant(strcmp(operation.getStringField("op"), "d") == 0); int64_t numDeleted; - Status s = indexAccessMethod->removeKeys(opCtx, keySet, opRecordId, options, &numDeleted); + Status s = accessMethod->removeKeys(opCtx, keySet, opRecordId, options, &numDeleted); if (!s.isOK()) { return s; } @@ -298,7 +321,9 @@ Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx, std::vector<Record> records; for (auto& doc : toInsert) { - records.emplace_back(Record{RecordId(), RecordData(doc.objdata(), doc.objsize())}); + records.emplace_back(Record{RecordId(), // The storage engine will assign its own RecordId + // when we pass one that is null. + RecordData(doc.objdata(), doc.objsize())}); } // By passing a vector of null timestamps, these inserts are not timestamped individually, but @@ -306,4 +331,5 @@ Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx, std::vector<Timestamp> timestamps(records.size()); return _sideWritesTable->rs()->insertRecords(opCtx, &records, timestamps); } + } // namespace mongo diff --git a/src/mongo/db/index/index_build_interceptor.h b/src/mongo/db/index/index_build_interceptor.h index b25fbf53c71..6988395906c 100644 --- a/src/mongo/db/index/index_build_interceptor.h +++ b/src/mongo/db/index/index_build_interceptor.h @@ -29,6 +29,7 @@ #pragma once +#include "mongo/db/index/duplicate_key_tracker.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/index/multikey_paths.h" #include "mongo/db/namespace_string.h" @@ -50,7 +51,7 @@ public: * intercept side writes. This interceptor must not exist longer than the operation context used * to construct it, as the underlying TemporaryRecordStore needs it to destroy itself. 
     */
-    IndexBuildInterceptor(OperationContext* opCtx);
+    IndexBuildInterceptor(OperationContext* opCtx, IndexCatalogEntry* entry);
 
     /**
      * Client writes that are concurrent with an index build will have their index updates written
@@ -67,6 +68,19 @@ public:
                      int64_t* const numKeysOut);
 
     /**
+     * Given a set of duplicate keys, record the keys for later verification by a call to
+     * checkConstraints().
+     */
+    Status recordDuplicateKeys(OperationContext* opCtx, const std::vector<BSONObj>& keys);
+
+    /**
+     * Returns Status::OK if all previously recorded duplicate key constraint violations have been
+     * resolved for the index. Returns a DuplicateKey error if there are still duplicate key
+     * constraint violations on the index.
+     */
+    Status checkDuplicateKeyConstraints(OperationContext* opCtx) const;
+
+    /**
      * Performs a resumable scan on the side writes table, and either inserts or removes each key
      * from the underlying IndexAccessMethod. This will only insert as many records as are visible
      * in the current snapshot.
@@ -74,10 +88,8 @@
      * This is resumable, so subsequent calls will start the scan at the record immediately
      * following the last inserted record from a previous call to drainWritesIntoIndex.
      */
-    Status drainWritesIntoIndex(OperationContext* opCtx,
-                                IndexAccessMethod* indexAccessMethod,
-                                const IndexDescriptor* indexDescriptor,
-                                const InsertDeleteOptions& options);
+    Status drainWritesIntoIndex(OperationContext* opCtx, const InsertDeleteOptions& options);
+
 
     /**
      * Returns 'true' if there are no visible records remaining to be applied from the side writes
@@ -95,15 +107,19 @@ private:
     using SideWriteRecord = std::pair<RecordId, BSONObj>;
 
     Status _applyWrite(OperationContext* opCtx,
-                       IndexAccessMethod* indexAccessMethod,
                        const BSONObj& doc,
                        const InsertDeleteOptions& options,
                        int64_t* const keysInserted,
                        int64_t* const keysDeleted);
 
+    // The entry for the index that is being built.
+    IndexCatalogEntry* _indexCatalogEntry;
+
     // This temporary record store is owned by the interceptor and dropped along with it.
     std::unique_ptr<TemporaryRecordStore> _sideWritesTable;
 
+    std::unique_ptr<DuplicateKeyTracker> _duplicateKeyTracker;
+
    int64_t _numApplied{0};
 
    AtomicInt64 _sideWritesCounter{0};
diff --git a/src/mongo/dbtests/indexupdatetests.cpp b/src/mongo/dbtests/indexupdatetests.cpp
index 9c44428ef6d..0244ff0122e 100644
--- a/src/mongo/dbtests/indexupdatetests.cpp
+++ b/src/mongo/dbtests/indexupdatetests.cpp
@@ -206,7 +206,17 @@ public:
         ASSERT_OK(indexer.init(spec).getStatus());
 
         const Status status = indexer.insertAllDocumentsInCollection();
-        ASSERT_EQUALS(status.code(), ErrorCodes::DuplicateKey);
+        if (!background) {
+            ASSERT_EQUALS(status.code(), ErrorCodes::DuplicateKey);
+            return;
+        }
+
+        // Background builds do not detect duplicates until they commit.
+        ASSERT_OK(status);
+
+        WriteUnitOfWork wunit(&_opCtx);
+        ASSERT_THROWS_CODE(indexer.commit(), AssertionException, ErrorCodes::DuplicateKey);
+        wunit.commit();
     }
 };
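A closing note on the constraint check itself: DuplicateKeyTracker::checkConstraints() above decides whether a recorded key is still in violation by seeking exactly to the key in the index and then testing whether the immediately following entry carries the same key. Below is a minimal stand-alone sketch of that seek-and-compare step, with a std::multiset standing in for the sorted index; none of this is MongoDB's API.

    #include <iostream>
    #include <iterator>
    #include <set>
    #include <string>
    #include <vector>

    // For each recorded key: seek to the first matching entry (analogous to
    // indexCursor->seekExact(keyObj)). No match means the conflict resolved
    // itself; an immediately following entry with the same key means the
    // duplicate is still present and the build must fail.
    bool hasUnresolvedDuplicate(const std::multiset<std::string>& index,
                                const std::vector<std::string>& recordedKeys) {
        for (const auto& key : recordedKeys) {
            auto it = index.lower_bound(key);
            if (it == index.end() || *it != key)
                continue;  // key no longer indexed: conflict resolved
            auto next = std::next(it);
            if (next != index.end() && *next == key)
                return true;  // two adjacent equal keys: still a duplicate
        }
        return false;
    }

    int main() {
        std::multiset<std::string> index{"a", "b", "b"};
        std::cout << hasUnresolvedDuplicate(index, {"b"}) << '\n';  // 1: duplicate remains
        index.erase(index.find("b"));
        std::cout << hasUnresolvedDuplicate(index, {"b"}) << '\n';  // 0: resolved
    }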