/**
* Copyright (C) 2017 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include
#include "mongo/platform/basic.h"
#include "mongo/db/catalog/collection_catalog_entry.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/catalog/index_consistency.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/query/query_yield.h"
#include "mongo/db/storage/key_string.h"
#include "mongo/db/storage/sorted_data_interface.h"
#include "mongo/util/elapsed_tracker.h"
namespace mongo {
namespace {
// The number of items we can scan before we must yield.
// `static` is redundant inside an anonymous namespace (it already has internal linkage), so this
// is a plain constexpr constant.
// NOTE(review): nothing in this file references kScanLimit; yielding is driven by `_tracker`
// (see scanLimitHit()). Consider removing it.
constexpr int kScanLimit = 1000;
}  // namespace
/**
 * Captures the per-index bookkeeping needed to cross-check a collection's documents against its
 * indexes.
 *
 * Each index returned by the collection's index iterator is assigned a dense number (0, 1, 2, ...)
 * in iteration order. For each one we record:
 *  - whether its catalog entry reported it ready,
 *  - a MurmurHash3 of its index namespace, used as the seed for per-key hashes, and
 *  - zeroed key/record counters.
 *
 * @param collLk     the collection lock held by the caller; ownership moves into this object so
 *                   that relockCollectionWithMode() can release and re-acquire it.
 * @param background whether validation runs in the background; scanLimitHit() only ever reports
 *                   that a yield is due when this is true.
 */
IndexConsistency::IndexConsistency(OperationContext* opCtx,
                                   Collection* collection,
                                   NamespaceString nss,
                                   RecordStore* recordStore,
                                   std::unique_ptr collLk,
                                   const bool background)
    : _opCtx(opCtx),
      _collection(collection),
      _nss(std::move(nss)),  // `nss` is a by-value sink that is not used again.
      _recordStore(recordStore),
      _collLk(std::move(collLk)),
      _isBackground(background),
      _tracker(opCtx->getServiceContext()->getFastClockSource(),
               internalQueryExecYieldIterations.load(),
               Milliseconds(internalQueryExecYieldPeriodMS.load())) {
    IndexCatalog* indexCatalog = _collection->getIndexCatalog();
    IndexCatalog::IndexIterator indexIterator = indexCatalog->getIndexIterator(_opCtx, false);

    int indexNumber = 0;
    while (indexIterator.more()) {
        const IndexDescriptor* descriptor = indexIterator.next();
        // Compute the namespace once and reuse it for both the lookup map and the hash seed.
        const std::string indexNs = descriptor->indexNamespace();
        _indexNumber[indexNs] = indexNumber;

        IndexInfo indexInfo;
        indexInfo.isReady =
            _collection->getCatalogEntry()->isIndexReady(opCtx, descriptor->indexName());

        uint32_t indexNsHash;
        MurmurHash3_x86_32(indexNs.c_str(), indexNs.size(), 0, &indexNsHash);
        indexInfo.indexNsHash = indexNsHash;

        indexInfo.indexScanFinished = false;
        indexInfo.numKeys = 0;
        indexInfo.numLongKeys = 0;
        indexInfo.numRecords = 0;
        indexInfo.numExtraIndexKeys = 0;

        _indexesInfo[indexNumber] = indexInfo;
        indexNumber++;
    }
}
/**
 * Records that a scanned document produced key `ks` for index `indexNumber`.
 * Delegates to _addDocKey_inlock(). Out-of-range index numbers are ignored.
 */
void IndexConsistency::addDocKey(const KeyString& ks, int indexNumber) {
    // Take the mutex before touching _indexesInfo, as addLongIndexKey() and the getters do, so
    // the bounds check is synchronized with concurrent applyChange() calls.
    stdx::lock_guard lock(_classMutex);
    if (indexNumber < 0 || indexNumber >= static_cast<int>(_indexesInfo.size())) {
        return;
    }
    _addDocKey_inlock(ks, indexNumber);
}
/**
 * Reverses a previous addDocKey() of key `ks` for index `indexNumber`.
 * Delegates to _removeDocKey_inlock(). Out-of-range index numbers are ignored.
 */
