author     Louis Williams <louis.williams@mongodb.com>    2018-11-12 13:35:36 -0500
committer  Louis Williams <louis.williams@mongodb.com>    2018-11-29 13:09:15 -0500
commit     ca1cccb8a18be76c584f587e04b14512e59d8424 (patch)
tree       455c08f9ad231f45fc64e8a3b2a5ec2d4048cc38
parent     b5308fc30a1ec7405ccec6dcc4213cf5fb167a4e (diff)
SERVER-38027 SERVER-37268 Partially enable hybrid index builds for background, non-unique indexes. Change background index builds to use the bulk builder and external sorter
-rw-r--r--  etc/evergreen.yml                                                          |   6
-rw-r--r--  jstests/noPassthrough/hybrid_index_with_updates.js                         | 125
-rw-r--r--  src/mongo/db/catalog/SConscript                                            |   3
-rw-r--r--  src/mongo/db/catalog/collection_compact.cpp                                |   4
-rw-r--r--  src/mongo/db/catalog/collection_impl.cpp                                   |  50
-rw-r--r--  src/mongo/db/catalog/index_build_block.cpp                                 |  22
-rw-r--r--  src/mongo/db/catalog/index_catalog.h                                       |  13
-rw-r--r--  src/mongo/db/catalog/index_catalog_impl.cpp                                |  71
-rw-r--r--  src/mongo/db/catalog/index_catalog_impl.h                                  |   9
-rw-r--r--  src/mongo/db/catalog/index_catalog_noop.h                                  |   9
-rw-r--r--  src/mongo/db/catalog/multi_index_block.h                                   |  17
-rw-r--r--  src/mongo/db/catalog/multi_index_block_impl.cpp                            |  77
-rw-r--r--  src/mongo/db/catalog/multi_index_block_impl.h                              |  14
-rw-r--r--  src/mongo/db/catalog/multi_index_block_test.cpp                            |  10
-rw-r--r--  src/mongo/db/catalog/rename_collection.cpp                                 |   2
-rw-r--r--  src/mongo/db/commands/create_indexes.cpp                                   |  67
-rw-r--r--  src/mongo/db/concurrency/d_concurrency.h                                   |   4
-rw-r--r--  src/mongo/db/index/SConscript                                              |   1
-rw-r--r--  src/mongo/db/index/duplicate_key_tracker_test.cpp                          |   4
-rw-r--r--  src/mongo/db/index/index_access_method.cpp                                 |  30
-rw-r--r--  src/mongo/db/index/index_access_method.h                                   |  32
-rw-r--r--  src/mongo/db/index/index_build_interceptor.cpp                             | 243
-rw-r--r--  src/mongo/db/index/index_build_interceptor.h                               |  61
-rw-r--r--  src/mongo/db/index_builder.cpp                                             |  65
-rw-r--r--  src/mongo/db/repair_database.cpp                                           |   2
-rw-r--r--  src/mongo/db/repl/collection_bulk_loader_impl.cpp                          |   6
-rw-r--r--  src/mongo/db/storage/kv/SConscript                                         |   5
-rw-r--r--  src/mongo/db/storage/kv/kv_storage_engine.cpp                              |  10
-rw-r--r--  src/mongo/db/storage/kv/kv_storage_engine.h                                |   4
-rw-r--r--  src/mongo/db/storage/kv/kv_storage_engine_test.cpp                         |  22
-rw-r--r--  src/mongo/db/storage/kv/temporary_kv_record_store.cpp                      |  45
-rw-r--r--  src/mongo/db/storage/kv/temporary_kv_record_store.h                        |  66
-rw-r--r--  src/mongo/db/storage/record_store.h                                        |   4
-rw-r--r--  src/mongo/db/storage/storage_engine.h                                      |  12
-rw-r--r--  src/mongo/db/storage/temporary_record_store.h                              |  66
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp                   |  16
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h                     |   2
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_record_store_test.cpp | 10
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp                |  28
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h                  |   5
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp          |   7
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp | 24
42 files changed, 1044 insertions, 229 deletions
diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index 32053ef9dec..c6bc16abf71 100644
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -14730,7 +14730,7 @@ buildvariants:
# mobile storage engine.
test_flags: >-
--storageEngine=mobile
- --excludeWithAnyTags=requires_wiredtiger,requires_replication,requires_sharding,uses_transactions,requires_capped,requires_profiling,requires_compact
+ --excludeWithAnyTags=requires_wiredtiger,requires_replication,requires_sharding,uses_transactions,requires_capped,requires_profiling,requires_compact,requires_document_locking
--excludeWithAnyTags=SERVER-32709,SERVER-32869
compile_flags: >-
-j$(grep -c ^processor /proc/cpuinfo)
@@ -14810,7 +14810,7 @@ buildvariants:
expansions:
test_flags: >-
--storageEngine=mobile
- --excludeWithAnyTags=requires_wiredtiger,requires_replication,requires_sharding,uses_transactions,requires_capped,requires_profiling,requires_compact
+ --excludeWithAnyTags=requires_wiredtiger,requires_replication,requires_sharding,uses_transactions,requires_capped,requires_profiling,requires_compact,requires_document_locking
--excludeWithAnyTags=SERVER-32709,SERVER-32869
compile_flags: >-
-j$(grep -c ^processor /proc/cpuinfo)
@@ -14889,7 +14889,7 @@ buildvariants:
# mobile storage engine.
test_flags: >-
--storageEngine=mobile
- --excludeWithAnyTags=requires_wiredtiger,requires_replication,requires_sharding,uses_transactions,requires_capped,requires_profiling,requires_compact
+ --excludeWithAnyTags=requires_wiredtiger,requires_replication,requires_sharding,uses_transactions,requires_capped,requires_profiling,requires_compact,requires_document_locking
--excludeWithAnyTags=SERVER-32709,SERVER-32869
compile_env: DEVELOPER_DIR=/Applications/Xcode8.3.app
compile_flags: >-
diff --git a/jstests/noPassthrough/hybrid_index_with_updates.js b/jstests/noPassthrough/hybrid_index_with_updates.js
new file mode 100644
index 00000000000..39d7868d93b
--- /dev/null
+++ b/jstests/noPassthrough/hybrid_index_with_updates.js
@@ -0,0 +1,125 @@
+/**
+ * Tests that write operations are accepted and result in correct indexing behavior for each phase
+ * of hybrid index builds.
+ *
+ * @tags: [requires_document_locking]
+ */
+(function() {
+ "use strict";
+
+ load("jstests/libs/check_log.js");
+
+ let conn = MongoRunner.runMongod();
+ let testDB = conn.getDB('test');
+
+ let turnFailPointOn = function(failPointName, i) {
+ assert.commandWorked(testDB.adminCommand(
+ {configureFailPoint: failPointName, mode: "alwaysOn", data: {"i": i}}));
+ };
+
+ let turnFailPointOff = function(failPointName) {
+ assert.commandWorked(testDB.adminCommand({configureFailPoint: failPointName, mode: "off"}));
+ };
+
+ let totalDocs = 0;
+ let crudOpsForPhase = function(coll, phase) {
+ let bulk = coll.initializeUnorderedBulkOp();
+
+ // Create 1000 documents in a specific range for this phase.
+ for (let i = 0; i < 1000; i++) {
+ bulk.insert({i: (phase * 1000) + i});
+ }
+ totalDocs += 1000;
+
+ if (phase <= 0) {
+ assert.commandWorked(bulk.execute());
+ return;
+ }
+
+ // Update 50 documents.
+ // For example, if phase is 2, documents [100, 150) will be updated to [-100, -150).
+ let start = (phase - 1) * 100;
+ for (let j = start; j < (100 * phase) - 50; j++) {
+ bulk.find({i: j}).update({$set: {i: -j}});
+ }
+ // Delete 50 documents.
+ // Similarly, if phase is 2, documents [150, 200) will be removed.
+ for (let j = start + 50; j < 100 * phase; j++) {
+ bulk.find({i: j}).remove();
+ }
+ totalDocs -= 50;
+
+ assert.commandWorked(bulk.execute());
+ };
+
+ crudOpsForPhase(testDB.hybrid, 0);
+ assert.eq(totalDocs, testDB.hybrid.count());
+
+ // Hang the build after the first document.
+ let stopKey = 1;
+ turnFailPointOn("hangBeforeIndexBuildOf", stopKey);
+
+ // Start the background build.
+ let bgBuild = startParallelShell(function() {
+ assert.commandWorked(db.hybrid.createIndex({i: 1}, {background: true}));
+ }, conn.port);
+
+ checkLog.contains(conn, "Hanging before index build of i=" + stopKey);
+
+ // Phase 1: Collection scan and external sort
+ // Insert documents while doing the bulk build.
+ crudOpsForPhase(testDB.hybrid, 1);
+ assert.eq(totalDocs, testDB.hybrid.count());
+
+ // Enable pause after bulk dump into index.
+ turnFailPointOn("hangAfterIndexBuildDumpsInsertsFromBulk");
+
+ // Wait for the bulk insert to complete.
+ turnFailPointOff("hangBeforeIndexBuildOf");
+ checkLog.contains(conn, "Hanging after dumping inserts from bulk builder");
+
+ // Phase 2: First drain
+ // Do some updates, inserts and deletes after the bulk builder has finished.
+
+ // Enable pause after first drain.
+ turnFailPointOn("hangAfterIndexBuildFirstDrain");
+
+ crudOpsForPhase(testDB.hybrid, 2);
+ assert.eq(totalDocs, testDB.hybrid.count());
+
+ // Allow first drain to start.
+ turnFailPointOff("hangAfterIndexBuildDumpsInsertsFromBulk");
+
+ // Wait for first drain to finish.
+ checkLog.contains(conn, "Hanging after index build first drain");
+
+ // Phase 3: Second drain
+ // Enable pause after second drain.
+ turnFailPointOn("hangAfterIndexBuildSecondDrain");
+
+ // Add inserts that must be consumed in the second drain.
+ crudOpsForPhase(testDB.hybrid, 3);
+ assert.eq(totalDocs, testDB.hybrid.count());
+
+ // Allow second drain to start.
+ turnFailPointOff("hangAfterIndexBuildFirstDrain");
+
+ // Wait for second drain to finish.
+ checkLog.contains(conn, "Hanging after index build second drain");
+
+ // Phase 4: Final drain and commit.
+ // Add inserts that must be consumed in the final drain.
+ crudOpsForPhase(testDB.hybrid, 4);
+ assert.eq(totalDocs, testDB.hybrid.count());
+
+ // Allow final drain to start.
+ turnFailPointOff("hangAfterIndexBuildSecondDrain");
+
+ // Wait for build to complete.
+ bgBuild();
+
+ assert.eq(totalDocs, testDB.hybrid.count());
+ assert.commandWorked(testDB.hybrid.validate({full: true}));
+
+ MongoRunner.stopMongod(conn);
+})();
diff --git a/src/mongo/db/catalog/SConscript b/src/mongo/db/catalog/SConscript
index 2e3fb91c150..038ea188797 100644
--- a/src/mongo/db/catalog/SConscript
+++ b/src/mongo/db/catalog/SConscript
@@ -246,6 +246,9 @@ env.Library(
'$BUILD_DIR/mongo/util/fail_point',
'$BUILD_DIR/mongo/util/progress_meter',
],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/index/index_build_interceptor',
+ ]
)
env.CppUnitTest(
diff --git a/src/mongo/db/catalog/collection_compact.cpp b/src/mongo/db/catalog/collection_compact.cpp
index 96075e81a6c..7971f96c29a 100644
--- a/src/mongo/db/catalog/collection_compact.cpp
+++ b/src/mongo/db/catalog/collection_compact.cpp
@@ -137,14 +137,12 @@ StatusWith<CompactStats> compactCollection(OperationContext* opCtx,
if (!status.isOK())
return StatusWith<CompactStats>(status);
- // The MMAPv1 storage engine used to add documents to indexer through the
- // RecordStoreCompactAdaptor interface.
status = recordStore->compact(opCtx);
if (!status.isOK())
return StatusWith<CompactStats>(status);
log() << "starting index commits";
- status = indexer.doneInserting();
+ status = indexer.dumpInsertsFromBulk();
if (!status.isOK())
return StatusWith<CompactStats>(status);
diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp
index ec0be702ade..8e6c517c4c0 100644
--- a/src/mongo/db/catalog/collection_impl.cpp
+++ b/src/mongo/db/catalog/collection_impl.cpp
@@ -656,60 +656,20 @@ RecordId CollectionImpl::updateDocument(OperationContext* opCtx,
<< " != "
<< newDoc.objsize());
- // At the end of this step, we will have a map of UpdateTickets, one per index, which
- // represent the index updates needed to be done, based on the changes between oldDoc and
- // newDoc.
- OwnedPointerMap<IndexDescriptor*, UpdateTicket> updateTickets;
- if (indexesAffected) {
- std::unique_ptr<IndexCatalog::IndexIterator> ii =
- _indexCatalog->getIndexIterator(opCtx, true);
- while (ii->more()) {
- IndexCatalogEntry* entry = ii->next();
- IndexDescriptor* descriptor = entry->descriptor();
- IndexAccessMethod* iam = entry->accessMethod();
-
- InsertDeleteOptions options;
- _indexCatalog->prepareInsertDeleteOptions(opCtx, descriptor, &options);
- UpdateTicket* updateTicket = new UpdateTicket();
- updateTickets.mutableMap()[descriptor] = updateTicket;
- uassertStatusOK(iam->validateUpdate(opCtx,
- oldDoc.value(),
- newDoc,
- oldLocation,
- options,
- updateTicket,
- entry->getFilterExpression()));
- }
- }
-
args->preImageDoc = oldDoc.value().getOwned();
Status updateStatus =
_recordStore->updateRecord(opCtx, oldLocation, newDoc.objdata(), newDoc.objsize());
- // Update each index with each respective UpdateTicket.
if (indexesAffected) {
- int64_t keysInsertedTotal = 0;
- int64_t keysDeletedTotal = 0;
+ int64_t keysInserted, keysDeleted;
- std::unique_ptr<IndexCatalog::IndexIterator> ii =
- _indexCatalog->getIndexIterator(opCtx, true);
- while (ii->more()) {
- IndexCatalogEntry* entry = ii->next();
- IndexDescriptor* descriptor = entry->descriptor();
- IndexAccessMethod* iam = entry->accessMethod();
-
- int64_t keysInserted;
- int64_t keysDeleted;
- uassertStatusOK(iam->update(
- opCtx, *updateTickets.mutableMap()[descriptor], &keysInserted, &keysDeleted));
- keysInsertedTotal += keysInserted;
- keysDeletedTotal += keysDeleted;
- }
+ uassertStatusOK(_indexCatalog->updateRecord(
+ opCtx, args->preImageDoc.get(), newDoc, oldLocation, &keysInserted, &keysDeleted));
if (opDebug) {
- opDebug->additiveMetrics.incrementKeysInserted(keysInsertedTotal);
- opDebug->additiveMetrics.incrementKeysDeleted(keysDeletedTotal);
+ opDebug->additiveMetrics.incrementKeysInserted(keysInserted);
+ opDebug->additiveMetrics.incrementKeysDeleted(keysDeleted);
}
}
diff --git a/src/mongo/db/catalog/index_build_block.cpp b/src/mongo/db/catalog/index_build_block.cpp
index 016890de388..5f8ef4e21d3 100644
--- a/src/mongo/db/catalog/index_build_block.cpp
+++ b/src/mongo/db/catalog/index_build_block.cpp
@@ -90,9 +90,11 @@ Status IndexCatalogImpl::IndexBuildBlock::init() {
_entry = _catalog->_setupInMemoryStructures(
_opCtx, std::move(descriptor), initFromDisk, isReadyIndex);
- if (isBackgroundIndex) {
- _indexBuildInterceptor = stdx::make_unique<IndexBuildInterceptor>();
- _indexBuildInterceptor->ensureSideWritesCollectionExists(_opCtx);
+ // Hybrid index builds are only enabled for background, non-unique indexes.
+ // TODO: Remove when SERVER-38036 and SERVER-37270 are complete.
+ const bool useHybrid = isBackgroundIndex && !descriptorPtr->unique();
+ if (useHybrid) {
+ _indexBuildInterceptor = stdx::make_unique<IndexBuildInterceptor>(_opCtx);
_entry->setIndexBuildInterceptor(_indexBuildInterceptor.get());
_opCtx->recoveryUnit()->onCommit(
@@ -129,7 +131,6 @@ void IndexCatalogImpl::IndexBuildBlock::fail() {
if (_entry) {
invariant(_catalog->_dropIndex(_opCtx, _entry).isOK());
if (_indexBuildInterceptor) {
- _indexBuildInterceptor->removeSideWritesCollection(_opCtx);
_entry->setIndexBuildInterceptor(nullptr);
}
} else {
@@ -145,11 +146,14 @@ void IndexCatalogImpl::IndexBuildBlock::success() {
NamespaceString ns(_indexNamespace);
invariant(_opCtx->lockState()->isDbLockedForMode(ns.db(), MODE_X));
+ // An index build should never be completed with writes remaining in the interceptor.
+ invariant(!_indexBuildInterceptor || _indexBuildInterceptor->areAllWritesApplied(_opCtx));
+
+ LOG(2) << "marking index " << _indexName << " as ready in snapshot id "
+ << _opCtx->recoveryUnit()->getSnapshotId();
_collection->indexBuildSuccess(_opCtx, _entry);
OperationContext* opCtx = _opCtx;
- LOG(2) << "marking index " << _indexName << " as ready in snapshot id "
- << opCtx->recoveryUnit()->getSnapshotId();
_opCtx->recoveryUnit()->onCommit(
[ opCtx, entry = _entry, collection = _collection ](boost::optional<Timestamp> commitTime) {
// Note: this runs after the WUOW commits but before we release our X lock on the
@@ -168,11 +172,5 @@ void IndexCatalogImpl::IndexBuildBlock::success() {
// able to remove this when the catalog is versioned.
collection->setMinimumVisibleSnapshot(commitTime.get());
});
-
- _entry->setIsReady(true);
- if (_indexBuildInterceptor) {
- _indexBuildInterceptor->removeSideWritesCollection(_opCtx);
- _entry->setIndexBuildInterceptor(nullptr);
- }
}
} // namespace mongo
diff --git a/src/mongo/db/catalog/index_catalog.h b/src/mongo/db/catalog/index_catalog.h
index 386a344d65c..4e8f70d6d66 100644
--- a/src/mongo/db/catalog/index_catalog.h
+++ b/src/mongo/db/catalog/index_catalog.h
@@ -370,6 +370,19 @@ public:
int64_t* const keysInsertedOut) = 0;
/**
+ * Both 'keysInsertedOut' and 'keysDeletedOut' are required and will be set to the number of
+ * index keys inserted and deleted by this operation, respectively.
+ *
+ * This method may throw.
+ */
+ virtual Status updateRecord(OperationContext* const opCtx,
+ const BSONObj& oldDoc,
+ const BSONObj& newDoc,
+ const RecordId& recordId,
+ int64_t* const keysInsertedOut,
+ int64_t* const keysDeletedOut) = 0;
+
+ /**
* When 'keysDeletedOut' is not null, it will be set to the number of index keys removed by
* this operation.
*/
diff --git a/src/mongo/db/catalog/index_catalog_impl.cpp b/src/mongo/db/catalog/index_catalog_impl.cpp
index 9a4aa990ab0..5d0c9542dd7 100644
--- a/src/mongo/db/catalog/index_catalog_impl.cpp
+++ b/src/mongo/db/catalog/index_catalog_impl.cpp
@@ -1168,8 +1168,7 @@ Status IndexCatalogImpl::_indexFilteredRecords(OperationContext* opCtx,
}
Status status = Status::OK();
- const bool hybridBuildsEnabled = false;
- if (hybridBuildsEnabled && index->isBuilding()) {
+ if (index->isBuilding()) {
int64_t inserted;
status = index->indexBuildInterceptor()->sideWrite(opCtx,
index->accessMethod(),
@@ -1219,8 +1218,7 @@ Status IndexCatalogImpl::_unindexRecord(OperationContext* opCtx,
const RecordId& loc,
bool logIfError,
int64_t* keysDeletedOut) {
- const bool hybridBuildsEnabled = false;
- if (hybridBuildsEnabled && index->isBuilding()) {
+ if (index->isBuilding()) {
int64_t removed;
auto status = index->indexBuildInterceptor()->sideWrite(
opCtx, index->accessMethod(), &obj, loc, IndexBuildInterceptor::Op::kDelete, &removed);
@@ -1281,6 +1279,61 @@ Status IndexCatalogImpl::indexRecords(OperationContext* opCtx,
return Status::OK();
}
+Status IndexCatalogImpl::updateRecord(OperationContext* const opCtx,
+ const BSONObj& oldDoc,
+ const BSONObj& newDoc,
+ const RecordId& recordId,
+ int64_t* const keysInsertedOut,
+ int64_t* const keysDeletedOut) {
+ *keysInsertedOut = 0;
+ *keysDeletedOut = 0;
+
+ // Ready indexes go directly through the IndexAccessMethod.
+ for (IndexCatalogEntryContainer::const_iterator it = _readyIndexes.begin();
+ it != _readyIndexes.end();
+ ++it) {
+ IndexCatalogEntry* entry = it->get();
+
+ IndexDescriptor* descriptor = entry->descriptor();
+ IndexAccessMethod* iam = entry->accessMethod();
+
+ InsertDeleteOptions options;
+ prepareInsertDeleteOptions(opCtx, descriptor, &options);
+
+ UpdateTicket updateTicket;
+
+ auto status = iam->validateUpdate(
+ opCtx, oldDoc, newDoc, recordId, options, &updateTicket, entry->getFilterExpression());
+ if (!status.isOK())
+ return status;
+
+ int64_t keysInserted;
+ int64_t keysDeleted;
+ status = iam->update(opCtx, updateTicket, &keysInserted, &keysDeleted);
+ if (!status.isOK())
+ return status;
+
+ *keysInsertedOut += keysInserted;
+ *keysDeletedOut += keysDeleted;
+ }
+
+ // Building indexes go through the interceptor.
+ BsonRecord record{recordId, Timestamp(), &newDoc};
+ for (IndexCatalogEntryContainer::const_iterator it = _buildingIndexes.begin();
+ it != _buildingIndexes.end();
+ ++it) {
+ IndexCatalogEntry* entry = it->get();
+
+ bool logIfError = false;
+ invariant(_unindexRecord(opCtx, entry, oldDoc, recordId, logIfError, keysDeletedOut));
+
+ auto status = _indexRecords(opCtx, entry, {record}, keysInsertedOut);
+ if (!status.isOK())
+ return status;
+ }
+ return Status::OK();
+}
+
void IndexCatalogImpl::unindexRecord(OperationContext* opCtx,
const BSONObj& obj,
const RecordId& loc,
@@ -1359,10 +1412,18 @@ void IndexCatalogImpl::indexBuildSuccess(OperationContext* opCtx, IndexCatalogEn
auto releasedEntry = _buildingIndexes.release(index->descriptor());
invariant(releasedEntry.get() == index);
_readyIndexes.add(std::move(releasedEntry));
- opCtx->recoveryUnit()->onRollback([this, index]() {
+
+ auto interceptor = index->indexBuildInterceptor();
+ index->setIndexBuildInterceptor(nullptr);
+ index->setIsReady(true);
+
+ opCtx->recoveryUnit()->onRollback([this, index, interceptor]() {
auto releasedEntry = _readyIndexes.release(index->descriptor());
invariant(releasedEntry.get() == index);
_buildingIndexes.add(std::move(releasedEntry));
+
+ index->setIndexBuildInterceptor(interceptor);
+ index->setIsReady(false);
});
}
diff --git a/src/mongo/db/catalog/index_catalog_impl.h b/src/mongo/db/catalog/index_catalog_impl.h
index 7c6c415241f..121bcf69bc5 100644
--- a/src/mongo/db/catalog/index_catalog_impl.h
+++ b/src/mongo/db/catalog/index_catalog_impl.h
@@ -306,6 +306,15 @@ public:
int64_t* keysInsertedOut) override;
/**
+ * See IndexCatalog::updateRecord
+ */
+ Status updateRecord(OperationContext* const opCtx,
+ const BSONObj& oldDoc,
+ const BSONObj& newDoc,
+ const RecordId& recordId,
+ int64_t* const keysInsertedOut,
+ int64_t* const keysDeletedOut) override;
+ /**
* When 'keysDeletedOut' is not null, it will be set to the number of index keys removed by
* this operation.
*/
diff --git a/src/mongo/db/catalog/index_catalog_noop.h b/src/mongo/db/catalog/index_catalog_noop.h
index e645831063a..34ea94afee2 100644
--- a/src/mongo/db/catalog/index_catalog_noop.h
+++ b/src/mongo/db/catalog/index_catalog_noop.h
@@ -175,6 +175,15 @@ public:
return Status::OK();
}
+ Status updateRecord(OperationContext* const opCtx,
+ const BSONObj& oldDoc,
+ const BSONObj& newDoc,
+ const RecordId& recordId,
+ int64_t* const keysInsertedOut,
+ int64_t* const keysDeletedOut) override {
+ return Status::OK();
+ };
+
void unindexRecord(OperationContext* const opCtx,
const BSONObj& obj,
const RecordId& loc,
diff --git a/src/mongo/db/catalog/multi_index_block.h b/src/mongo/db/catalog/multi_index_block.h
index b2468a15c9b..70283a83fa5 100644
--- a/src/mongo/db/catalog/multi_index_block.h
+++ b/src/mongo/db/catalog/multi_index_block.h
@@ -151,9 +151,20 @@ public:
*
* Should not be called inside of a WriteUnitOfWork.
*/
- virtual Status doneInserting() = 0;
- virtual Status doneInserting(std::set<RecordId>* const dupRecords) = 0;
- virtual Status doneInserting(std::vector<BSONObj>* const dupKeysInserted) = 0;
+ virtual Status dumpInsertsFromBulk() = 0;
+ virtual Status dumpInsertsFromBulk(std::set<RecordId>* const dupRecords) = 0;
+ virtual Status dumpInsertsFromBulk(std::vector<BSONObj>* const dupKeysInserted) = 0;
+
+ /**
+ * For background indexes using an IndexBuildInterceptor to capture inserts during a build,
+ * drain these writes into the index. If intent locks are held on the collection, more writes
+ * may come in after this drain completes. To ensure that all writes are completely drained
+ * before calling commit(), stop writes on the collection by holding an S or X lock while calling this
+ * method.
+ *
+ * Must not be in a WriteUnitOfWork.
+ */
+ virtual Status drainBackgroundWritesIfNeeded() = 0;
/**
* Marks the index ready for use. Should only be called as the last method after
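
The comment on drainBackgroundWritesIfNeeded() above describes a locking protocol that is easiest to see as a call sequence. The following is a rough sketch, not part of this patch, assuming 'indexer' is the MultiIndexBlock for a background build and that the caller acquires the collection and database locks noted in the comments; it mirrors how create_indexes.cpp and index_builder.cpp use the API later in this diff.

    // Collection scan and dump from the external sorter, under an intent lock (MODE_IX).
    uassertStatusOK(indexer.insertAllDocumentsInCollection());

    // First drain: still under an intent lock (MODE_IS), so writers keep running and may
    // add more side writes after this drain finishes.
    uassertStatusOK(indexer.drainBackgroundWritesIfNeeded());

    // Second drain: under a shared lock (MODE_S), which stops new writes on the collection.
    uassertStatusOK(indexer.drainBackgroundWritesIfNeeded());

    // Final drain and commit under an exclusive database lock (MODE_X), so nothing can be
    // written between the last drain and marking the index ready.
    uassertStatusOK(indexer.drainBackgroundWritesIfNeeded());
    WriteUnitOfWork wunit(opCtx);
    uassertStatusOK(indexer.commit());
    wunit.commit();
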
diff --git a/src/mongo/db/catalog/multi_index_block_impl.cpp b/src/mongo/db/catalog/multi_index_block_impl.cpp
index 28f90ae1249..5ce1badd0e4 100644
--- a/src/mongo/db/catalog/multi_index_block_impl.cpp
+++ b/src/mongo/db/catalog/multi_index_block_impl.cpp
@@ -276,7 +276,10 @@ StatusWith<std::vector<BSONObj>> MultiIndexBlockImpl::init(const std::vector<BSO
if (!status.isOK())
return status;
- if (!_buildInBackground) {
+ // Foreground builds and background builds using an interceptor can use the bulk builder.
+ const bool useBulk =
+ !_buildInBackground || index.block->getEntry()->indexBuildInterceptor();
+ if (useBulk) {
// Bulk build process requires foreground building as it assumes nothing is changing
// under it.
index.bulk = index.real->initiateBulk(eachIndexBuildMaxMemoryUsageBytes);
@@ -287,6 +290,7 @@ StatusWith<std::vector<BSONObj>> MultiIndexBlockImpl::init(const std::vector<BSO
_collection->getIndexCatalog()->prepareInsertDeleteOptions(
_opCtx, descriptor, &index.options);
index.options.dupsAllowed = index.options.dupsAllowed || _ignoreUnique;
+ index.options.fromIndexBuilder = true;
if (_ignoreUnique) {
index.options.getKeysMode = IndexAccessMethod::GetKeysMode::kRelaxConstraints;
}
@@ -487,11 +491,12 @@ Status MultiIndexBlockImpl::insertAllDocumentsInCollection() {
progress->finished();
- Status ret = doneInserting();
+ Status ret = dumpInsertsFromBulk();
if (!ret.isOK())
return ret;
- log() << "build index done. scanned " << n << " total records. " << t.seconds() << " secs";
+ log() << "build index collection scan done. scanned " << n << " total records. " << t.seconds()
+ << " secs";
return Status::OK();
}
@@ -534,19 +539,20 @@ Status MultiIndexBlockImpl::insert(const BSONObj& doc,
return Status::OK();
}
-Status MultiIndexBlockImpl::doneInserting() {
- return _doneInserting(nullptr, nullptr);
+Status MultiIndexBlockImpl::dumpInsertsFromBulk() {
+ return _dumpInsertsFromBulk(nullptr, nullptr);
}
-Status MultiIndexBlockImpl::doneInserting(std::set<RecordId>* dupRecords) {
- return _doneInserting(dupRecords, nullptr);
+Status MultiIndexBlockImpl::dumpInsertsFromBulk(std::set<RecordId>* dupRecords) {
+ return _dumpInsertsFromBulk(dupRecords, nullptr);
}
-Status MultiIndexBlockImpl::doneInserting(std::vector<BSONObj>* dupKeysInserted) {
- return _doneInserting(nullptr, dupKeysInserted);
+Status MultiIndexBlockImpl::dumpInsertsFromBulk(std::vector<BSONObj>* dupKeysInserted) {
+ return _dumpInsertsFromBulk(nullptr, dupKeysInserted);
}
-Status MultiIndexBlockImpl::_doneInserting(std::set<RecordId>* dupRecords,
- std::vector<BSONObj>* dupKeysInserted) {
+
+Status MultiIndexBlockImpl::_dumpInsertsFromBulk(std::set<RecordId>* dupRecords,
+ std::vector<BSONObj>* dupKeysInserted) {
if (State::kAborted == _getState()) {
return {ErrorCodes::IndexBuildAborted,
str::stream() << "Index build aborted: " << _abortReason
@@ -561,7 +567,7 @@ Status MultiIndexBlockImpl::_doneInserting(std::set<RecordId>* dupRecords,
for (size_t i = 0; i < _indexes.size(); i++) {
if (_indexes[i].bulk == NULL)
continue;
- LOG(1) << "\t bulk commit starting for index: "
+ LOG(1) << "\t dumping from external sorter into index: "
<< _indexes[i].block->getEntry()->descriptor()->indexName();
Status status = _indexes[i].real->commitBulk(_opCtx,
_indexes[i].bulk.get(),
@@ -579,6 +585,43 @@ Status MultiIndexBlockImpl::_doneInserting(std::set<RecordId>* dupRecords,
return Status::OK();
}
+Status MultiIndexBlockImpl::drainBackgroundWritesIfNeeded() {
+ if (State::kAborted == _getState()) {
+ return {ErrorCodes::IndexBuildAborted,
+ str::stream() << "Index build aborted: " << _abortReason
+ << ". Cannot complete drain phase: "
+ << _collection->ns().ns()
+ << "("
+ << *_collection->uuid()
+ << ")"};
+ }
+
+ invariant(!_opCtx->lockState()->inAWriteUnitOfWork());
+
+ // Drain side-writes table for each index. This only drains what is visible. Assuming intent
+ // locks are held on the user collection, more writes can come in after this drain completes.
+ // Callers are responsible for stopping writes by holding an S or X lock while draining before
+ // completing the index build.
+ for (size_t i = 0; i < _indexes.size(); i++) {
+ auto interceptor = _indexes[i].block->getEntry()->indexBuildInterceptor();
+ if (!interceptor)
+ continue;
+
+ LOG(1) << "draining background writes on collection " << _collection->ns()
+ << " into index: " << _indexes[i].block->getEntry()->descriptor()->indexName();
+
+ auto status = interceptor->drainWritesIntoIndex(_opCtx,
+ _indexes[i].real,
+ _indexes[i].block->getEntry()->descriptor(),
+ _indexes[i].options);
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+ return Status::OK();
+}
+
+
void MultiIndexBlockImpl::abortWithoutCleanup() {
_setStateToAbortedIfNotCommitted("aborted without cleanup"_sd);
_indexes.clear();
@@ -613,6 +656,16 @@ Status MultiIndexBlockImpl::commit(stdx::function<void(const BSONObj& spec)> onC
onCreateFn(_indexes[i].block->getSpec());
}
+ // Do this before calling success(), which unsets the interceptor pointer on the index
+ // catalog entry.
+ auto interceptor = _indexes[i].block->getEntry()->indexBuildInterceptor();
+ if (interceptor) {
+ auto multikeyPaths = interceptor->getMultikeyPaths();
+ if (multikeyPaths) {
+ _indexes[i].block->getEntry()->setMultikey(_opCtx, multikeyPaths.get());
+ }
+ }
+
_indexes[i].block->success();
// The bulk builder will track multikey information itself. Non-bulk builders re-use the
diff --git a/src/mongo/db/catalog/multi_index_block_impl.h b/src/mongo/db/catalog/multi_index_block_impl.h
index ab008f9692c..c842298289c 100644
--- a/src/mongo/db/catalog/multi_index_block_impl.h
+++ b/src/mongo/db/catalog/multi_index_block_impl.h
@@ -82,9 +82,14 @@ public:
const RecordId& loc,
std::vector<BSONObj>* const dupKeysInserted = nullptr) override;
- Status doneInserting() override;
- Status doneInserting(std::set<RecordId>* dupRecords) override;
- Status doneInserting(std::vector<BSONObj>* dupKeysInserted) override;
+ Status dumpInsertsFromBulk() override;
+ Status dumpInsertsFromBulk(std::set<RecordId>* dupRecords) override;
+ Status dumpInsertsFromBulk(std::vector<BSONObj>* dupKeysInserted) override;
+
+ /**
+ * See MultiIndexBlock::drainBackgroundWritesIfNeeded()
+ */
+ Status drainBackgroundWritesIfNeeded() override;
Status commit() override;
Status commit(stdx::function<void(const BSONObj& spec)> onCreateFn) override;
@@ -134,7 +139,8 @@ private:
InsertDeleteOptions options;
};
- Status _doneInserting(std::set<RecordId>* dupRecords, std::vector<BSONObj>* dupKeysInserted);
+ Status _dumpInsertsFromBulk(std::set<RecordId>* dupRecords,
+ std::vector<BSONObj>* dupKeysInserted);
/**
* Returns the current state.
diff --git a/src/mongo/db/catalog/multi_index_block_test.cpp b/src/mongo/db/catalog/multi_index_block_test.cpp
index 11dcd74502c..562bdd518fb 100644
--- a/src/mongo/db/catalog/multi_index_block_test.cpp
+++ b/src/mongo/db/catalog/multi_index_block_test.cpp
@@ -112,7 +112,7 @@ TEST_F(MultiIndexBlockTest, CommitWithoutInsertingDocuments) {
ASSERT_EQUALS(0U, specs.size());
ASSERT_EQUALS(MultiIndexBlockImpl::State::kRunning, indexer->getState_forTest());
- ASSERT_OK(indexer->doneInserting());
+ ASSERT_OK(indexer->dumpInsertsFromBulk());
ASSERT_EQUALS(MultiIndexBlockImpl::State::kPreCommit, indexer->getState_forTest());
ASSERT_FALSE(indexer->isCommitted());
@@ -134,7 +134,7 @@ TEST_F(MultiIndexBlockTest, CommitAfterInsertingSingleDocument) {
ASSERT_EQUALS(MultiIndexBlockImpl::State::kRunning, indexer->getState_forTest());
ASSERT_OK(indexer->insert({}, {}, nullptr));
- ASSERT_OK(indexer->doneInserting());
+ ASSERT_OK(indexer->dumpInsertsFromBulk());
ASSERT_EQUALS(MultiIndexBlockImpl::State::kPreCommit, indexer->getState_forTest());
ASSERT_FALSE(indexer->isCommitted());
@@ -192,7 +192,7 @@ TEST_F(MultiIndexBlockTest, InsertingSingleDocumentFailsAfterAbort) {
ASSERT_FALSE(indexer->isCommitted());
}
-TEST_F(MultiIndexBlockTest, DoneInsertingFailsAfterAbort) {
+TEST_F(MultiIndexBlockTest, dumpInsertsFromBulkFailsAfterAbort) {
auto indexer = getIndexer();
ASSERT_EQUALS(MultiIndexBlockImpl::State::kUninitialized, indexer->getState_forTest());
@@ -206,7 +206,7 @@ TEST_F(MultiIndexBlockTest, DoneInsertingFailsAfterAbort) {
indexer->abort("test"_sd);
ASSERT_EQUALS(MultiIndexBlockImpl::State::kAborted, indexer->getState_forTest());
- ASSERT_EQUALS(ErrorCodes::IndexBuildAborted, indexer->doneInserting());
+ ASSERT_EQUALS(ErrorCodes::IndexBuildAborted, indexer->dumpInsertsFromBulk());
ASSERT_EQUALS(MultiIndexBlockImpl::State::kAborted, indexer->getState_forTest());
ASSERT_FALSE(indexer->isCommitted());
@@ -223,7 +223,7 @@ TEST_F(MultiIndexBlockTest, CommitFailsAfterAbort) {
ASSERT_OK(indexer->insert(BSON("_id" << 123 << "a" << 456), {}, nullptr));
ASSERT_EQUALS(MultiIndexBlockImpl::State::kRunning, indexer->getState_forTest());
- ASSERT_OK(indexer->doneInserting());
+ ASSERT_OK(indexer->dumpInsertsFromBulk());
ASSERT_EQUALS(MultiIndexBlockImpl::State::kPreCommit, indexer->getState_forTest());
indexer->abort("test"_sd);
diff --git a/src/mongo/db/catalog/rename_collection.cpp b/src/mongo/db/catalog/rename_collection.cpp
index c3e71007b5e..c6c8c128e4c 100644
--- a/src/mongo/db/catalog/rename_collection.cpp
+++ b/src/mongo/db/catalog/rename_collection.cpp
@@ -462,7 +462,7 @@ Status renameCollectionCommon(OperationContext* opCtx,
return status;
}
- status = indexer.doneInserting();
+ status = indexer.dumpInsertsFromBulk();
if (!status.isOK()) {
return status;
}
diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp
index 15add881ec2..ea461d2a5c3 100644
--- a/src/mongo/db/commands/create_indexes.cpp
+++ b/src/mongo/db/commands/create_indexes.cpp
@@ -28,6 +28,8 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kIndex
+
#include "mongo/platform/basic.h"
#include <string>
@@ -60,6 +62,8 @@
#include "mongo/db/server_options.h"
#include "mongo/db/views/view_catalog.h"
#include "mongo/s/shard_key_pattern.h"
+#include "mongo/util/fail_point.h"
+#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -68,6 +72,10 @@ using std::string;
using IndexVersion = IndexDescriptor::IndexVersion;
+MONGO_FAIL_POINT_DEFINE(hangAfterIndexBuildFirstDrain);
+MONGO_FAIL_POINT_DEFINE(hangAfterIndexBuildSecondDrain);
+MONGO_FAIL_POINT_DEFINE(hangAfterIndexBuildDumpsInsertsFromBulk);
+
namespace {
const StringData kIndexesFieldName = "indexes"_sd;
@@ -389,11 +397,7 @@ public:
dbLock.relockWithMode(MODE_IX);
}
- try {
- Lock::CollectionLock colLock(opCtx->lockState(), ns.ns(), MODE_IX);
- uassertStatusOK(indexer.insertAllDocumentsInCollection());
- } catch (const DBException& e) {
- invariant(e.code() != ErrorCodes::WriteConflict);
+ auto relockOnErrorGuard = MakeGuard([&] {
// Must have exclusive DB lock before we clean up the index build via the
// destructor of 'indexer'.
if (indexer.getBuildInBackground()) {
@@ -406,8 +410,50 @@ public:
std::terminate();
}
}
- throw;
+ });
+
+ // Collection scan and insert into index, followed by a drain of writes received in the
+ // background.
+ {
+ Lock::CollectionLock colLock(opCtx->lockState(), ns.ns(), MODE_IX);
+ uassertStatusOK(indexer.insertAllDocumentsInCollection());
+ }
+
+ if (MONGO_FAIL_POINT(hangAfterIndexBuildDumpsInsertsFromBulk)) {
+ log() << "Hanging after dumping inserts from bulk builder";
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangAfterIndexBuildDumpsInsertsFromBulk);
+ }
+
+ // Perform the first drain while holding an intent lock.
+ {
+ opCtx->recoveryUnit()->abandonSnapshot();
+ Lock::CollectionLock colLock(opCtx->lockState(), ns.ns(), MODE_IS);
+
+ LOG(1) << "performing first index build drain";
+ uassertStatusOK(indexer.drainBackgroundWritesIfNeeded());
+ }
+
+ if (MONGO_FAIL_POINT(hangAfterIndexBuildFirstDrain)) {
+ log() << "Hanging after index build first drain";
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangAfterIndexBuildFirstDrain);
+ }
+
+ // Perform the second drain while stopping writes on the collection.
+ {
+ opCtx->recoveryUnit()->abandonSnapshot();
+ Lock::CollectionLock colLock(opCtx->lockState(), ns.ns(), MODE_S);
+
+ LOG(1) << "performing second index build drain";
+ uassertStatusOK(indexer.drainBackgroundWritesIfNeeded());
+ }
+
+ if (MONGO_FAIL_POINT(hangAfterIndexBuildSecondDrain)) {
+ log() << "Hanging after index build second drain";
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangAfterIndexBuildSecondDrain);
}
+
+ relockOnErrorGuard.Dismiss();
+
// Need to return db lock back to exclusive, to complete the index build.
if (indexer.getBuildInBackground()) {
opCtx->recoveryUnit()->abandonSnapshot();
@@ -418,10 +464,15 @@ public:
DatabaseShardingState::get(db).checkDbVersion(opCtx);
}
- uassert(28551, "database dropped during index build", db);
- uassert(28552, "collection dropped during index build", db->getCollection(opCtx, ns));
+ invariant(db);
+ invariant(db->getCollection(opCtx, ns));
}
+ // Perform the third and final drain after releasing a shared lock and reacquiring an
+ // exclusive lock on the database.
+ LOG(1) << "performing final index build drain";
+ uassertStatusOK(indexer.drainBackgroundWritesIfNeeded());
+
writeConflictRetry(opCtx, kCommandName, ns.ns(), [&] {
WriteUnitOfWork wunit(opCtx);
diff --git a/src/mongo/db/concurrency/d_concurrency.h b/src/mongo/db/concurrency/d_concurrency.h
index 4ba068ac445..7f6f52e9c99 100644
--- a/src/mongo/db/concurrency/d_concurrency.h
+++ b/src/mongo/db/concurrency/d_concurrency.h
@@ -319,6 +319,10 @@ public:
return _result == LOCK_OK;
}
+ LockMode mode() const {
+ return _mode;
+ }
+
private:
const ResourceId _id;
OperationContext* const _opCtx;
diff --git a/src/mongo/db/index/SConscript b/src/mongo/db/index/SConscript
index 1f37291f34b..87f54e29f47 100644
--- a/src/mongo/db/index/SConscript
+++ b/src/mongo/db/index/SConscript
@@ -157,7 +157,6 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/multi_key_path_tracker',
- '$BUILD_DIR/mongo/db/s/sharding_api_d',
'index_access_methods',
],
)
diff --git a/src/mongo/db/index/duplicate_key_tracker_test.cpp b/src/mongo/db/index/duplicate_key_tracker_test.cpp
index abb373e94e0..b1abac80d95 100644
--- a/src/mongo/db/index/duplicate_key_tracker_test.cpp
+++ b/src/mongo/db/index/duplicate_key_tracker_test.cpp
@@ -280,7 +280,7 @@ TEST_F(DuplicateKeyTrackerTest, BulkIndexBuild) {
std::vector<BSONObj> dupsInserted;
// Neither of these inserts will recognize duplicates because the bulk inserter does not
- // detect them until doneInserting() is called.
+ // detect them until dumpInsertsFromBulk() is called.
ASSERT_OK(indexer.insert(record1->data.releaseToBson(), record1->id, &dupsInserted));
ASSERT_EQ(0u, dupsInserted.size());
@@ -289,7 +289,7 @@ TEST_F(DuplicateKeyTrackerTest, BulkIndexBuild) {
ASSERT_OK(indexer.insert(record2->data.releaseToBson(), record2->id, &dupsInserted));
ASSERT_EQ(0u, dupsInserted.size());
- ASSERT_OK(indexer.doneInserting(&dupsInserted));
+ ASSERT_OK(indexer.dumpInsertsFromBulk(&dupsInserted));
ASSERT_EQ(1u, dupsInserted.size());
// Record that duplicates were inserted.
diff --git a/src/mongo/db/index/index_access_method.cpp b/src/mongo/db/index/index_access_method.cpp
index 86a936c3bc6..250223530a3 100644
--- a/src/mongo/db/index/index_access_method.cpp
+++ b/src/mongo/db/index/index_access_method.cpp
@@ -184,13 +184,27 @@ Status AbstractIndexAccessMethod::insert(OperationContext* opCtx,
const RecordId& loc,
const InsertDeleteOptions& options,
InsertResult* result) {
- bool checkIndexKeySize = shouldCheckIndexKeySize(opCtx);
+ invariant(options.fromIndexBuilder || !_btreeState->isBuilding());
+
BSONObjSet multikeyMetadataKeys = SimpleBSONObjComparator::kInstance.makeBSONObjSet();
BSONObjSet keys = SimpleBSONObjComparator::kInstance.makeBSONObjSet();
MultikeyPaths multikeyPaths;
+
// Delegate to the subclass.
getKeys(obj, options.getKeysMode, &keys, &multikeyMetadataKeys, &multikeyPaths);
+ return insertKeys(opCtx, keys, multikeyMetadataKeys, multikeyPaths, loc, options, result);
+}
+
+Status AbstractIndexAccessMethod::insertKeys(OperationContext* opCtx,
+ const BSONObjSet& keys,
+ const BSONObjSet& multikeyMetadataKeys,
+ const MultikeyPaths& multikeyPaths,
+ const RecordId& loc,
+ const InsertDeleteOptions& options,
+ InsertResult* result) {
+ bool checkIndexKeySize = shouldCheckIndexKeySize(opCtx);
+
// Add all new data keys, and all new multikey metadata keys, into the index. When iterating
// over the data keys, each of them should point to the doc's RecordId. When iterating over
// the multikey metadata keys, they should point to the reserved 'kMultikeyMetadataKeyId'.
@@ -236,7 +250,6 @@ Status AbstractIndexAccessMethod::insert(OperationContext* opCtx,
if (shouldMarkIndexAsMultikey(keys, multikeyMetadataKeys, multikeyPaths)) {
_btreeState->setMultikey(opCtx, multikeyPaths);
}
-
return Status::OK();
}
@@ -271,7 +284,9 @@ Status AbstractIndexAccessMethod::remove(OperationContext* opCtx,
const RecordId& loc,
const InsertDeleteOptions& options,
int64_t* numDeleted) {
+ invariant(!_btreeState->isBuilding());
invariant(numDeleted);
+
*numDeleted = 0;
BSONObjSet keys = SimpleBSONObjComparator::kInstance.makeBSONObjSet();
// There's no need to compute the prefixes of the indexed fields that cause the index to be
@@ -285,12 +300,20 @@ Status AbstractIndexAccessMethod::remove(OperationContext* opCtx,
getKeys(
obj, GetKeysMode::kRelaxConstraintsUnfiltered, &keys, multikeyMetadataKeys, multikeyPaths);
+ return removeKeys(opCtx, keys, loc, options, numDeleted);
+}
+
+Status AbstractIndexAccessMethod::removeKeys(OperationContext* opCtx,
+ const BSONObjSet& keys,
+ const RecordId& loc,
+ const InsertDeleteOptions& options,
+ int64_t* numDeleted) {
+
for (const auto& key : keys) {
removeOneKey(opCtx, key, loc, options.dupsAllowed);
}
*numDeleted = keys.size();
-
return Status::OK();
}
@@ -446,6 +469,7 @@ Status AbstractIndexAccessMethod::update(OperationContext* opCtx,
const UpdateTicket& ticket,
int64_t* numInserted,
int64_t* numDeleted) {
+ invariant(!_btreeState->isBuilding());
invariant(ticket.newKeys.size() ==
ticket.oldKeys.size() + ticket.added.size() - ticket.removed.size());
invariant(numInserted);
diff --git a/src/mongo/db/index/index_access_method.h b/src/mongo/db/index/index_access_method.h
index 54f2e351d26..3c466486410 100644
--- a/src/mongo/db/index/index_access_method.h
+++ b/src/mongo/db/index/index_access_method.h
@@ -93,6 +93,14 @@ public:
const InsertDeleteOptions& options,
InsertResult* result) = 0;
+ virtual Status insertKeys(OperationContext* opCtx,
+ const BSONObjSet& keys,
+ const BSONObjSet& multikeyMetadataKeys,
+ const MultikeyPaths& multikeyPaths,
+ const RecordId& loc,
+ const InsertDeleteOptions& options,
+ InsertResult* result) = 0;
+
/**
* Analogous to above, but remove the records instead of inserting them.
* 'numDeleted' will be set to the number of keys removed from the index for the document.
@@ -103,6 +111,12 @@ public:
const InsertDeleteOptions& options,
int64_t* numDeleted) = 0;
+ virtual Status removeKeys(OperationContext* opCtx,
+ const BSONObjSet& keys,
+ const RecordId& loc,
+ const InsertDeleteOptions& options,
+ int64_t* numDeleted) = 0;
+
/**
* Checks whether the index entries for the document 'from', which is placed at location
* 'loc' on disk, can be changed to the index entries for the doc 'to'. Provides a ticket
@@ -403,6 +417,10 @@ struct InsertDeleteOptions {
// Are duplicate keys allowed in the index?
bool dupsAllowed = false;
+ // Only an index builder is allowed to insert into the index while it is building, so only the
+ // index builder should set this to 'true'.
+ bool fromIndexBuilder = false;
+
// Should we relax the index constraints?
IndexAccessMethod::GetKeysMode getKeysMode =
IndexAccessMethod::GetKeysMode::kEnforceConstraints;
@@ -439,12 +457,26 @@ public:
const InsertDeleteOptions& options,
InsertResult* result) final;
+ Status insertKeys(OperationContext* opCtx,
+ const BSONObjSet& keys,
+ const BSONObjSet& multikeyMetadataKeys,
+ const MultikeyPaths& multikeyPaths,
+ const RecordId& loc,
+ const InsertDeleteOptions& options,
+ InsertResult* result) final;
+
Status remove(OperationContext* opCtx,
const BSONObj& obj,
const RecordId& loc,
const InsertDeleteOptions& options,
int64_t* numDeleted) final;
+ Status removeKeys(OperationContext* opCtx,
+ const BSONObjSet& keys,
+ const RecordId& loc,
+ const InsertDeleteOptions& options,
+ int64_t* numDeleted) final;
+
Status validateUpdate(OperationContext* opCtx,
const BSONObj& from,
const BSONObj& to,
diff --git a/src/mongo/db/index/index_build_interceptor.cpp b/src/mongo/db/index/index_build_interceptor.cpp
index 6af48b542c1..c82baee0b0c 100644
--- a/src/mongo/db/index/index_build_interceptor.cpp
+++ b/src/mongo/db/index/index_build_interceptor.cpp
@@ -35,47 +35,195 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/multi_key_path_tracker.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/service_context.h"
#include "mongo/util/log.h"
+#include "mongo/util/progress_meter.h"
#include "mongo/util/uuid.h"
namespace mongo {
-namespace {
-const bool makeCollections = false;
-}
+IndexBuildInterceptor::IndexBuildInterceptor(OperationContext* opCtx)
+ : _sideWritesTable(
+ opCtx->getServiceContext()->getStorageEngine()->makeTemporaryRecordStore(opCtx)){};
-NamespaceString IndexBuildInterceptor::makeTempSideWritesNs() {
- return NamespaceString("local.system.sideWrites-" + UUID::gen().toString());
-}
+Status IndexBuildInterceptor::drainWritesIntoIndex(OperationContext* opCtx,
+ IndexAccessMethod* indexAccessMethod,
+ const IndexDescriptor* indexDescriptor,
+ const InsertDeleteOptions& options) {
+ invariant(!opCtx->lockState()->inAWriteUnitOfWork());
+
+ // These are used for logging only.
+ int64_t totalDeleted = 0;
+ int64_t totalInserted = 0;
+
+ const int64_t appliedAtStart = _numApplied;
+
+ // Set up the progress meter. The total will never be completely accurate, because more
+ // writes may be added to the side writes table while the drain is in progress.
+ static const char* curopMessage = "Index build draining writes";
+ stdx::unique_lock<Client> lk(*opCtx->getClient());
+ ProgressMeterHolder progress(CurOp::get(opCtx)->setMessage_inlock(
+ curopMessage, curopMessage, _sideWritesCounter.load() - appliedAtStart, 1));
+ lk.unlock();
+
+ // Buffer operations into batches to insert per WriteUnitOfWork. Impose an upper limit on the
+ // number of documents and the total size of the batch.
+ const int32_t kBatchMaxSize = 1000;
+ const int64_t kBatchMaxBytes = BSONObjMaxInternalSize;
+
+ int64_t batchSizeBytes = 0;
+
+ std::vector<SideWriteRecord> batch;
+ batch.reserve(kBatchMaxSize);
+
+ // Hold on to documents that would exceed the per-batch memory limit. Always insert this first
+ // into the next batch.
+ boost::optional<SideWriteRecord> stashed;
+
+ auto cursor = _sideWritesTable->rs()->getCursor(opCtx);
+
+ bool atEof = false;
+ while (!atEof) {
+
+ // Stashed records should be inserted into a batch first.
+ if (stashed) {
+ invariant(batch.empty());
+ batch.push_back(std::move(stashed.get()));
+ stashed.reset();
+ }
+
+ auto record = cursor->next();
+
+ if (record) {
+ RecordId currentRecordId = record->id;
+ BSONObj docOut = record->data.toBson().getOwned();
+
+ // If the total batch size in bytes would be too large, stash this document and insert
+ // the current batch first.
+ int objSize = docOut.objsize();
+ if (batchSizeBytes + objSize > kBatchMaxBytes) {
+ invariant(!stashed);
+
+ // Stash this document to be inserted in the next batch.
+ stashed.emplace(currentRecordId, std::move(docOut));
+ } else {
+ batchSizeBytes += objSize;
+ batch.emplace_back(currentRecordId, std::move(docOut));
+
+ // Continue if there is more room in the batch.
+ if (batch.size() < kBatchMaxSize) {
+ continue;
+ }
+ }
+ } else {
+ atEof = true;
+ if (batch.empty())
+ break;
+ }
+
+ // Account for more writes coming in after the drain starts.
+ progress->setTotalWhileRunning(_sideWritesCounter.loadRelaxed() - appliedAtStart);
+
+ invariant(!batch.empty());
+
+ // If we are here, either we have reached the end of the table or the batch is full, so
+ // insert everything in one WriteUnitOfWork, and delete each inserted document from the side
+ // writes table.
+ WriteUnitOfWork wuow(opCtx);
+ for (auto& operation : batch) {
+ auto status = _applyWrite(
+ opCtx, indexAccessMethod, operation.second, options, &totalInserted, &totalDeleted);
+ if (!status.isOK()) {
+ return status;
+ }
-void IndexBuildInterceptor::ensureSideWritesCollectionExists(OperationContext* opCtx) {
- if (!makeCollections) {
- return;
+ // Delete the document from the table as soon as it has been inserted into the index.
+ // This ensures that no key is ever inserted twice and no keys are skipped.
+ _sideWritesTable->rs()->deleteRecord(opCtx, operation.first);
+ }
+ cursor->save();
+ wuow.commit();
+
+ cursor->restore();
+
+ progress->hit(batch.size());
+ _numApplied += batch.size();
+ batch.clear();
+ batchSizeBytes = 0;
}
- // TODO SERVER-38027 Consider pushing this higher into the createIndexes command logic.
- OperationShardingState::get(opCtx).setAllowImplicitCollectionCreation(BSONElement());
+ progress->finished();
- AutoGetOrCreateDb local(opCtx, "local", LockMode::MODE_X);
- CollectionOptions options;
- options.setNoIdIndex();
- options.temp = true;
+ log() << "index build for " << indexDescriptor->indexName() << ": drain applied "
+ << (_numApplied - appliedAtStart) << " side writes. i: " << totalInserted
+ << ", d: " << totalDeleted << ", total: " << _numApplied;
- local.getDb()->createCollection(opCtx, _sideWritesNs.ns(), options);
+ return Status::OK();
}
-void IndexBuildInterceptor::removeSideWritesCollection(OperationContext* opCtx) {
- if (!makeCollections) {
- return;
+Status IndexBuildInterceptor::_applyWrite(OperationContext* opCtx,
+ IndexAccessMethod* indexAccessMethod,
+ const BSONObj& operation,
+ const InsertDeleteOptions& options,
+ int64_t* const keysInserted,
+ int64_t* const keysDeleted) {
+ const BSONObj key = operation["key"].Obj();
+ const RecordId opRecordId = RecordId(operation["recordId"].Long());
+ const Op opType =
+ (strcmp(operation.getStringField("op"), "i") == 0) ? Op::kInsert : Op::kDelete;
+ const BSONObjSet keySet = SimpleBSONObjComparator::kInstance.makeBSONObjSet({key});
+
+ if (opType == Op::kInsert) {
+ InsertResult result;
+ Status s =
+ indexAccessMethod->insertKeys(opCtx,
+ keySet,
+ SimpleBSONObjComparator::kInstance.makeBSONObjSet(),
+ MultikeyPaths{},
+ opRecordId,
+ options,
+ &result);
+ if (!s.isOK()) {
+ return s;
+ }
+
+ invariant(!result.dupsInserted.size());
+ *keysInserted += result.numInserted;
+ } else {
+ invariant(opType == Op::kDelete);
+ DEV invariant(strcmp(operation.getStringField("op"), "d") == 0);
+
+ int64_t numDeleted;
+ Status s = indexAccessMethod->removeKeys(opCtx, keySet, opRecordId, options, &numDeleted);
+ if (!s.isOK()) {
+ return s;
+ }
+
+ *keysDeleted += numDeleted;
}
+ return Status::OK();
+}
- AutoGetDb local(opCtx, "local", LockMode::MODE_X);
- fassert(50994, local.getDb()->dropCollectionEvenIfSystem(opCtx, _sideWritesNs, repl::OpTime()));
+bool IndexBuildInterceptor::areAllWritesApplied(OperationContext* opCtx) const {
+ invariant(_sideWritesTable);
+ auto cursor = _sideWritesTable->rs()->getCursor(opCtx, false /* forward */);
+ auto record = cursor->next();
+
+ // The table is empty only when all writes are applied.
+ if (!record)
+ return true;
+
+ return false;
+}
+
+boost::optional<MultikeyPaths> IndexBuildInterceptor::getMultikeyPaths() const {
+ stdx::unique_lock<stdx::mutex> lk(_multikeyPathMutex);
+ return _multikeyPaths;
}
Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx,
@@ -83,7 +231,9 @@ Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx,
const BSONObj* obj,
RecordId loc,
Op op,
- int64_t* numKeysOut) {
+ int64_t* const numKeysOut) {
+ invariant(opCtx->lockState()->inAWriteUnitOfWork());
+
*numKeysOut = 0;
BSONObjSet keys = SimpleBSONObjComparator::kInstance.makeBSONObjSet();
BSONObjSet multikeyMetadataKeys = SimpleBSONObjComparator::kInstance.makeBSONObjSet();
@@ -98,23 +248,24 @@ Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx,
// `multikeyMetadataKeys` when inserting.
*numKeysOut = keys.size() + (op == Op::kInsert ? multikeyMetadataKeys.size() : 0);
- if (_multikeyPaths) {
- MultikeyPathTracker::mergeMultikeyPaths(&_multikeyPaths.get(), multikeyPaths);
- } else {
- // `mergeMultikeyPaths` is sensitive to the two inputs having the same multikey
- // "shape". Initialize `_multikeyPaths` with the right shape from the first result.
- _multikeyPaths = multikeyPaths;
+ if (*numKeysOut == 0) {
+ return Status::OK();
}
- AutoGetCollection coll(opCtx, _sideWritesNs, LockMode::MODE_IX);
- invariant(coll.getCollection());
+ {
+ stdx::unique_lock<stdx::mutex> lk(_multikeyPathMutex);
+ if (_multikeyPaths) {
+ MultikeyPathTracker::mergeMultikeyPaths(&_multikeyPaths.get(), multikeyPaths);
+ } else {
+ // `mergeMultikeyPaths` is sensitive to the two inputs having the same multikey
+ // "shape". Initialize `_multikeyPaths` with the right shape from the first result.
+ _multikeyPaths = multikeyPaths;
+ }
+ }
- std::vector<InsertStatement> toInsert;
+ std::vector<BSONObj> toInsert;
for (const auto& key : keys) {
- // Documents inserted into this table must be consumed in insert-order. Today, we can rely
- // on storage engines to return documents in insert-order, but with clustered indexes,
- // that may no longer be true.
- //
+ // Documents inserted into this table must be consumed in insert-order.
// Additionally, these writes should be timestamped with the same timestamps that the
// other writes making up this operation are given. When index builds can cope with
// replication rollbacks, side table writes associated with a CUD operation should
@@ -138,9 +289,21 @@ Status IndexBuildInterceptor::sideWrite(OperationContext* opCtx,
}
}
- OpDebug* const opDebug = nullptr;
- const bool fromMigrate = false;
- return coll.getCollection()->insertDocuments(
- opCtx, toInsert.begin(), toInsert.end(), opDebug, fromMigrate);
+ _sideWritesCounter.fetchAndAdd(toInsert.size());
+ // This insert may roll back, but not necessarily from inserting into this table. If other write
+ // operations outside this table and in the same transaction are rolled back, this counter also
+ // needs to be rolled back.
+ opCtx->recoveryUnit()->onRollback(
+ [ this, size = toInsert.size() ] { _sideWritesCounter.fetchAndSubtract(size); });
+
+ std::vector<Record> records;
+ for (auto& obj : toInsert) {
+ records.emplace_back(Record{RecordId(), RecordData(obj.objdata(), obj.objsize())});
+ }
+
+ // By passing a vector of null timestamps, these inserts are not timestamped individually, but
+ // rather with the timestamp of the owning operation.
+ std::vector<Timestamp> timestamps(records.size());
+ return _sideWritesTable->rs()->insertRecords(opCtx, &records, timestamps);
}
} // namespace mongo
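
For reference, _applyWrite() above reads each side-writes entry back by its 'op', 'key', and 'recordId' fields; the construction of these documents happens in sideWrite() in context lines not shown in this hunk. Purely as an illustration, with hypothetical values, an entry recording the insert of {i: 42} into an {i: 1} index would look roughly like:

    // Illustrative only; field values are hypothetical.
    BSONObj entry = BSON("op" << "i"                      // "i" for insert, "d" for delete
                              << "key" << BSON("" << 42)  // generated index key (empty field name)
                              << "recordId" << 17LL);     // RecordId of the owning document
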
diff --git a/src/mongo/db/index/index_build_interceptor.h b/src/mongo/db/index/index_build_interceptor.h
index 13ae79d10d2..b25fbf53c71 100644
--- a/src/mongo/db/index/index_build_interceptor.h
+++ b/src/mongo/db/index/index_build_interceptor.h
@@ -29,9 +29,10 @@
#pragma once
+#include "mongo/db/index/index_access_method.h"
#include "mongo/db/index/multikey_paths.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/record_id.h"
+#include "mongo/db/storage/temporary_record_store.h"
#include "mongo/platform/atomic_word.h"
namespace mongo {
@@ -44,13 +45,12 @@ class IndexBuildInterceptor {
public:
enum class Op { kInsert, kDelete };
- IndexBuildInterceptor() : _sideWritesNs(makeTempSideWritesNs()) {}
- IndexBuildInterceptor(NamespaceString sideWritesNs) : _sideWritesNs(sideWritesNs) {}
-
- static NamespaceString makeTempSideWritesNs();
-
- void ensureSideWritesCollectionExists(OperationContext* opCtx);
- void removeSideWritesCollection(OperationContext* opCtx);
+ /**
+ * The OperationContext is used to construct a temporary table in the storage engine to
+ * intercept side writes. This interceptor must not exist longer than the operation context used
+ * to construct it, as the underlying TemporaryRecordStore needs it to destroy itself.
+ */
+ IndexBuildInterceptor(OperationContext* opCtx);
/**
* Client writes that are concurrent with an index build will have their index updates written
@@ -64,10 +64,51 @@ public:
const BSONObj* obj,
RecordId loc,
Op op,
- int64_t* numKeysOut);
+ int64_t* const numKeysOut);
+
+ /**
+ * Performs a resumable scan on the side writes table, and either inserts or removes each key
+ * from the underlying IndexAccessMethod. This will only insert as many records as are visible
+ * in the current snapshot.
+ *
+ * This is resumable, so subsequent calls will start the scan at the record immediately
+ * following the last inserted record from a previous call to drainWritesIntoIndex.
+ */
+ Status drainWritesIntoIndex(OperationContext* opCtx,
+ IndexAccessMethod* indexAccessMethod,
+ const IndexDescriptor* indexDescriptor,
+ const InsertDeleteOptions& options);
+
+ /**
+ * Returns 'true' if there are no visible records remaining to be applied from the side writes
+ * table. Ensure that this returns 'true' when an index build is completed.
+ */
+ bool areAllWritesApplied(OperationContext* opCtx) const;
+
+ /**
+ * When an index builder wants to commit, use this to retrieve any recorded multikey paths
+ * that were tracked during the build.
+ */
+ boost::optional<MultikeyPaths> getMultikeyPaths() const;
private:
- NamespaceString _sideWritesNs;
+ using SideWriteRecord = std::pair<RecordId, BSONObj>;
+
+ Status _applyWrite(OperationContext* opCtx,
+ IndexAccessMethod* indexAccessMethod,
+ const BSONObj& doc,
+ const InsertDeleteOptions& options,
+ int64_t* const keysInserted,
+ int64_t* const keysDeleted);
+
+ // This temporary record store is owned by the interceptor and dropped along with it.
+ std::unique_ptr<TemporaryRecordStore> _sideWritesTable;
+
+ int64_t _numApplied{0};
+
+ AtomicInt64 _sideWritesCounter{0};
+
+ mutable stdx::mutex _multikeyPathMutex;
boost::optional<MultikeyPaths> _multikeyPaths;
};
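
Taken together, the interface above is exercised in three stages. A condensed sketch, not part of this patch, assuming an OperationContext 'opCtx', an IndexAccessMethod 'iam', an IndexDescriptor 'desc', InsertDeleteOptions 'options', and a client document 'doc' at RecordId 'loc' are already in scope:

    // 1. Created when a hybrid build starts; it owns a temporary side-writes table.
    auto interceptor = std::make_unique<IndexBuildInterceptor>(opCtx);

    // 2. Concurrent client writes are routed here by the index catalog instead of going
    //    straight into the unfinished index. sideWrite() must run inside the writer's
    //    WriteUnitOfWork.
    {
        WriteUnitOfWork wuow(opCtx);
        int64_t numKeys = 0;
        uassertStatusOK(interceptor->sideWrite(
            opCtx, iam, &doc, loc, IndexBuildInterceptor::Op::kInsert, &numKeys));
        wuow.commit();
    }

    // 3. The index builder drains the accumulated writes into the index, possibly several
    //    times, and verifies the table is empty before the build commits.
    uassertStatusOK(interceptor->drainWritesIntoIndex(opCtx, iam, desc, options));
    invariant(interceptor->areAllWritesApplied(opCtx));
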
diff --git a/src/mongo/db/index_builder.cpp b/src/mongo/db/index_builder.cpp
index c4977673c37..93691e3139f 100644
--- a/src/mongo/db/index_builder.cpp
+++ b/src/mongo/db/index_builder.cpp
@@ -170,25 +170,30 @@ void IndexBuilder::waitForBgIndexStarting() {
}
namespace {
-/**
- * @param status shalt not be of code `WriteConflict`.
- */
-Status _failIndexBuild(MultiIndexBlock& indexer, Status status, bool allowBackgroundBuilding) {
+Status _failIndexBuild(OperationContext* opCtx,
+ MultiIndexBlock& indexer,
+ Lock::DBLock* dbLock,
+ Status status,
+ bool allowBackgroundBuilding) {
invariant(status.code() != ErrorCodes::WriteConflict);
+ if (!allowBackgroundBuilding) {
+ return status;
+ }
+
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+ if (dbLock->mode() != MODE_X) {
+ dbLock->relockWithMode(MODE_X);
+ }
+
if (status.code() == ErrorCodes::InterruptedAtShutdown) {
// leave it as-if kill -9 happened. This will be handled on restart.
- invariant(allowBackgroundBuilding); // Foreground builds aren't interrupted.
indexer.abortWithoutCleanup();
return status;
}
- if (allowBackgroundBuilding) {
- error() << "Background index build failed. Status: " << redact(status);
- fassertFailed(50769);
- } else {
- return status;
- }
+ error() << "Background index build failed. Status: " << redact(status);
+ fassertFailed(50769);
}
} // namespace
@@ -242,12 +247,13 @@ Status IndexBuilder::_build(OperationContext* opCtx,
return Status::OK();
}
if (!status.isOK()) {
- return _failIndexBuild(indexer, status, allowBackgroundBuilding);
+ return _failIndexBuild(opCtx, indexer, dbLock, status, allowBackgroundBuilding);
}
if (allowBackgroundBuilding) {
_setBgIndexStarting();
invariant(dbLock);
+ opCtx->recoveryUnit()->abandonSnapshot();
dbLock->relockWithMode(MODE_IX);
}
@@ -255,21 +261,38 @@ Status IndexBuilder::_build(OperationContext* opCtx,
Lock::CollectionLock collLock(opCtx->lockState(), ns.ns(), MODE_IX);
// WriteConflict exceptions and statuses are not expected to escape this method.
status = indexer.insertAllDocumentsInCollection();
+ if (!status.isOK()) {
+ return _failIndexBuild(opCtx, indexer, dbLock, status, allowBackgroundBuilding);
+ }
+
+ // Perform the first drain while holding an intent lock.
+ status = indexer.drainBackgroundWritesIfNeeded();
}
if (!status.isOK()) {
- if (allowBackgroundBuilding) {
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- dbLock->relockWithMode(MODE_X);
- if (status == ErrorCodes::InterruptedAtShutdown)
- return _failIndexBuild(indexer, status, allowBackgroundBuilding);
- opCtx->checkForInterrupt();
- }
- return _failIndexBuild(indexer, status, allowBackgroundBuilding);
+ return _failIndexBuild(opCtx, indexer, dbLock, status, allowBackgroundBuilding);
+ }
+
+ // Perform the second drain while stopping inserts into the collection.
+ {
+ Lock::CollectionLock collLock(opCtx->lockState(), ns.ns(), MODE_S);
+ status = indexer.drainBackgroundWritesIfNeeded();
+ }
+ if (!status.isOK()) {
+ return _failIndexBuild(opCtx, indexer, dbLock, status, allowBackgroundBuilding);
}
if (allowBackgroundBuilding) {
+ opCtx->recoveryUnit()->abandonSnapshot();
dbLock->relockWithMode(MODE_X);
}
+
+ // Perform the third and final drain after releasing a shared lock and reacquiring an
+ // exclusive lock on the database.
+ status = indexer.drainBackgroundWritesIfNeeded();
+ if (!status.isOK()) {
+ return _failIndexBuild(opCtx, indexer, dbLock, status, allowBackgroundBuilding);
+ }
+
status = writeConflictRetry(opCtx, "Commit index build", ns.ns(), [this, opCtx, &indexer, &ns] {
WriteUnitOfWork wunit(opCtx);
auto status = indexer.commit();
@@ -301,7 +324,7 @@ Status IndexBuilder::_build(OperationContext* opCtx,
return Status::OK();
});
if (!status.isOK()) {
- return _failIndexBuild(indexer, status, allowBackgroundBuilding);
+ return _failIndexBuild(opCtx, indexer, dbLock, status, allowBackgroundBuilding);
}
if (allowBackgroundBuilding) {
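
The new control flow above drains the side-write table three times under progressively stronger locks, so that the final catch-up runs while writes to the collection are blocked and the table is provably empty at commit. A sequential sketch of that phase ordering follows; the names are hypothetical, real lock acquisition, snapshots, and WriteUnitOfWork handling are elided, and the lock modes appear only as comments, so treat it as an outline of the idea rather than the actual _build() logic.

    // Sketch of the hybrid build's drain phases (hypothetical names; locking elided).
    #include <cassert>
    #include <cstddef>
    #include <iostream>
    #include <vector>

    struct SideWrites {
        std::vector<int> pending;
        std::size_t applied = 0;

        // Resumable: each drain picks up where the previous one stopped.
        void drain(std::vector<int>& index) {
            for (; applied < pending.size(); ++applied) {
                index.push_back(pending[applied]);
            }
        }
        bool allApplied() const {
            return applied == pending.size();
        }
    };

    int main() {
        std::vector<int> index;
        SideWrites sideWrites;

        // Phase 1: collection scan under MODE_IX; writers keep appending side writes.
        sideWrites.pending = {1, 2, 3};
        sideWrites.drain(index);  // first drain, still under an intent lock

        sideWrites.pending.push_back(4);  // a write that slipped in after the first drain
        // Phase 2: drain again under a MODE_S collection lock (new inserts now blocked).
        sideWrites.drain(index);

        // Phase 3: final drain under MODE_X on the database, then commit.
        sideWrites.drain(index);
        assert(sideWrites.allApplied());
        std::cout << "indexed " << index.size() << " keys" << std::endl;  // indexed 4 keys
        return 0;
    }
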
diff --git a/src/mongo/db/repair_database.cpp b/src/mongo/db/repair_database.cpp
index da970fb3a8b..6a0ededa2c0 100644
--- a/src/mongo/db/repair_database.cpp
+++ b/src/mongo/db/repair_database.cpp
@@ -213,7 +213,7 @@ Status rebuildIndexesOnCollection(OperationContext* opCtx,
}
}
- Status status = indexer->doneInserting();
+ Status status = indexer->dumpInsertsFromBulk();
if (!status.isOK())
return status;
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
index 29c1a7ea0e5..4dd31ed31a9 100644
--- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
@@ -165,7 +165,7 @@ Status CollectionBulkLoaderImpl::commit() {
// deleted.
if (_secondaryIndexesBlock) {
std::set<RecordId> secDups;
- auto status = _secondaryIndexesBlock->doneInserting(&secDups);
+ auto status = _secondaryIndexesBlock->dumpInsertsFromBulk(&secDups);
if (!status.isOK()) {
return status;
}
@@ -193,8 +193,8 @@ Status CollectionBulkLoaderImpl::commit() {
if (_idIndexBlock) {
// Delete dups.
std::set<RecordId> dups;
- // Do not do inside a WriteUnitOfWork (required by doneInserting).
- auto status = _idIndexBlock->doneInserting(&dups);
+ // Do not do this inside a WriteUnitOfWork (dumpInsertsFromBulk requires running outside one).
+ auto status = _idIndexBlock->dumpInsertsFromBulk(&dups);
if (!status.isOK()) {
return status;
}
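
The rename from doneInserting to dumpInsertsFromBulk reflects what the call actually does: keys accumulated by the bulk builder (backed by the external sorter) are dumped into the index in sorted order, with duplicates reported through the optional set so the caller can delete the offending documents before commit. A minimal standalone sketch of that collect/sort/insert pattern follows; the container types and names are simplified stand-ins for the external sorter and the real index, not the server implementation.

    // Two-phase bulk build with duplicate tracking (hypothetical, simplified types).
    #include <algorithm>
    #include <iostream>
    #include <set>
    #include <string>
    #include <utility>
    #include <vector>

    using RecordId = long long;

    int main() {
        // Phase 1: the collection scan accumulates (key, RecordId) pairs.
        std::vector<std::pair<std::string, RecordId>> bulk = {
            {"b", 2}, {"a", 1}, {"a", 3}, {"c", 4}};

        // Phase 2: sort the keys and insert them, recording the RecordIds of
        // documents whose keys collide with an already-inserted key.
        std::sort(bulk.begin(), bulk.end());
        std::set<std::string> index;
        std::set<RecordId> dups;
        for (const auto& [key, rid] : bulk) {
            if (!index.insert(key).second) {
                dups.insert(rid);  // caller may delete these documents before commit
            }
        }

        std::cout << index.size() << " unique keys, " << dups.size() << " duplicates"
                  << std::endl;  // prints "3 unique keys, 1 duplicates"
        return 0;
    }
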
diff --git a/src/mongo/db/storage/kv/SConscript b/src/mongo/db/storage/kv/SConscript
index 9400873d7cf..f9a6650e90d 100644
--- a/src/mongo/db/storage/kv/SConscript
+++ b/src/mongo/db/storage/kv/SConscript
@@ -45,7 +45,10 @@ env.Library(
# Should not be referenced outside this SConscript file.
env.Library(
target='kv_storage_engine',
- source=['kv_storage_engine.cpp'],
+ source=[
+ 'kv_storage_engine.cpp',
+ 'temporary_kv_record_store.cpp'
+ ],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/catalog/catalog_impl',
diff --git a/src/mongo/db/storage/kv/kv_storage_engine.cpp b/src/mongo/db/storage/kv/kv_storage_engine.cpp
index 7dc534c2942..2ae61716062 100644
--- a/src/mongo/db/storage/kv/kv_storage_engine.cpp
+++ b/src/mongo/db/storage/kv/kv_storage_engine.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/storage/kv/kv_catalog_feature_tracker.h"
#include "mongo/db/storage/kv/kv_database_catalog_entry.h"
#include "mongo/db/storage/kv/kv_engine.h"
+#include "mongo/db/storage/kv/temporary_kv_record_store.h"
#include "mongo/db/storage/storage_repair_observer.h"
#include "mongo/db/unclean_shutdown.h"
#include "mongo/util/assert_util.h"
@@ -627,8 +628,12 @@ Status KVStorageEngine::repairRecordStore(OperationContext* opCtx, const std::st
return Status::OK();
}
-std::unique_ptr<RecordStore> KVStorageEngine::makeTemporaryRecordStore(OperationContext* opCtx) {
- return _engine->makeTemporaryRecordStore(opCtx, _catalog->newTempIdent());
+std::unique_ptr<TemporaryRecordStore> KVStorageEngine::makeTemporaryRecordStore(
+ OperationContext* opCtx) {
+ std::unique_ptr<RecordStore> rs =
+ _engine->makeTemporaryRecordStore(opCtx, _catalog->newTempIdent());
+ LOG(1) << "created temporary record store: " << rs->getIdent();
+ return std::make_unique<TemporaryKVRecordStore>(opCtx, getEngine(), std::move(rs));
}
void KVStorageEngine::setJournalListener(JournalListener* jl) {
@@ -734,4 +739,5 @@ void KVStorageEngine::_dumpCatalog(OperationContext* opCtx) {
opCtx->recoveryUnit()->abandonSnapshot();
}
+
} // namespace mongo
diff --git a/src/mongo/db/storage/kv/kv_storage_engine.h b/src/mongo/db/storage/kv/kv_storage_engine.h
index b148f29b105..eac5e5a1896 100644
--- a/src/mongo/db/storage/kv/kv_storage_engine.h
+++ b/src/mongo/db/storage/kv/kv_storage_engine.h
@@ -42,6 +42,7 @@
#include "mongo/db/storage/kv/kv_database_catalog_entry_base.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/storage_engine.h"
+#include "mongo/db/storage/temporary_record_store.h"
#include "mongo/stdx/functional.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
@@ -122,7 +123,8 @@ public:
virtual Status repairRecordStore(OperationContext* opCtx, const std::string& ns);
- virtual std::unique_ptr<RecordStore> makeTemporaryRecordStore(OperationContext* opCtx) override;
+ virtual std::unique_ptr<TemporaryRecordStore> makeTemporaryRecordStore(
+ OperationContext* opCtx) override;
virtual void cleanShutdown();
diff --git a/src/mongo/db/storage/kv/kv_storage_engine_test.cpp b/src/mongo/db/storage/kv/kv_storage_engine_test.cpp
index 68c7098e689..bd58e5adc0a 100644
--- a/src/mongo/db/storage/kv/kv_storage_engine_test.cpp
+++ b/src/mongo/db/storage/kv/kv_storage_engine_test.cpp
@@ -76,7 +76,7 @@ public:
return _storageEngine->getCatalog()->getCollectionIdent(ns.ns());
}
- std::unique_ptr<RecordStore> makeTemporary(OperationContext* opCtx) {
+ std::unique_ptr<TemporaryRecordStore> makeTemporary(OperationContext* opCtx) {
return _storageEngine->makeTemporaryRecordStore(opCtx);
}
@@ -230,8 +230,8 @@ TEST_F(KVStorageEngineTest, ReconcileDropsTemporary) {
auto opCtx = cc().makeOperationContext();
auto rs = makeTemporary(opCtx.get());
- ASSERT(rs);
- const std::string ident = rs->getIdent();
+ ASSERT(rs.get());
+ const std::string ident = rs->rs()->getIdent();
ASSERT(identExists(opCtx.get(), ident));
@@ -241,6 +241,22 @@ TEST_F(KVStorageEngineTest, ReconcileDropsTemporary) {
ASSERT(!identExists(opCtx.get(), ident));
}
+TEST_F(KVStorageEngineTest, TemporaryDropsItself) {
+ auto opCtx = cc().makeOperationContext();
+
+ std::string ident;
+ {
+ auto rs = makeTemporary(opCtx.get());
+ ASSERT(rs.get());
+ ident = rs->rs()->getIdent();
+
+ ASSERT(identExists(opCtx.get(), ident));
+ }
+
+ // Going out of scope should have caused the temporary record store to drop its ident.
+ ASSERT(!identExists(opCtx.get(), ident));
+}
+
TEST_F(KVStorageEngineRepairTest, LoadCatalogRecoversOrphans) {
auto opCtx = cc().makeOperationContext();
diff --git a/src/mongo/db/storage/kv/temporary_kv_record_store.cpp b/src/mongo/db/storage/kv/temporary_kv_record_store.cpp
new file mode 100644
index 00000000000..3f2a9d47fb7
--- /dev/null
+++ b/src/mongo/db/storage/kv/temporary_kv_record_store.cpp
@@ -0,0 +1,45 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
+
+#include "mongo/db/storage/kv/temporary_kv_record_store.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+TemporaryKVRecordStore::~TemporaryKVRecordStore() {
+ auto status = _kvEngine->dropIdent(_opCtx, _rs->getIdent());
+ invariant(status.isOK(),
+ str::stream() << "failed to drop temporary ident: " << _rs->getIdent()
+ << " with status: "
+ << status);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/storage/kv/temporary_kv_record_store.h b/src/mongo/db/storage/kv/temporary_kv_record_store.h
new file mode 100644
index 00000000000..8fa63e65555
--- /dev/null
+++ b/src/mongo/db/storage/kv/temporary_kv_record_store.h
@@ -0,0 +1,66 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/storage/temporary_record_store.h"
+
+#include "mongo/db/operation_context.h"
+#include "mongo/db/storage/kv/kv_engine.h"
+#include "mongo/db/storage/record_store.h"
+
+namespace mongo {
+
+/**
+ * This is an RAII type that manages a temporary RecordStore on a KVEngine.
+ *
+ * This object must not outlive the provided OperationContext, because the destructor uses it to
+ * drop the record store from the KVEngine.
+ */
+class TemporaryKVRecordStore : public TemporaryRecordStore {
+public:
+ TemporaryKVRecordStore(OperationContext* opCtx,
+ KVEngine* kvEngine,
+ std::unique_ptr<RecordStore> rs)
+ : TemporaryRecordStore(std::move(rs)), _opCtx(opCtx), _kvEngine(kvEngine){};
+
+ // Move constructor.
+ TemporaryKVRecordStore(TemporaryKVRecordStore&& other) noexcept
+ : TemporaryRecordStore(std::move(other._rs)),
+ _opCtx(other._opCtx),
+ _kvEngine(other._kvEngine) {}
+
+ ~TemporaryKVRecordStore();
+
+private:
+ OperationContext* _opCtx;
+ KVEngine* _kvEngine;
+};
+
+} // namespace mongo
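
TemporaryKVRecordStore ties the lifetime of an on-disk ident to a C++ object, which is exactly what the TemporaryDropsItself test above exercises. The following standalone sketch shows the same drop-on-destruction pattern with hypothetical names: a std::set of idents stands in for the KVEngine catalog that dropIdent() would update, so this is an analogy for the mechanism rather than the real class.

    // Drop-on-destruction sketch (hypothetical names; std::set stands in for the catalog).
    #include <cassert>
    #include <set>
    #include <string>
    #include <utility>

    std::set<std::string> liveIdents;  // stand-in for the storage engine's ident list

    class TemporaryStore {
    public:
        explicit TemporaryStore(std::string ident) : _ident(std::move(ident)) {
            liveIdents.insert(_ident);  // creation registers the ident
        }
        TemporaryStore(const TemporaryStore&) = delete;
        TemporaryStore& operator=(const TemporaryStore&) = delete;
        ~TemporaryStore() {
            liveIdents.erase(_ident);  // the destructor drops it, like dropIdent()
        }
        const std::string& ident() const {
            return _ident;
        }

    private:
        std::string _ident;
    };

    int main() {
        std::string ident;
        {
            TemporaryStore temp("temp-ident-1");
            ident = temp.ident();
            assert(liveIdents.count(ident) == 1);
        }
        // Going out of scope dropped the ident, mirroring the TemporaryDropsItself test.
        assert(liveIdents.count(ident) == 0);
        return 0;
    }
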
diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h
index a6dc2901ff1..e2d659e840a 100644
--- a/src/mongo/db/storage/record_store.h
+++ b/src/mongo/db/storage/record_store.h
@@ -267,6 +267,10 @@ public:
_ns = ns.ns();
}
+ bool isTemp() const {
+ return ns().size() == 0;
+ }
+
virtual const std::string& getIdent() const = 0;
/**
diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h
index 456972b0997..d8926a68f50 100644
--- a/src/mongo/db/storage/storage_engine.h
+++ b/src/mongo/db/storage/storage_engine.h
@@ -45,7 +45,7 @@ namespace mongo {
class DatabaseCatalogEntry;
class JournalListener;
class OperationContext;
-class RecordStore;
+class TemporaryRecordStore;
class RecoveryUnit;
class SnapshotManager;
struct StorageGlobalParams;
@@ -288,11 +288,13 @@ public:
virtual Status repairRecordStore(OperationContext* opCtx, const std::string& ns) = 0;
/**
- * Creates a temporary RecordStore on the storage engine. This record store should be dropped by
- * the caller when done being used. The storage engine will drop any remaining temporary record
- * stores on startup.
+ * Creates a temporary RecordStore on the storage engine. The returned TemporaryRecordStore
+ * drops itself automatically when it goes out of scope, so it must not outlive the
+ * OperationContext used to create it. On startup, the storage engine drops any temporary
+ * record stores that remain.
*/
- virtual std::unique_ptr<RecordStore> makeTemporaryRecordStore(OperationContext* opCtx) = 0;
+ virtual std::unique_ptr<TemporaryRecordStore> makeTemporaryRecordStore(
+ OperationContext* opCtx) = 0;
/**
* This method will be called before there is a clean shutdown. Storage engines should
diff --git a/src/mongo/db/storage/temporary_record_store.h b/src/mongo/db/storage/temporary_record_store.h
new file mode 100644
index 00000000000..ea42f9d73be
--- /dev/null
+++ b/src/mongo/db/storage/temporary_record_store.h
@@ -0,0 +1,66 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/storage/record_store.h"
+
+namespace mongo {
+
+/**
+ * This is an RAII type that manages the lifetime of a temporary RecordStore.
+ *
+ * Derived classes must implement a destructor that drops the underlying RecordStore from the
+ * storage engine.
+ */
+class TemporaryRecordStore {
+ MONGO_DISALLOW_COPYING(TemporaryRecordStore);
+
+public:
+ TemporaryRecordStore(std::unique_ptr<RecordStore> rs) : _rs(std::move(rs)) {}
+
+ // Move constructor.
+ TemporaryRecordStore(TemporaryRecordStore&& other) noexcept : _rs(std::move(other._rs)) {}
+
+ virtual ~TemporaryRecordStore() {}
+
+ RecordStore* rs() {
+ return _rs.get();
+ }
+
+ const RecordStore* rs() const {
+ return _rs.get();
+ }
+
+protected:
+ std::unique_ptr<RecordStore> _rs;
+};
+} // namespace mongo
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index 0ee4e2c2781..7923d2d90ed 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -427,6 +427,8 @@ stdx::function<bool(StringData)> initRsOplogBackgroundThreadCallback = [](String
};
} // namespace
+StringData WiredTigerKVEngine::kTableUriPrefix = "table:"_sd;
+
WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName,
const std::string& path,
ClockSource* cs,
@@ -794,10 +796,9 @@ Status WiredTigerKVEngine::_salvageIfNeeded(const char* uri) {
Status WiredTigerKVEngine::_rebuildIdent(WT_SESSION* session, const char* uri) {
invariant(_inRepairMode);
- static const char tablePrefix[] = "table:";
- invariant(std::string(uri).find(tablePrefix) == 0);
+ invariant(std::string(uri).find(kTableUriPrefix.rawData()) == 0);
- const std::string identName(uri + sizeof(tablePrefix) - 1);
+ const std::string identName(uri + kTableUriPrefix.size());
auto filePath = getDataFilePathForIdent(identName);
if (filePath) {
const boost::filesystem::path corruptFile(filePath->string() + ".corrupt");
@@ -1042,7 +1043,7 @@ std::unique_ptr<RecordStore> WiredTigerKVEngine::getGroupedRecordStore(
WiredTigerRecordStore::Params params;
params.ns = ns;
- params.uri = _uri(ident);
+ params.ident = ident.toString();
params.engineName = _canonicalName;
params.isCapped = options.capped;
params.isEphemeral = _ephemeral;
@@ -1074,7 +1075,8 @@ std::unique_ptr<RecordStore> WiredTigerKVEngine::getGroupedRecordStore(
}
string WiredTigerKVEngine::_uri(StringData ident) const {
- return string("table:") + ident.toString();
+ invariant(ident.find(kTableUriPrefix) == string::npos);
+ return kTableUriPrefix + ident.toString();
}
Status WiredTigerKVEngine::createGroupedSortedDataInterface(OperationContext* opCtx,
@@ -1146,7 +1148,7 @@ std::unique_ptr<RecordStore> WiredTigerKVEngine::makeTemporaryRecordStore(Operat
WiredTigerRecordStore::Params params;
params.ns = "";
- params.uri = _uri(ident);
+ params.ident = ident.toString();
params.engineName = _canonicalName;
params.isCapped = false;
params.isEphemeral = _ephemeral;
@@ -1190,7 +1192,7 @@ Status WiredTigerKVEngine::dropIdent(OperationContext* opCtx, StringData ident)
int ret = session.getSession()->drop(
session.getSession(), uri.c_str(), "force,checkpoint_wait=false");
- LOG(1) << "WT drop of " << uri << " res " << ret;
+ LOG(1) << "WT drop of " << uri << " res " << ret;
if (ret == 0) {
// yay, it worked
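
With this change WiredTigerRecordStore::Params carries the bare ident and the record store derives its table URI by prepending kTableUriPrefix, which is why _uri() now asserts that the ident is not already prefixed. A tiny standalone sketch of that convention follows; makeTableUri is a hypothetical helper and the assertion approximates the invariant, whereas the real code uses WiredTigerKVEngine::kTableUriPrefix and StringData.

    // Sketch of the ident -> URI convention (hypothetical helper function).
    #include <cassert>
    #include <iostream>
    #include <string>

    const std::string kTableUriPrefix = "table:";

    std::string makeTableUri(const std::string& ident) {
        // An ident is the bare name; callers should never pass an already-prefixed URI.
        assert(ident.find(kTableUriPrefix) == std::string::npos);
        return kTableUriPrefix + ident;
    }

    int main() {
        const std::string ident = "index-42-1234567890123456789";
        std::cout << "ident: " << ident << "\nuri:   " << makeTableUri(ident) << std::endl;
        return 0;
    }
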
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index 95f9cda47aa..6836d9868fa 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -68,6 +68,8 @@ struct WiredTigerFileVersion {
class WiredTigerKVEngine final : public KVEngine {
public:
static const int kDefaultJournalDelayMillis;
+ static StringData kTableUriPrefix;
+
WiredTigerKVEngine(const std::string& canonicalName,
const std::string& path,
ClockSource* cs,
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_record_store_test.cpp
index eb5077d48f8..37f624826ea 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_record_store_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_prefixed_record_store_test.cpp
@@ -99,7 +99,8 @@ public:
WiredTigerRecoveryUnit* ru =
checked_cast<WiredTigerRecoveryUnit*>(_engine->newRecoveryUnit());
OperationContextNoop opCtx(ru);
- string uri = "table:" + ns;
+ string ident = ns;
+ string uri = WiredTigerKVEngine::kTableUriPrefix + ns;
const bool prefixed = true;
StatusWith<std::string> result = WiredTigerRecordStore::generateCreateString(
@@ -116,7 +117,7 @@ public:
WiredTigerRecordStore::Params params;
params.ns = ns;
- params.uri = uri;
+ params.ident = ident;
params.engineName = kWiredTigerEngineName;
params.isCapped = false;
params.isEphemeral = false;
@@ -142,7 +143,8 @@ public:
WiredTigerRecoveryUnit* ru =
checked_cast<WiredTigerRecoveryUnit*>(_engine->newRecoveryUnit());
OperationContextNoop opCtx(ru);
- string uri = "table:a.b";
+ string ident = "a.b";
+ string uri = WiredTigerKVEngine::kTableUriPrefix + ident;
CollectionOptions options;
options.capped = true;
@@ -162,7 +164,7 @@ public:
WiredTigerRecordStore::Params params;
params.ns = ns;
- params.uri = uri;
+ params.ident = ident;
params.engineName = kWiredTigerEngineName;
params.isCapped = true;
params.isEphemeral = false;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index 135d904f5ce..0d7c82d0b63 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -638,7 +638,8 @@ WiredTigerRecordStore::WiredTigerRecordStore(WiredTigerKVEngine* kvEngine,
OperationContext* ctx,
Params params)
: RecordStore(params.ns),
- _uri(params.uri),
+ _uri(WiredTigerKVEngine::kTableUriPrefix + params.ident),
+ _ident(params.ident),
_tableId(WiredTigerSession::genTableId()),
_engineName(params.engineName),
_isCapped(params.isCapped),
@@ -654,9 +655,12 @@ WiredTigerRecordStore::WiredTigerRecordStore(WiredTigerKVEngine* kvEngine,
_cappedDeleteCheckCount(0),
_sizeStorer(params.sizeStorer),
_kvEngine(kvEngine) {
+ invariant(_ident.size() > 0);
+
Status versionStatus = WiredTigerUtil::checkApplicationMetadataFormatVersion(
ctx, _uri, kMinimumRecordStoreVersion, kMaximumRecordStoreVersion)
.getStatus();
+
if (!versionStatus.isOK()) {
std::cout << " Version: " << versionStatus.reason() << std::endl;
if (versionStatus.code() == ErrorCodes::FailedToParse) {
@@ -674,7 +678,7 @@ WiredTigerRecordStore::WiredTigerRecordStore(WiredTigerKVEngine* kvEngine,
invariant(_cappedMaxDocs == -1);
}
- if (!params.isReadOnly) {
+ if (!params.isReadOnly && !isTemp()) {
bool replicatedWrites = getGlobalReplSettings().usingReplSets() ||
repl::ReplSettings::shouldRecoverFromOplogAsStandalone();
uassertStatusOK(WiredTigerUtil::setTableLogging(
@@ -686,7 +690,7 @@ WiredTigerRecordStore::WiredTigerRecordStore(WiredTigerKVEngine* kvEngine,
// The oplog always needs to be marked for size adjustment since it is journaled and also
// may change during replication recovery (if truncated).
sizeRecoveryState(getGlobalServiceContext())
- .markCollectionAsAlwaysNeedsSizeAdjustment(_uri);
+ .markCollectionAsAlwaysNeedsSizeAdjustment(_ident);
}
}
@@ -696,7 +700,11 @@ WiredTigerRecordStore::~WiredTigerRecordStore() {
_shuttingDown = true;
}
- LOG(1) << "~WiredTigerRecordStore for: " << ns();
+ if (!isTemp()) {
+ LOG(1) << "~WiredTigerRecordStore for: " << ns();
+ } else {
+ LOG(1) << "~WiredTigerRecordStore for temporary ident: " << getIdent();
+ }
if (_oplogStones) {
_oplogStones->kill();
@@ -744,9 +752,9 @@ void WiredTigerRecordStore::postConstructorInit(OperationContext* opCtx) {
// time but not as of the top of the oplog.
LOG_FOR_RECOVERY(2) << "Record store was empty; setting count metadata to zero but marking "
"record store as needing size adjustment during recovery. ns: "
- << ns() << ", ident: " << _uri;
+ << (isTemp() ? "(temp)" : ns()) << ", ident: " << _ident;
sizeRecoveryState(getGlobalServiceContext())
- .markCollectionAsAlwaysNeedsSizeAdjustment(_uri);
+ .markCollectionAsAlwaysNeedsSizeAdjustment(_ident);
_sizeInfo->dataSize.store(0);
_sizeInfo->numRecords.store(0);
@@ -907,7 +915,7 @@ int64_t WiredTigerRecordStore::_cappedDeleteAsNeeded(OperationContext* opCtx,
// replication recovery. If we don't mark the collection for size adjustment then we will not
// perform the capped deletions as expected. In that case, the collection is guaranteed to be
// empty at the stable timestamp and thus guaranteed to be marked for size adjustment.
- if (!sizeRecoveryState(getGlobalServiceContext()).collectionNeedsSizeAdjustment(_uri)) {
+ if (!sizeRecoveryState(getGlobalServiceContext()).collectionNeedsSizeAdjustment(_ident)) {
return 0;
}
@@ -1653,7 +1661,7 @@ void WiredTigerRecordStore::updateStatsAfterRepair(OperationContext* opCtx,
long long numRecords,
long long dataSize) {
// We're correcting the size as of now, future writes should be tracked.
- sizeRecoveryState(getGlobalServiceContext()).markCollectionAsAlwaysNeedsSizeAdjustment(_uri);
+ sizeRecoveryState(getGlobalServiceContext()).markCollectionAsAlwaysNeedsSizeAdjustment(_ident);
_sizeInfo->numRecords.store(numRecords);
_sizeInfo->dataSize.store(dataSize);
@@ -1689,7 +1697,7 @@ private:
};
void WiredTigerRecordStore::_changeNumRecords(OperationContext* opCtx, int64_t diff) {
- if (!sizeRecoveryState(getGlobalServiceContext()).collectionNeedsSizeAdjustment(_uri)) {
+ if (!sizeRecoveryState(getGlobalServiceContext()).collectionNeedsSizeAdjustment(_ident)) {
return;
}
@@ -1712,7 +1720,7 @@ private:
};
void WiredTigerRecordStore::_increaseDataSize(OperationContext* opCtx, int64_t amount) {
- if (!sizeRecoveryState(getGlobalServiceContext()).collectionNeedsSizeAdjustment(_uri)) {
+ if (!sizeRecoveryState(getGlobalServiceContext()).collectionNeedsSizeAdjustment(_ident)) {
return;
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
index 019d7e2331c..9cc17eb0b00 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
@@ -106,7 +106,7 @@ public:
struct Params {
StringData ns;
- std::string uri;
+ std::string ident;
std::string engineName;
bool isCapped;
bool isEphemeral;
@@ -233,7 +233,7 @@ public:
}
const std::string& getIdent() const override {
- return _uri;
+ return _ident;
}
uint64_t tableId() const {
@@ -334,6 +334,7 @@ private:
int64_t _cappedDeleteAsNeeded_inlock(OperationContext* opCtx, const RecordId& justInserted);
const std::string _uri;
+ const std::string _ident;
const uint64_t _tableId; // not persisted
// Canonical engine name to use for retrieving options
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
index cb886f7f1ab..719b2f3bbce 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit_test.cpp
@@ -74,7 +74,8 @@ public:
virtual std::unique_ptr<RecordStore> createRecordStore(OperationContext* opCtx,
const std::string& ns) final {
- std::string uri = "table:" + ns;
+ std::string ident = ns;
+ std::string uri = WiredTigerKVEngine::kTableUriPrefix + ns;
const bool prefixed = false;
StatusWith<std::string> result = WiredTigerRecordStore::generateCreateString(
kWiredTigerEngineName, ns, CollectionOptions(), "", prefixed);
@@ -92,7 +93,7 @@ public:
WiredTigerRecordStore::Params params;
params.ns = ns;
- params.uri = uri;
+ params.ident = ident;
params.engineName = kWiredTigerEngineName;
params.isCapped = false;
params.isEphemeral = false;
@@ -536,7 +537,7 @@ TEST_F(WiredTigerRecoveryUnitTestFixture, ReadOnceCursorsAreNotCached) {
auto ru = WiredTigerRecoveryUnit::get(opCtx);
std::unique_ptr<RecordStore> rs(harnessHelper->createRecordStore(opCtx, "test.read_once"));
- auto uri = rs->getIdent();
+ auto uri = dynamic_cast<WiredTigerRecordStore*>(rs.get())->getURI();
// Insert a record.
ru->beginUnitOfWork(opCtx);
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp
index e8c4298f66b..fc2caf37290 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_standard_record_store_test.cpp
@@ -97,9 +97,9 @@ public:
virtual std::unique_ptr<RecordStore> newNonCappedRecordStore(const std::string& ns) {
WiredTigerRecoveryUnit* ru =
- dynamic_cast<WiredTigerRecoveryUnit*>(_engine.newRecoveryUnit());
+ checked_cast<WiredTigerRecoveryUnit*>(_engine.newRecoveryUnit());
OperationContextNoop opCtx(ru);
- string uri = "table:" + ns;
+ string uri = WiredTigerKVEngine::kTableUriPrefix + ns;
const bool prefixed = false;
StatusWith<std::string> result = WiredTigerRecordStore::generateCreateString(
@@ -116,7 +116,7 @@ public:
WiredTigerRecordStore::Params params;
params.ns = ns;
- params.uri = uri;
+ params.ident = ns;
params.engineName = kWiredTigerEngineName;
params.isCapped = false;
params.isEphemeral = false;
@@ -141,7 +141,8 @@ public:
WiredTigerRecoveryUnit* ru =
dynamic_cast<WiredTigerRecoveryUnit*>(_engine.newRecoveryUnit());
OperationContextNoop opCtx(ru);
- string uri = "table:a.b";
+ string ident = "a.b";
+ string uri = WiredTigerKVEngine::kTableUriPrefix + "a.b";
CollectionOptions options;
options.capped = true;
@@ -161,7 +162,7 @@ public:
WiredTigerRecordStore::Params params;
params.ns = ns;
- params.uri = uri;
+ params.ident = ident;
params.engineName = kWiredTigerEngineName;
params.isCapped = true;
params.isEphemeral = false;
@@ -215,9 +216,10 @@ TEST(WiredTigerRecordStoreTest, SizeStorer1) {
unique_ptr<WiredTigerHarnessHelper> harnessHelper(new WiredTigerHarnessHelper());
unique_ptr<RecordStore> rs(harnessHelper->newNonCappedRecordStore());
+ string ident = rs->getIdent();
string uri = checked_cast<WiredTigerRecordStore*>(rs.get())->getURI();
- string indexUri = "table:myindex";
+ string indexUri = WiredTigerKVEngine::kTableUriPrefix + "myindex";
const bool enableWtLogging = false;
WiredTigerSizeStorer ss(harnessHelper->conn(), indexUri, enableWtLogging);
checked_cast<WiredTigerRecordStore*>(rs.get())->setSizeStorer(&ss);
@@ -252,7 +254,7 @@ TEST(WiredTigerRecordStoreTest, SizeStorer1) {
ServiceContext::UniqueOperationContext opCtx(harnessHelper->newOperationContext());
WiredTigerRecordStore::Params params;
params.ns = "a.b"_sd;
- params.uri = uri;
+ params.ident = ident;
params.engineName = kWiredTigerEngineName;
params.isCapped = false;
params.isEphemeral = false;
@@ -318,10 +320,13 @@ private:
harnessHelper.reset(new WiredTigerHarnessHelper());
const bool enableWtLogging = false;
sizeStorer.reset(
- new WiredTigerSizeStorer(harnessHelper->conn(), "table:sizeStorer", enableWtLogging));
+ new WiredTigerSizeStorer(harnessHelper->conn(),
+ WiredTigerKVEngine::kTableUriPrefix + "sizeStorer",
+ enableWtLogging));
rs = harnessHelper->newNonCappedRecordStore();
WiredTigerRecordStore* wtrs = checked_cast<WiredTigerRecordStore*>(rs.get());
wtrs->setSizeStorer(sizeStorer.get());
+ ident = wtrs->getIdent();
uri = wtrs->getURI();
expectedNumRecords = 100;
@@ -361,6 +366,7 @@ protected:
std::unique_ptr<WiredTigerHarnessHelper> harnessHelper;
std::unique_ptr<WiredTigerSizeStorer> sizeStorer;
std::unique_ptr<RecordStore> rs;
+ std::string ident;
std::string uri;
long long expectedNumRecords;
@@ -418,7 +424,7 @@ TEST_F(SizeStorerValidateTest, InvalidSizeStorerAtCreation) {
WiredTigerRecordStore::Params params;
params.ns = "a.b"_sd;
- params.uri = uri;
+ params.ident = ident;
params.engineName = kWiredTigerEngineName;
params.isCapped = false;
params.isEphemeral = false;