diff options
Diffstat (limited to 'src/mongo/db/repair_database.cpp')
-rw-r--r-- | src/mongo/db/repair_database.cpp | 128 |
1 files changed, 22 insertions, 106 deletions
diff --git a/src/mongo/db/repair_database.cpp b/src/mongo/db/repair_database.cpp index 22215db1fa4..9c7f6df9e65 100644 --- a/src/mongo/db/repair_database.cpp +++ b/src/mongo/db/repair_database.cpp @@ -53,6 +53,7 @@ #include "mongo/db/catalog/uuid_catalog.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/index/index_descriptor.h" +#include "mongo/db/index_builds_coordinator.h" #include "mongo/db/logical_clock.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/db/storage/storage_engine.h" @@ -113,10 +114,7 @@ StatusWith<IndexNameObjs> getIndexNameObjs(OperationContext* opCtx, Status rebuildIndexesOnCollection(OperationContext* opCtx, DatabaseCatalogEntry* dbce, CollectionCatalogEntry* cce, - const IndexNameObjs& indexNameObjs) { - const std::vector<std::string>& indexNames = indexNameObjs.first; - const std::vector<BSONObj>& indexSpecs = indexNameObjs.second; - + const std::vector<BSONObj>& indexSpecs) { // Skip the rest if there are no indexes to rebuild. if (indexSpecs.empty()) return Status::OK(); @@ -124,110 +122,27 @@ Status rebuildIndexesOnCollection(OperationContext* opCtx, const auto& ns = cce->ns().ns(); auto rs = dbce->getRecordStore(ns); - std::unique_ptr<Collection> collection; - std::unique_ptr<MultiIndexBlock> indexer; - { - // These steps are combined into a single WUOW to ensure there are no commits without - // the indexes. - // 1) Drop all indexes. - // 2) Open the Collection - // 3) Start the index build process. - - WriteUnitOfWork wuow(opCtx); - - { // 1 - for (size_t i = 0; i < indexNames.size(); i++) { - Status s = cce->removeIndex(opCtx, indexNames[i]); - if (!s.isOK()) - return s; - } - } - - // Indexes must be dropped before we open the Collection otherwise we could attempt to - // open a bad index and fail. - // TODO see if MultiIndexBlock can be made to work without a Collection. - const auto uuid = cce->getCollectionOptions(opCtx).uuid; - auto databaseHolder = DatabaseHolder::get(opCtx); - collection = databaseHolder->makeCollection(opCtx, ns, uuid, cce, rs, dbce); - - indexer = std::make_unique<MultiIndexBlock>(opCtx, collection.get()); - Status status = indexer->init(indexSpecs, MultiIndexBlock::kNoopOnInitFn).getStatus(); - if (!status.isOK()) { - // The WUOW will handle cleanup, so the indexer shouldn't do its own. - indexer->abortWithoutCleanup(); - return status; - } - - wuow.commit(); - } - - // Iterate all records in the collection. Delete them if they aren't valid BSON. Index them - // if they are. - - long long numRecords = 0; - long long dataSize = 0; - - auto cursor = rs->getCursor(opCtx); - auto record = cursor->next(); - while (record) { - opCtx->checkForInterrupt(); - // Cursor is left one past the end of the batch inside writeConflictRetry - auto beginBatchId = record->id; - Status status = writeConflictRetry(opCtx, "repairDatabase", cce->ns().ns(), [&] { - // In the case of WCE in a partial batch, we need to go back to the beginning - if (!record || (beginBatchId != record->id)) { - record = cursor->seekExact(beginBatchId); - } - WriteUnitOfWork wunit(opCtx); - for (int i = 0; record && i < internalInsertMaxBatchSize.load(); i++) { - RecordId id = record->id; - RecordData& data = record->data; - // Use the latest BSON validation version. We retain decimal data when repairing - // database even if decimal is disabled. - auto validStatus = validateBSON(data.data(), data.size(), BSONVersion::kLatest); - if (!validStatus.isOK()) { - warning() << "Invalid BSON detected at " << id << ": " << redact(validStatus) - << ". Deleting."; - rs->deleteRecord(opCtx, id); - } else { - numRecords++; - dataSize += data.size(); - auto insertStatus = indexer->insert(data.releaseToBson(), id); - if (!insertStatus.isOK()) { - return insertStatus; - } - } - record = cursor->next(); - } - cursor->save(); // Can't fail per API definition - // When this exits via success or WCE, we need to restore the cursor - ON_BLOCK_EXIT([ opCtx, ns = cce->ns().ns(), &cursor ]() { - // restore CAN throw WCE per API - writeConflictRetry( - opCtx, "retryRestoreCursor", ns, [&cursor] { cursor->restore(); }); - }); - wunit.commit(); - return Status::OK(); - }); - if (!status.isOK()) { - return status; - } + // Open the collection. + const auto uuid = cce->getCollectionOptions(opCtx).uuid; + auto databaseHolder = DatabaseHolder::get(opCtx); + std::unique_ptr<Collection> collection = + databaseHolder->makeCollection(opCtx, ns, uuid, cce, rs, dbce); + + // Rebuild the indexes provided by 'indexSpecs'. + IndexBuildsCoordinator* indexBuildsCoord = IndexBuildsCoordinator::get(opCtx); + UUID buildUUID = UUID::gen(); + auto swRebuild = indexBuildsCoord->startIndexRebuildForRecovery( + opCtx, std::move(collection), indexSpecs, buildUUID); + if (!swRebuild.isOK()) { + return swRebuild.getStatus(); } - Status status = indexer->dumpInsertsFromBulk(); - if (!status.isOK()) - return status; + auto[numRecords, dataSize] = swRebuild.getValue(); - { - WriteUnitOfWork wunit(opCtx); - status = - indexer->commit(MultiIndexBlock::kNoopOnCreateEachFn, MultiIndexBlock::kNoopOnCommitFn); - if (!status.isOK()) { - return status; - } - rs->updateStatsAfterRepair(opCtx, numRecords, dataSize); - wunit.commit(); - } + // Update the record store stats after finishing and committing the index builds. + WriteUnitOfWork wuow(opCtx); + rs->updateStatsAfterRepair(opCtx, numRecords, dataSize); + wuow.commit(); return Status::OK(); } @@ -263,7 +178,8 @@ Status repairCollections(OperationContext* opCtx, if (!swIndexNameObjs.isOK()) return swIndexNameObjs.getStatus(); - Status status = rebuildIndexesOnCollection(opCtx, dbce, cce, swIndexNameObjs.getValue()); + std::vector<BSONObj> indexSpecs = swIndexNameObjs.getValue().second; + Status status = rebuildIndexesOnCollection(opCtx, dbce, cce, indexSpecs); if (!status.isOK()) return status; |