void IndexConsistency::removeDocKey(const KeyString& ks, int indexNumber) {
    // Take the mutex before touching _indexesInfo, as addLongIndexKey() and the getters do, so
    // the bounds check is synchronized with concurrent applyChange() calls.
    stdx::lock_guard lock(_classMutex);
    if (indexNumber < 0 || indexNumber >= static_cast<int>(_indexesInfo.size())) {
        return;
    }
    _removeDocKey_inlock(ks, indexNumber);
}
/**
 * Records that the index scan saw key `ks` in index `indexNumber`.
 * Delegates to _addIndexKey_inlock(). Out-of-range index numbers are ignored.
 */
void IndexConsistency::addIndexKey(const KeyString& ks, int indexNumber) {
    // Take the mutex before touching _indexesInfo, as addLongIndexKey() and the getters do, so
    // the bounds check is synchronized with concurrent applyChange() calls.
    stdx::lock_guard lock(_classMutex);
    if (indexNumber < 0 || indexNumber >= static_cast<int>(_indexesInfo.size())) {
        return;
    }
    _addIndexKey_inlock(ks, indexNumber);
}
/**
 * Reverses a previous addIndexKey() of key `ks` for index `indexNumber`.
 * Delegates to _removeIndexKey_inlock(). Out-of-range index numbers are ignored.
 */
void IndexConsistency::removeIndexKey(const KeyString& ks, int indexNumber) {
    // Take the mutex before touching _indexesInfo, as addLongIndexKey() and the getters do, so
    // the bounds check is synchronized with concurrent applyChange() calls.
    stdx::lock_guard lock(_classMutex);
    if (indexNumber < 0 || indexNumber >= static_cast<int>(_indexesInfo.size())) {
        return;
    }
    _removeIndexKey_inlock(ks, indexNumber);
}
/**
 * Counts one document key for index `indexNumber` that is tracked only as a "long" key.
 * Both numRecords and numLongKeys are incremented; the hashed key counts are not touched.
 * Out-of-range index numbers are ignored.
 */
void IndexConsistency::addLongIndexKey(int indexNumber) {
    stdx::lock_guard guard(_classMutex);
    const bool isKnownIndex =
        indexNumber >= 0 && indexNumber < static_cast<int>(_indexesInfo.size());
    if (isKnownIndex) {
        auto& info = _indexesInfo[indexNumber];
        ++info.numRecords;
        ++info.numLongKeys;
    }
}
/**
 * Returns the net number of index keys counted for index `indexNumber`,
 * or 0 when the index number is out of range.
 */
int64_t IndexConsistency::getNumKeys(int indexNumber) const {
    stdx::lock_guard guard(_classMutex);
    const bool isKnownIndex =
        indexNumber >= 0 && indexNumber < static_cast<int>(_indexesInfo.size());
    if (!isKnownIndex) {
        return 0;
    }
    const auto& info = _indexesInfo.at(indexNumber);
    return info.numKeys;
}
/**
 * Returns how many long keys were counted for index `indexNumber`,
 * or 0 when the index number is out of range.
 */
int64_t IndexConsistency::getNumLongKeys(int indexNumber) const {
    stdx::lock_guard guard(_classMutex);
    const bool isKnownIndex =
        indexNumber >= 0 && indexNumber < static_cast<int>(_indexesInfo.size());
    if (!isKnownIndex) {
        return 0;
    }
    const auto& info = _indexesInfo.at(indexNumber);
    return info.numLongKeys;
}
/**
 * Returns the net number of document-side keys (records) counted for index `indexNumber`,
 * or 0 when the index number is out of range.
 */
int64_t IndexConsistency::getNumRecords(int indexNumber) const {
    stdx::lock_guard guard(_classMutex);
    const bool isKnownIndex =
        indexNumber >= 0 && indexNumber < static_cast<int>(_indexesInfo.size());
    if (!isKnownIndex) {
        return 0;
    }
    const auto& info = _indexesInfo.at(indexNumber);
    return info.numRecords;
}
/**
 * Reports whether any hash bucket in _indexKeyCount is non-zero.
 *
 * Document keys and index keys move a bucket in opposite directions, so a non-zero bucket
 * means the document and index sides did not balance.
 */
bool IndexConsistency::haveEntryMismatch() const {
    stdx::lock_guard guard(_classMutex);
    for (const auto& bucket : _indexKeyCount) {
        if (bucket.second != 0) {
            return true;
        }
    }
    return false;
}
/**
 * Returns the net number of index keys that applyChange() attributed to concurrent writes for
 * index `indexNumber`, or 0 when the index number is out of range.
 */
int64_t IndexConsistency::getNumExtraIndexKeys(int indexNumber) const {
    stdx::lock_guard guard(_classMutex);
    const bool isKnownIndex =
        indexNumber >= 0 && indexNumber < static_cast<int>(_indexesInfo.size());
    if (!isKnownIndex) {
        return 0;
    }
    const auto& info = _indexesInfo.at(indexNumber);
    return info.numExtraIndexKeys;
}
/**
 * Reconciles a write that hit an index while validation is in progress, so the document-side
 * and index-side tallies still agree once both scans finish.
 *
 * @param descriptor the index the write applied to. Writes to indexes this validator does not
 *                   know about, or that were not ready when validation started, are ignored.
 * @param indexEntry the key/RecordId that was inserted or removed. An empty optional is ignored.
 * @param operation  INSERT or REMOVE.
 *
 * DOCUMENT stage: the collection scan will not revisit a change at or before the last processed
 * record, so that change is folded into the document-key counts here. Later changes are left for
 * the scan to pick up.
 *
 * INDEX stage: the index scan will still count a change it has not reached yet (the index has
 * not been started, or the entry sorts after the scan's cursor). Such a change is
 * pre-compensated here and tracked in numExtraIndexKeys.
 */
void IndexConsistency::applyChange(const IndexDescriptor* descriptor,
                                   const boost::optional& indexEntry,
                                   ValidationOperation operation) {
    stdx::lock_guard lock(_classMutex);

    // Every path below dereferences indexEntry, and dereferencing an empty optional is undefined.
    // Without an entry there is nothing to reconcile.
    if (!indexEntry) {
        return;
    }

    const std::string& indexNs = descriptor->indexNamespace();
    int indexNumber = getIndexNumber(indexNs);
    if (indexNumber == -1) {
        return;
    }

    // Ignore indexes that weren't ready before we started validation.
    if (!_indexesInfo.at(indexNumber).isReady) {
        return;
    }

    const auto& key = descriptor->keyPattern();
    const Ordering ord = Ordering::make(key);
    KeyString::Version version = KeyString::kLatestVersion;
    KeyString ks(version, indexEntry->key, ord, indexEntry->loc);

    // Keys of at least kMaxKeyBytes are not indexed. They are tracked only through the
    // numRecords/numLongKeys counters, never through the hashed key counts.
    const bool isLongKey =
        indexEntry->key.objsize() >= static_cast<int>(KeyString::TypeBits::kMaxKeyBytes);

    if (_stage == ValidationStage::DOCUMENT) {
        _setYieldAtRecord_inlock(indexEntry->loc);
        if (_isBeforeLastProcessedRecordId_inlock(indexEntry->loc)) {
            if (operation == ValidationOperation::INSERT) {
                if (isLongKey) {
                    _indexesInfo[indexNumber].numRecords++;
                    _indexesInfo[indexNumber].numLongKeys++;
                } else {
                    _addDocKey_inlock(ks, indexNumber);
                }
            } else if (operation == ValidationOperation::REMOVE) {
                if (isLongKey) {
                    _indexesInfo[indexNumber].numRecords--;
                    _indexesInfo[indexNumber].numLongKeys--;
                } else {
                    _removeDocKey_inlock(ks, indexNumber);
                }
            }
        }
    } else if (_stage == ValidationStage::INDEX) {
        // Long keys are not in the index, so the index scan never sees them.
        if (isLongKey) {
            return;
        }
        if (_isIndexScanning_inlock(indexNumber)) {
            _setYieldAtIndexEntry_inlock(ks);
        }
        const bool wasIndexScanStarted =
            _isIndexFinished_inlock(indexNumber) || _isIndexScanning_inlock(indexNumber);
        const bool isUpcomingChangeToCurrentIndex =
            _isIndexScanning_inlock(indexNumber) && !_isBeforeLastProcessedIndexEntry_inlock(ks);
        if (!wasIndexScanStarted || isUpcomingChangeToCurrentIndex) {
            // Either we haven't started scanning this index namespace yet, so everything happens
            // after the cursor, or we are scanning it and this event occurred after our cursor.
            // The scan will count this key later, so offset that count now.
            if (operation == ValidationOperation::INSERT) {
                _removeIndexKey_inlock(ks, indexNumber);
                _indexesInfo.at(indexNumber).numExtraIndexKeys++;
            } else if (operation == ValidationOperation::REMOVE) {
                _addIndexKey_inlock(ks, indexNumber);
                _indexesInfo.at(indexNumber).numExtraIndexKeys--;
            }
        }
    }
}
/**
 * Advances the validation stage: DOCUMENT -> INDEX -> NONE.
 * Calling it in any other stage leaves the stage unchanged.
 */
void IndexConsistency::nextStage() {
    stdx::lock_guard guard(_classMutex);
    switch (_stage) {
        case ValidationStage::DOCUMENT:
            _stage = ValidationStage::INDEX;
            break;
        case ValidationStage::INDEX:
            _stage = ValidationStage::NONE;
            break;
        default:
            break;
    }
}
/**
 * Returns the current validation stage, read under _classMutex.
 */
ValidationStage IndexConsistency::getStage() const {
    stdx::lock_guard guard(_classMutex);
    const ValidationStage current = _stage;
    return current;
}
/**
 * Records how far the collection scan has progressed.
 * A RecordId that is not "normal" clears the position instead.
 */
void IndexConsistency::setLastProcessedRecordId(RecordId recordId) {
    stdx::lock_guard guard(_classMutex);
    if (recordId.isNormal()) {
        _lastProcessedRecordId = recordId;
        return;
    }
    _lastProcessedRecordId = boost::none;
}
/**
 * Records how far the index scan over `descriptor` has progressed.
 *
 * The entry is stored as a KeyString built with the index's key ordering and the entry's
 * RecordId. An empty `indexEntry` clears the position.
 */
void IndexConsistency::setLastProcessedIndexEntry(
    const IndexDescriptor& descriptor, const boost::optional& indexEntry) {
    const Ordering keyOrdering = Ordering::make(descriptor.keyPattern());
    const KeyString::Version keyVersion = KeyString::kLatestVersion;

    stdx::lock_guard guard(_classMutex);
    if (!indexEntry) {
        _lastProcessedIndexEntry.reset();
        return;
    }
    _lastProcessedIndexEntry.reset(
        new KeyString(keyVersion, indexEntry->key, keyOrdering, indexEntry->loc));
}
/**
 * Marks index `indexNumber` as the one currently being scanned and clears the
 * last-processed index entry. Out-of-range index numbers are ignored.
 */
void IndexConsistency::notifyStartIndex(int indexNumber) {
    stdx::lock_guard guard(_classMutex);
    const bool isKnownIndex =
        indexNumber >= 0 && indexNumber < static_cast<int>(_indexesInfo.size());
    if (!isKnownIndex) {
        return;
    }
    _lastProcessedIndexEntry.reset();
    _currentIndex = indexNumber;
}
/**
 * Marks the scan of index `indexNumber` as finished.
 *
 * Clears the last-processed index entry and sets the current index to none (-1).
 * Out-of-range index numbers are ignored.
 */
void IndexConsistency::notifyDoneIndex(int indexNumber) {
    stdx::lock_guard guard(_classMutex);
    const bool isKnownIndex =
        indexNumber >= 0 && indexNumber < static_cast<int>(_indexesInfo.size());
    if (!isKnownIndex) {
        return;
    }
    _lastProcessedIndexEntry.reset();
    _currentIndex = -1;
    auto& info = _indexesInfo.at(indexNumber);
    info.indexScanFinished = true;
}
/**
 * Maps an index namespace to the number assigned to it at construction.
 * Returns -1 when the namespace was not known at construction.
 */
int IndexConsistency::getIndexNumber(const std::string& indexNs) {
    const auto found = _indexNumber.find(indexNs);
    if (found == _indexNumber.end()) {
        return -1;
    }
    return found->second;
}
/**
 * Reports whether the collection scan, now at `recordId`, has reached or passed a pending
 * yield point set by _setYieldAtRecord_inlock(). Returns false when no yield point is pending.
 */
bool IndexConsistency::shouldGetNewSnapshot(const RecordId recordId) const {
    stdx::lock_guard guard(_classMutex);
    const bool hasPendingYield = static_cast<bool>(_yieldAtRecordId);
    return hasPendingYield && _yieldAtRecordId <= recordId;
}
/**
 * Reports whether the index scan, now at `keyString`, has reached or passed a pending yield
 * point set by _setYieldAtIndexEntry_inlock(). Returns false when no yield point is pending.
 */
bool IndexConsistency::shouldGetNewSnapshot(const KeyString& keyString) const {
    stdx::lock_guard guard(_classMutex);
    const bool hasPendingYield = static_cast<bool>(_yieldAtIndexEntry);
    return hasPendingYield && *_yieldAtIndexEntry <= keyString;
}
/**
 * Releases the collection lock owned by this validator and re-acquires it in `mode`.
 *
 * After re-acquiring, the function does two things:
 *  - checks whether the operation was interrupted;
 *  - re-runs _throwExceptionIfError(), passing its Status to uassertStatusOK(), so a dropped
 *    database/collection or a changed index set is surfaced to the caller.
 */
void IndexConsistency::relockCollectionWithMode(LockMode mode) {
// Release the lock and grab the provided lock mode.
// The separate reset() matters: unique_ptr::reset(new X) constructs the new lock (presumably
// acquiring it) before destroying the old one, so without it both would be held at once.
_collLk.reset();
_collLk.reset(new Lock::CollectionLock(_opCtx->lockState(), _nss.toString(), mode));
invariant(_opCtx->lockState()->isCollectionLockedForMode(_nss.toString(), mode));
// Check if the operation was killed.
_opCtx->checkForInterrupt();
// Ensure it is safe to continue.
// NOTE(review): unlike yield(), this calls _throwExceptionIfError() without holding _classMutex,
// although that function reads _stage and _indexesInfo. Confirm this is only reached from the
// validating thread.
uassertStatusOK(_throwExceptionIfError());
}
/**
 * Reports whether it is time to yield.
 *
 * This only applies to background validation. The yield interval is tracked by `_tracker`,
 * which is consulted only when `_isBackground` is true.
 */
bool IndexConsistency::scanLimitHit() {
    stdx::lock_guard guard(_classMutex);
    if (!_isBackground) {
        return false;
    }
    return _tracker.intervalHasElapsed();
}
/**
 * Yields all locks held by this operation, then verifies it is still safe to continue.
 *
 * Steps:
 *  1. Reset the yield tracker.
 *  2. Drop _classMutex around QueryYield::yieldAllLocks() (presumably so concurrent
 *     applyChange() callers are not blocked while we are yielded).
 *  3. Re-acquire the mutex and check for interruption.
 *  4. Clear any pending record / index-entry yield points.
 *  5. Re-run _throwExceptionIfError() through uassertStatusOK().
 */
void IndexConsistency::yield() {
stdx::unique_lock lock(_classMutex);
// Reset the yield tracker
_tracker.resetLastTime();
// Release the mutex while yielding the lock manager's locks; it is re-taken right after.
lock.unlock();
QueryYield::yieldAllLocks(_opCtx, nullptr, _nss);
lock.lock();
// Check if the operation was killed.
_opCtx->checkForInterrupt();
// Pending yield points are cleared here, presumably because the yield itself satisfied them
// (a fresh snapshot was taken). TODO confirm.
_yieldAtRecordId = boost::none;
_yieldAtIndexEntry.reset();
// Ensure it is safe to continue.
uassertStatusOK(_throwExceptionIfError());
}
/**
 * Document side: increments the hash bucket for (`ks`, `indexNumber`) and the index's
 * numRecords. Indexes that were not ready when validation started are ignored.
 * Caller must hold _classMutex.
 */
void IndexConsistency::_addDocKey_inlock(const KeyString& ks, int indexNumber) {
    auto& info = _indexesInfo.at(indexNumber);
    // Ignore indexes that weren't ready before we started validation.
    if (!info.isReady) {
        return;
    }
    const uint32_t bucket = _hashKeyString(ks, indexNumber);
    ++_indexKeyCount[bucket];
    ++info.numRecords;
}
/**
 * Undoes _addDocKey_inlock(): decrements the hash bucket for (`ks`, `indexNumber`) and the
 * index's numRecords. Indexes that were not ready when validation started are ignored.
 * Caller must hold _classMutex.
 */
void IndexConsistency::_removeDocKey_inlock(const KeyString& ks, int indexNumber) {
    auto& info = _indexesInfo.at(indexNumber);
    // Ignore indexes that weren't ready before we started validation.
    if (!info.isReady) {
        return;
    }
    const uint32_t bucket = _hashKeyString(ks, indexNumber);
    --_indexKeyCount[bucket];
    --info.numRecords;
}
/**
 * Index side: decrements the hash bucket for (`ks`, `indexNumber`) and increments the index's
 * numKeys, so a key seen on both sides leaves its bucket at zero. Indexes that were not ready
 * when validation started are ignored. Caller must hold _classMutex.
 */
void IndexConsistency::_addIndexKey_inlock(const KeyString& ks, int indexNumber) {
    auto& info = _indexesInfo.at(indexNumber);
    // Ignore indexes that weren't ready before we started validation.
    if (!info.isReady) {
        return;
    }
    const uint32_t bucket = _hashKeyString(ks, indexNumber);
    --_indexKeyCount[bucket];
    ++info.numKeys;
}
/**
 * Undoes _addIndexKey_inlock(): increments the hash bucket for (`ks`, `indexNumber`) and
 * decrements the index's numKeys. Indexes that were not ready when validation started are
 * ignored. Caller must hold _classMutex.
 */
void IndexConsistency::_removeIndexKey_inlock(const KeyString& ks, int indexNumber) {
    auto& info = _indexesInfo.at(indexNumber);
    // Ignore indexes that weren't ready before we started validation.
    if (!info.isReady) {
        return;
    }
    const uint32_t bucket = _hashKeyString(ks, indexNumber);
    ++_indexKeyCount[bucket];
    --info.numKeys;
}
/**
 * True once notifyDoneIndex() has been called for `indexNumber`.
 * Caller must hold _classMutex.
 */
bool IndexConsistency::_isIndexFinished_inlock(int indexNumber) const {
    const auto& info = _indexesInfo.at(indexNumber);
    return info.indexScanFinished;
}
/**
 * True if `indexNumber` is the index currently being scanned (set by notifyStartIndex()).
 * Caller must hold _classMutex.
 */
bool IndexConsistency::_isIndexScanning_inlock(int indexNumber) const {
    const bool isCurrent = (_currentIndex == indexNumber);
    return isCurrent;
}
/**
 * Requests a yield when the collection scan reaches `recordId`.
 *
 * Ignored if the scan has already passed `recordId`. Otherwise the earliest pending yield point
 * is kept. Caller must hold _classMutex.
 */
void IndexConsistency::_setYieldAtRecord_inlock(const RecordId recordId) {
    if (_isBeforeLastProcessedRecordId_inlock(recordId)) {
        return;
    }
    const bool isEarlierThanPending = !_yieldAtRecordId || recordId <= _yieldAtRecordId;
    if (!isEarlierThanPending) {
        return;
    }
    _yieldAtRecordId = recordId;
}
/**
 * Requests a yield when the index scan reaches `keyString`.
 *
 * Ignored if the scan has already passed `keyString`. Otherwise the earliest pending yield point
 * is kept. Only the key bytes are copied into the stored KeyString; its TypeBits are not.
 * Caller must hold _classMutex.
 */
void IndexConsistency::_setYieldAtIndexEntry_inlock(const KeyString& keyString) {
    if (_isBeforeLastProcessedIndexEntry_inlock(keyString)) {
        return;
    }
    const bool isEarlierThanPending = !_yieldAtIndexEntry || keyString <= *_yieldAtIndexEntry;
    if (!isEarlierThanPending) {
        return;
    }
    const KeyString::Version keyVersion = KeyString::kLatestVersion;
    _yieldAtIndexEntry.reset(new KeyString(keyVersion));
    _yieldAtIndexEntry->resetFromBuffer(keyString.getBuffer(), keyString.getSize());
}
/**
 * True if the collection scan has a recorded position and `recordId` is at or before it.
 * Caller must hold _classMutex.
 */
bool IndexConsistency::_isBeforeLastProcessedRecordId_inlock(RecordId recordId) const {
    if (!_lastProcessedRecordId) {
        return false;
    }
    return recordId <= _lastProcessedRecordId;
}
/**
 * True if the index scan has a recorded position and `keyString` sorts at or before it.
 * Caller must hold _classMutex.
 */
bool IndexConsistency::_isBeforeLastProcessedIndexEntry_inlock(const KeyString& keyString) const {
    if (!_lastProcessedIndexEntry) {
        return false;
    }
    return keyString <= *_lastProcessedIndexEntry;
}
/**
 * Hashes key `ks` of index `indexNumber` into one of 2^22 buckets.
 *
 * The index-namespace hash seeds MurmurHash3 over the key's TypeBits. That result in turn seeds
 * the hash of the key bytes, so equal keys in different indexes land in different buckets.
 */
uint32_t IndexConsistency::_hashKeyString(const KeyString& ks, int indexNumber) const {
    uint32_t hash = _indexesInfo.at(indexNumber).indexNsHash;
    const auto& typeBits = ks.getTypeBits();
    MurmurHash3_x86_32(typeBits.getBuffer(), typeBits.getSize(), hash, &hash);
    MurmurHash3_x86_32(ks.getBuffer(), ks.getSize(), hash, &hash);
    const uint32_t numBuckets = 1U << 22;
    return hash % numBuckets;
}
/**
 * Checks that it is still safe to continue validating after locks were released.
 *
 * @return NamespaceNotFound if the database or collection was dropped.
 *         IndexModified if, outside the DOCUMENT stage, an index was added or one of the ready
 *         indexes captured at construction is gone.
 *         Status::OK() otherwise.
 *         During the DOCUMENT stage, index changes are tolerated so the collection scan can
 *         still verify that every record is valid BSON.
 */
Status IndexConsistency::_throwExceptionIfError() {
    Database* database = dbHolder().get(_opCtx, _nss.db());
    // Ensure the database still exists.
    if (!database) {
        return Status(ErrorCodes::NamespaceNotFound,
                      "The database was dropped during background validation");
    }

    Collection* collection = database->getCollection(_opCtx, _nss);
    // Ensure the collection still exists.
    if (!collection) {
        return Status(ErrorCodes::NamespaceNotFound,
                      "The collection was dropped during background validation");
    }

    // Ensure no indexes were removed or added.
    IndexCatalog* indexCatalog = collection->getIndexCatalog();
    IndexCatalog::IndexIterator indexIterator = indexCatalog->getIndexIterator(_opCtx, false);
    int numRelevantIndexes = 0;
    while (indexIterator.more()) {
        const IndexDescriptor* descriptor = indexIterator.next();
        int indexNumber = getIndexNumber(descriptor->indexNamespace());
        if (indexNumber == -1) {
            // Allow the collection scan to finish to verify that all the records are valid BSON.
            if (_stage != ValidationStage::DOCUMENT) {
                // An index was added.
                return Status(ErrorCodes::IndexModified,
                              "An index was added during background validation");
            }
        } else {
            // Ignore indexes that weren't ready
            if (_indexesInfo.at(indexNumber).isReady) {
                numRelevantIndexes++;
            }
        }
    }

    // Only indexes that were ready at construction are counted above, so compare against the
    // number of such indexes rather than the total. Comparing against _indexesInfo.size() would
    // report any index that was not ready at construction as "dropped".
    int numExpectedIndexes = 0;
    for (int i = 0; i < static_cast<int>(_indexesInfo.size()); ++i) {
        if (_indexesInfo.at(i).isReady) {
            ++numExpectedIndexes;
        }
    }

    if (numRelevantIndexes != numExpectedIndexes) {
        // Allow the collection scan to finish to verify that all the records are valid BSON.
        if (_stage != ValidationStage::DOCUMENT) {
            // An index was dropped.
            return Status(ErrorCodes::IndexModified,
                          "An index was dropped during background validation");
        }
    }

    return Status::OK();
}
} // namespace mongo