/*-
* Copyright (C) 2017 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
#include
#include "mongo/platform/basic.h"
#include "mongo/db/catalog/collection_impl.h"
#include "mongo/base/counter.h"
#include "mongo/base/init.h"
#include "mongo/base/owned_pointer_map.h"
#include "mongo/bson/ordering.h"
#include "mongo/bson/simple_bsonelement_comparator.h"
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/db/background.h"
#include "mongo/db/catalog/collection_catalog_entry.h"
#include "mongo/db/catalog/database_catalog_entry.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog/index_create.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/curop.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/keypattern.h"
#include "mongo/db/matcher/expression_parser.h"
#include "mongo/db/matcher/extensions_callback_disallow_extensions.h"
#include "mongo/db/op_observer.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/update_request.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/key_string.h"
#include "mongo/db/storage/mmap_v1/mmap_v1_options.h"
#include "mongo/db/storage/record_fetcher.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/update/update_driver.h"
#include "mongo/db/auth/user_document_parser.h" // XXX-ANDY
#include "mongo/rpc/object_check.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
namespace mongo {
namespace {
// Startup initializer: registers with Collection the factory lambda used to build its
// implementation object. The lambda forwards all seven arguments unchanged to
// stdx::make_unique and returns the resulting owning pointer.
// NOTE(review): the template arguments of std::unique_ptr / stdx::make_unique are not visible
// here; presumably the concrete type constructed is CollectionImpl — confirm.
MONGO_INITIALIZER(InitializeCollectionFactory)(InitializerContext* const) {
Collection::registerFactory(
[](Collection* const _this,
OperationContext* const opCtx,
const StringData fullNS,
OptionalCollectionUUID uuid,
CollectionCatalogEntry* const details,
RecordStore* const recordStore,
DatabaseCatalogEntry* const dbce) -> std::unique_ptr {
return stdx::make_unique(
_this, opCtx, fullNS, uuid, details, recordStore, dbce);
});
// Registration itself cannot fail here, so the initializer always reports success.
return Status::OK();
}
// Startup initializer: makes CollectionImpl::parseValidationLevel the implementation that
// Collection dispatches to when asked to parse a validation level string.
MONGO_INITIALIZER(InitializeParseValidationLevelImpl)(InitializerContext* const) {
    auto parseLevelImpl = [](const StringData data) {
        return CollectionImpl::parseValidationLevel(data);
    };
    Collection::registerParseValidationLevelImpl(std::move(parseLevelImpl));
    return Status::OK();
}
// Startup initializer: makes CollectionImpl::parseValidationAction the implementation that
// Collection dispatches to when asked to parse a validation action string.
MONGO_INITIALIZER(InitializeParseValidationActionImpl)(InitializerContext* const) {
    auto parseActionImpl = [](const StringData data) {
        return CollectionImpl::parseValidationAction(data);
    };
    Collection::registerParseValidationActionImpl(std::move(parseActionImpl));
    return Status::OK();
}
// Fail point consulted at the top of both insertDocuments() and the index-block overload of
// insertDocument() to force inserts to fail. Its optional "collectionNS" field restricts the
// failure to a single namespace.
MONGO_FP_DECLARE(failCollectionInserts);
// Operator names that checkValidatorForBannedExpressions() rejects wherever they appear as a
// field name inside a collection validator document.
const auto bannedExpressionsInValidators = std::set{
"$geoNear", "$near", "$nearSphere", "$text", "$where",
};
// Walks 'validator' recursively (descending into every embedded object and array) and returns
// InvalidOptions for the first field whose name is one of 'bannedExpressionsInValidators'.
// Returns OK when no banned operator is found.
Status checkValidatorForBannedExpressions(const BSONObj& validator) {
    for (auto elem : validator) {
        const auto fieldName = elem.fieldNameStringData();

        // Only '$'-prefixed names can be operators, so test the prefix before the set lookup.
        const bool isBanned =
            fieldName[0] == '$' && bannedExpressionsInValidators.count(fieldName);
        if (isBanned) {
            return {ErrorCodes::InvalidOptions,
                    str::stream() << fieldName << " is not allowed in collection validators"};
        }

        const auto elemType = elem.type();
        if (elemType != Object && elemType != Array) {
            continue;
        }

        // Nested documents and arrays may contain operators of their own.
        auto nestedStatus = checkValidatorForBannedExpressions(elem.Obj());
        if (!nestedStatus.isOK()) {
            return nestedStatus;
        }
    }
    return Status::OK();
}
// Turns the stored BSON collation spec of collection 'nss' into a collator by asking the
// service context's collator factory. An empty spec yields a null collator. The stored spec was
// validated when the collection was created, so it is expected to parse.
std::unique_ptr parseCollation(OperationContext* opCtx,
                               const NamespaceString& nss,
                               BSONObj collationSpec) {
    if (collationSpec.isEmpty()) {
        return {nullptr};
    }

    auto swCollator =
        CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collationSpec);

    // A collation version our ICU integration does not support is fatal: log which collection
    // is affected and shut the server down.
    if (swCollator == ErrorCodes::IncompatibleCollationVersion) {
        log() << "Collection " << nss
              << " has a default collation which is incompatible with this version: "
              << collationSpec;
        fassertFailedNoTrace(40144);
    }

    // No other error should be possible, hence an invariant rather than an fassert.
    invariantOK(swCollator.getStatus());
    return std::move(swCollator.getValue());
}
}
// Names used unqualified throughout the rest of this file.
using std::unique_ptr;
using std::endl;
using std::string;
using std::vector;
using logger::LogComponent;
// Size threshold, in bytes, for index keys: during validation, keys whose objsize() is greater
// than or equal to this value are counted as "long keys" and skipped rather than hashed.
static const int IndexKeyMaxSize = 1024; // this goes away with SERVER-3372
// Builds the implementation object for the collection named 'fullNS'.
//
// Everything is set up in the member-initializer list; the body is empty. Catalog-derived state
// (collation, validator document, validation action/level) is read from
// 'details->getCollectionOptions(opCtx)'. The parsed validator and the validation settings are
// passed through uassertStatusOK (presumably throwing if parsing fails — confirm).
//
// NOTE(review): several initializers read other members (_ns, _recordStore, _details,
// _collator, _validatorDoc), so correctness depends on the member declaration order in the
// class definition, which is not visible here.
CollectionImpl::CollectionImpl(Collection* _this_init,
OperationContext* opCtx,
StringData fullNS,
OptionalCollectionUUID uuid,
CollectionCatalogEntry* details,
RecordStore* recordStore,
DatabaseCatalogEntry* dbce)
: _ns(fullNS),
_uuid(uuid),
_details(details),
_recordStore(recordStore),
_dbce(dbce),
// Capped collections outside the "local" database need the extra metadata lock, but only when
// document-level locking is supported.
_needCappedLock(supportsDocLocking() && _recordStore->isCapped() && _ns.db() != "local"),
_infoCache(_this_init, _ns),
_indexCatalog(_this_init, this->getCatalogEntry()->getMaxAllowedIndexes()),
_collator(parseCollation(opCtx, _ns, _details->getCollectionOptions(opCtx).collation)),
// Own the validator BSON so the parsed match expression can safely refer to it.
_validatorDoc(_details->getCollectionOptions(opCtx).validator.getOwned()),
_validator(uassertStatusOK(parseValidator(_validatorDoc))),
_validationAction(uassertStatusOK(
parseValidationAction(_details->getCollectionOptions(opCtx).validationAction))),
_validationLevel(uassertStatusOK(
parseValidationLevel(_details->getCollectionOptions(opCtx).validationLevel))),
_cursorManager(_ns),
// The notifier exists only for capped collections; isCapped() is defined in terms of it.
_cappedNotifier(_recordStore->isCapped() ? stdx::make_unique()
: nullptr),
// The profile collection and the oplog are excluded from the capped insert lock.
_mustTakeCappedLockOnInsert(isCapped() && !_ns.isSystemDotProfile() && !_ns.isOplog()),
_this(_this_init) {}
// Second-phase initialization: marks the object with its magic number, initializes the index
// catalog, hooks this collection up as the record store's capped callback (capped collections
// only), and finally initializes the info cache.
void CollectionImpl::init(OperationContext* opCtx) {
    _magic = kMagicNumber;

    _indexCatalog.init(opCtx);

    if (isCapped()) {
        // The record store calls back into this object for capped collections.
        _recordStore->setCappedCallback(this);
    }

    _infoCache.init(opCtx);
}
// Tears the collection down: checks the object is still valid, detaches the capped callback and
// kills the capped notifier (capped collections only), then clears the magic number.
CollectionImpl::~CollectionImpl() {
    verify(ok());

    const bool capped = isCapped();
    if (capped) {
        // Unhook from the record store before signalling the notifier.
        _recordStore->setCappedCallback(nullptr);
        _cappedNotifier->kill();
    }

    _magic = 0;
}
bool CollectionImpl::requiresIdIndex() const {
if (_ns.isVirtualized() || _ns.isOplog()) {
// No indexes on virtual collections or the oplog.
return false;
}
if (_ns.isSystem()) {
StringData shortName = _ns.coll().substr(_ns.coll().find('.') + 1);
if (shortName == "indexes" || shortName == "namespaces" || shortName == "profile") {
return false;
}
}
return true;
}
// Returns a record store cursor over this collection, iterating forward or backward as
// requested by 'forward'. In debug builds, asserts that the collection is locked in at least
// MODE_IS.
std::unique_ptr CollectionImpl::getCursor(OperationContext* opCtx,
                                          bool forward) const {
    dassert(opCtx->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IS));
    invariant(ok());

    // The record store owns the actual iteration logic.
    return _recordStore->getCursor(opCtx, forward);
}
// Returns whatever set of cursors the record store hands out via getManyCursors(). In debug
// builds, asserts that the collection is locked in at least MODE_IS.
vector> CollectionImpl::getManyCursors(
    OperationContext* opCtx) const {
    dassert(opCtx->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IS));

    return _recordStore->getManyCursors(opCtx);
}
// Looks up the record at 'loc'. On success stores the document, tagged with the recovery unit's
// current snapshot id, into '*out' and returns true. Returns false and leaves '*out' untouched
// when the record store has no such record.
bool CollectionImpl::findDoc(OperationContext* opCtx,
                             const RecordId& loc,
                             Snapshotted* out) const {
    dassert(opCtx->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IS));

    RecordData record;
    const bool found = _recordStore->findRecord(opCtx, loc, &record);
    if (found) {
        *out = Snapshotted(opCtx->recoveryUnit()->getSnapshotId(), record.releaseToBson());
    }
    return found;
}
// Checks 'document' against the collection validator.
//
// Returns OK when there is no validator, the validation level is OFF, validation is disabled
// for this operation, or the document matches. A non-matching document yields
// DocumentValidationFailure, unless the validation action is WARN, in which case a warning is
// logged and OK is returned.
Status CollectionImpl::checkValidation(OperationContext* opCtx, const BSONObj& document) const {
    // Evaluated left to right, so the validator is only dereferenced below once it is known to
    // exist.
    const bool validationSkipped = !_validator || _validationLevel == ValidationLevel::OFF ||
        documentValidationDisabled(opCtx);

    if (validationSkipped || _validator->matchesBSON(document)) {
        return Status::OK();
    }

    if (_validationAction != ValidationAction::WARN) {
        return {ErrorCodes::DocumentValidationFailure, "Document failed validation"};
    }

    warning() << "Document would fail validation"
              << " collection: " << ns() << " doc: " << redact(document);
    return Status::OK();
}
// Parses the validator document 'validator' into a match expression, using the collection's
// default collator.
//
// An empty document means "no validator" and yields a null expression. InvalidOptions is
// returned for system collections (drop-pending namespaces excepted), for collections on an
// internal database, and for validators containing a banned operator. Parser errors are passed
// through.
StatusWithMatchExpression CollectionImpl::parseValidator(const BSONObj& validator) const {
    if (validator.isEmpty()) {
        return {nullptr};
    }

    const auto& nss = ns();

    if (nss.isSystem() && !nss.isDropPendingNamespace()) {
        return {ErrorCodes::InvalidOptions,
                "Document validators not allowed on system collections."};
    }

    if (nss.isOnInternalDb()) {
        return {ErrorCodes::InvalidOptions,
                str::stream() << "Document validators are not allowed on collections in"
                              << " the "
                              << nss.db()
                              << " database"};
    }

    auto bannedCheck = checkValidatorForBannedExpressions(validator);
    if (!bannedCheck.isOK()) {
        return bannedCheck;
    }

    auto swMatcher = MatchExpressionParser::parse(
        validator, ExtensionsCallbackDisallowExtensions(), _collator.get());
    if (swMatcher.isOK()) {
        return swMatcher;
    }
    return swMatcher.getStatus();
}
// Inserts 'nDocs' documents produced by the DocWriters in 'docs'. Intended for the oplog only,
// which is why it may assume there is no validator, no index, and no capped insert lock. On
// success, registers an on-commit hook that wakes capped-collection waiters.
Status CollectionImpl::insertDocumentsForOplog(OperationContext* opCtx,
                                               const DocWriter* const* docs,
                                               size_t nDocs) {
    dassert(opCtx->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX));

    // Oplog-only simplifications. They also mean the OpObserver does not need to see these
    // writes, which is good because forwarding them would defeat the purpose of DocWriter.
    invariant(!_validator);
    invariant(!_indexCatalog.haveAnyIndexes());
    invariant(!_mustTakeCappedLockOnInsert);

    Status insertStatus = _recordStore->insertRecordsWithDocWriter(opCtx, docs, nDocs);
    if (insertStatus.isOK()) {
        opCtx->recoveryUnit()->onCommit([this]() { notifyCappedWaitersIfNeeded(); });
    }
    return insertStatus;
}
// Inserts the documents in [begin, end) into the collection.
//
// Steps, in order: honor the failCollectionInserts fail point; check every document for an _id
// (when the collection has an _id index) and against the validator; take the capped in-flight
// resource if required; perform the storage and index writes via _insertDocuments(); notify the
// OpObserver; register an on-commit hook that wakes capped-collection waiters.
//
// Returns the first failing status, or OK. 'opDebug' may be null. 'fromMigrate' is passed
// through to the OpObserver.
Status CollectionImpl::insertDocuments(OperationContext* opCtx,
const vector::const_iterator begin,
const vector::const_iterator end,
OpDebug* opDebug,
bool enforceQuota,
bool fromMigrate) {
MONGO_FAIL_POINT_BLOCK(failCollectionInserts, extraData) {
const BSONObj& data = extraData.getData();
const auto collElem = data["collectionNS"];
// If the failpoint specifies no collection or matches the existing one, fail.
if (!collElem || _ns == collElem.str()) {
// NOTE(review): '*begin' is dereferenced without checking begin != end; this assumes callers
// never pass an empty range while the fail point is active — confirm.
const std::string msg = str::stream()
<< "Failpoint (failCollectionInserts) has been enabled (" << data
<< "), so rejecting insert (first doc): " << *begin;
log() << msg;
return {ErrorCodes::FailPointEnabled, msg};
}
}
// Should really be done in the collection object at creation and updated on index create.
const bool hasIdIndex = _indexCatalog.findIdIndex(opCtx);
// Validate the whole batch up front so nothing is written if any document is rejected.
for (auto it = begin; it != end; it++) {
if (hasIdIndex && (*it)["_id"].eoo()) {
return Status(ErrorCodes::InternalError,
str::stream()
<< "Collection::insertDocument got document without _id for ns:"
<< _ns.ns());
}
auto status = checkValidation(opCtx, *it);
if (!status.isOK())
return status;
}
// Remember the snapshot so we can assert below that the insert did not change it.
const SnapshotId sid = opCtx->recoveryUnit()->getSnapshotId();
if (_mustTakeCappedLockOnInsert)
synchronizeOnCappedInFlightResource(opCtx->lockState(), _ns);
Status status = _insertDocuments(opCtx, begin, end, enforceQuota, opDebug);
if (!status.isOK())
return status;
invariant(sid == opCtx->recoveryUnit()->getSnapshotId());
getGlobalServiceContext()->getOpObserver()->onInserts(
opCtx, ns(), uuid(), begin, end, fromMigrate);
// Waiters on a capped collection are only woken once the write actually commits.
opCtx->recoveryUnit()->onCommit([this]() { notifyCappedWaitersIfNeeded(); });
return Status::OK();
}
// Single-document convenience wrapper: places 'docToInsert' in a one-element batch and hands it
// to insertDocuments(), passing the remaining arguments through unchanged.
Status CollectionImpl::insertDocument(OperationContext* opCtx,
                                      const BSONObj& docToInsert,
                                      OpDebug* opDebug,
                                      bool enforceQuota,
                                      bool fromMigrate) {
    vector singleDocBatch;
    singleDocBatch.push_back(docToInsert);

    return insertDocuments(opCtx,
                           singleDocBatch.begin(),
                           singleDocBatch.end(),
                           opDebug,
                           enforceQuota,
                           fromMigrate);
}
// Inserts a single document and feeds it to each of the supplied 'indexBlocks' instead of going
// through the index catalog.
//
// Steps, in order: honor the failCollectionInserts fail point; validate the document; take the
// capped in-flight resource if required; insert the record; call insert() on every index block;
// notify the OpObserver (with fromMigrate == false); register an on-commit hook that wakes
// capped-collection waiters. Returns the first failing status, or OK.
Status CollectionImpl::insertDocument(OperationContext* opCtx,
const BSONObj& doc,
const std::vector& indexBlocks,
bool enforceQuota) {
MONGO_FAIL_POINT_BLOCK(failCollectionInserts, extraData) {
const BSONObj& data = extraData.getData();
const auto collElem = data["collectionNS"];
// If the failpoint specifies no collection or matches the existing one, fail.
if (!collElem || _ns == collElem.str()) {
const std::string msg = str::stream()
<< "Failpoint (failCollectionInserts) has been enabled (" << data
<< "), so rejecting insert: " << doc;
log() << msg;
return {ErrorCodes::FailPointEnabled, msg};
}
}
{
auto status = checkValidation(opCtx, doc);
if (!status.isOK())
return status;
}
dassert(opCtx->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX));
if (_mustTakeCappedLockOnInsert)
synchronizeOnCappedInFlightResource(opCtx->lockState(), _ns);
StatusWith loc = _recordStore->insertRecord(
opCtx, doc.objdata(), doc.objsize(), _enforceQuota(enforceQuota));
if (!loc.isOK())
return loc.getStatus();
// Index the new record through each caller-supplied block, stopping at the first failure.
for (auto&& indexBlock : indexBlocks) {
Status status = indexBlock->insert(doc, loc.getValue());
if (!status.isOK()) {
return status;
}
}
// onInserts() takes an iterator range, so wrap the document in a one-element vector.
vector docs;
docs.push_back(doc);
getGlobalServiceContext()->getOpObserver()->onInserts(
opCtx, ns(), uuid(), docs.begin(), docs.end(), false);
opCtx->recoveryUnit()->onCommit([this]() { notifyCappedWaitersIfNeeded(); });
// 'loc' is known to be OK at this point, so this returns an OK status.
return loc.getStatus();
}
// Performs the storage-level part of a batch insert: writes the documents in [begin, end) to
// the record store and then indexes them.
//
// Returns OperationCannotBeBatched when asked to insert more than one document into an indexed
// capped collection; otherwise returns the record store's error, or the status of indexing.
// When 'opDebug' is non-null its keysInserted counter is increased by the number of index keys
// reported by the index catalog (also when indexing returns an error).
Status CollectionImpl::_insertDocuments(OperationContext* opCtx,
                                        const vector::const_iterator begin,
                                        const vector::const_iterator end,
                                        bool enforceQuota,
                                        OpDebug* opDebug) {
    dassert(opCtx->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX));

    const size_t count = std::distance(begin, end);
    if (isCapped() && _indexCatalog.haveAnyIndexes() && count > 1) {
        // We require that inserts to indexed capped collections be done one-at-a-time to avoid the
        // possibility that a later document causes an earlier document to be deleted before it can
        // be indexed.
        // TODO SERVER-21512 It would be better to handle this here by just doing single inserts.
        return {ErrorCodes::OperationCannotBeBatched,
                "Can't batch inserts into indexed capped collections"};
    }

    if (_needCappedLock) {
        // X-lock the metadata resource for this capped collection until the end of the WUOW. This
        // prevents the primary from executing with more concurrency than secondaries.
        // See SERVER-21646.
        // NOTE(review): the lock object itself goes out of scope at the closing brace below, so
        // the "until the end of the WUOW" guarantee presumably comes from the lock manager
        // deferring the release — confirm.
        Lock::ResourceLock heldUntilEndOfWUOW{
            opCtx->lockState(), ResourceId(RESOURCE_METADATA, _ns.ns()), MODE_X};
    }

    // Describe each document to the record store; ids are filled in by insertRecords().
    std::vector records;
    records.reserve(count);
    for (auto it = begin; it != end; ++it) {
        Record record = {RecordId(), RecordData(it->objdata(), it->objsize())};
        records.push_back(record);
    }

    Status status = _recordStore->insertRecords(opCtx, &records, _enforceQuota(enforceQuota));
    if (!status.isOK())
        return status;

    // Pair every document with the id it was assigned, in the same order.
    std::vector bsonRecords;
    bsonRecords.reserve(count);
    size_t recordIndex = 0;
    for (auto it = begin; it != end; ++it, ++recordIndex) {
        const RecordId loc = records[recordIndex].id;
        invariant(RecordId::min() < loc);
        invariant(loc < RecordId::max());

        BsonRecord bsonRecord = {loc, &(*it)};
        bsonRecords.push_back(bsonRecord);
    }

    // Start from zero so that the counter added to 'opDebug' below is well defined even if
    // indexRecords() returns an error before writing to the out-parameter.
    int64_t keysInserted = 0;
    status = _indexCatalog.indexRecords(opCtx, bsonRecords, &keysInserted);
    if (opDebug) {
        opDebug->keysInserted += keysInserted;
    }

    return status;
}
// Wakes threads waiting for inserts into this capped collection, if there are any.
void CollectionImpl::notifyCappedWaitersIfNeeded() {
    // Non-capped collections have no notifier at all.
    if (!_cappedNotifier) {
        return;
    }

    // Waiters keep a shared_ptr to '_cappedNotifier', so somebody is waiting exactly when this
    // CollectionImpl is not the sole owner.
    const bool hasWaiters = _cappedNotifier.use_count() != 1;
    if (hasWaiters) {
        _cappedNotifier->notifyAll();
    }
}
// Prepares for the removal of the record at 'loc' from a capped collection: invalidates cursors
// positioned on the record and removes its index entries. Always returns OK.
Status CollectionImpl::aboutToDeleteCapped(OperationContext* opCtx,
                                           const RecordId& loc,
                                           RecordData data) {
    // Cursors pointing at this record must be advanced before it disappears.
    _cursorManager.invalidateDocument(opCtx, loc, INVALIDATION_DELETION);

    // The 'keysDeleted' count from unindexRecord() is deliberately not captured or reported to
    // OpDebug. It is questionable whether reporting it would add diagnostic value for users, and
    // it may instead be confusing since it depends on our internal capped collection document
    // removal strategy. We can consider adding either keysDeleted or a new metric reporting
    // document removal if justified by user demand.
    int64_t* const noKeysDeletedOut = nullptr;
    const BSONObj removedDoc = data.releaseToBson();
    _indexCatalog.unindexRecord(opCtx, removedDoc, loc, false, noKeysDeletedOut);

    return Status::OK();
}
// Deletes the document stored at 'loc'.
//
// Capped collections do not allow removal: the attempt is logged and reported via uasserted
// (code 10089). Otherwise the OpObserver is told about the upcoming delete, cursors positioned
// on the document are invalidated, its index entries and then the record itself are removed,
// and the OpObserver is notified of the completed delete. When 'opDebug' is non-null its
// keysDeleted counter is increased by the number of index keys removed. 'noWarn' is passed to
// unindexRecord() and 'fromMigrate' to the OpObserver.
void CollectionImpl::deleteDocument(
    OperationContext* opCtx, const RecordId& loc, OpDebug* opDebug, bool fromMigrate, bool noWarn) {
    if (isCapped()) {
        log() << "failing remove on a capped ns " << _ns;
        uasserted(10089, "cannot remove from a capped collection");
        // Presumably unreachable (uasserted is expected not to return); kept as a safety net.
        return;
    }

    Snapshotted doc = docFor(opCtx, loc);

    auto deleteState =
        getGlobalServiceContext()->getOpObserver()->aboutToDelete(opCtx, ns(), doc.value());

    /* check if any cursors point to us. if so, advance them. */
    _cursorManager.invalidateDocument(opCtx, loc, INVALIDATION_DELETION);

    // Start from zero so that the value added to 'opDebug' below is well defined even if
    // unindexRecord() does not write to the out-parameter.
    int64_t keysDeleted = 0;
    _indexCatalog.unindexRecord(opCtx, doc.value(), loc, noWarn, &keysDeleted);
    if (opDebug) {
        opDebug->keysDeleted += keysDeleted;
    }

    _recordStore->deleteRecord(opCtx, loc);

    getGlobalServiceContext()->getOpObserver()->onDelete(
        opCtx, ns(), uuid(), std::move(deleteState), fromMigrate);
}
// Counts document moves: incremented once per successful _updateDocumentWithMove().
Counter64 moveCounter;
// Publishes 'moveCounter' as a server status metric under the name "record.moves".
ServerStatusMetricField moveCounterDisplay("record.moves", &moveCounter);
// Replaces the document at 'oldLocation' (current contents 'oldDoc') with 'newDoc'.
//
// Steps, in order: validate the new document (in moderate mode an already-invalid old document
// may stay invalid); check locking/snapshot/ownership preconditions; reject changes to _id and,
// for capped collections, any change in document size; if 'indexesAffected', pre-compute one
// UpdateTicket per index; update the record in place, or fall back to
// _updateDocumentWithMove() when the record store reports NeedsDocumentMove; apply the index
// tickets; notify the OpObserver.
//
// Returns the location of the updated document (here always 'oldLocation'; the move path
// returns the new location) or the first error. 'opDebug' may be null. 'args' is dereferenced
// unconditionally and receives 'newDoc' as its updatedDoc.
StatusWith CollectionImpl::updateDocument(OperationContext* opCtx,
const RecordId& oldLocation,
const Snapshotted& oldDoc,
const BSONObj& newDoc,
bool enforceQuota,
bool indexesAffected,
OpDebug* opDebug,
OplogUpdateEntryArgs* args) {
{
auto status = checkValidation(opCtx, newDoc);
if (!status.isOK()) {
if (_validationLevel == ValidationLevel::STRICT_V) {
return status;
}
// moderate means we have to check the old doc
auto oldDocStatus = checkValidation(opCtx, oldDoc.value());
if (oldDocStatus.isOK()) {
// transitioning from good -> bad is not ok
return status;
}
// bad -> bad is ok in moderate mode
}
}
dassert(opCtx->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX));
invariant(oldDoc.snapshotId() == opCtx->recoveryUnit()->getSnapshotId());
invariant(newDoc.isOwned());
if (_needCappedLock) {
// X-lock the metadata resource for this capped collection until the end of the WUOW. This
// prevents the primary from executing with more concurrency than secondaries.
// See SERVER-21646.
Lock::ResourceLock heldUntilEndOfWUOW{
opCtx->lockState(), ResourceId(RESOURCE_METADATA, _ns.ns()), MODE_X};
}
// Remember the snapshot so we can assert below that the update did not change it.
SnapshotId sid = opCtx->recoveryUnit()->getSnapshotId();
// An update may not change the _id of a document that has one.
BSONElement oldId = oldDoc.value()["_id"];
if (!oldId.eoo() && SimpleBSONElementComparator::kInstance.evaluate(oldId != newDoc["_id"]))
return StatusWith(
ErrorCodes::InternalError, "in Collection::updateDocument _id mismatch", 13596);
// The MMAPv1 storage engine implements capped collections in a way that does not allow records
// to grow beyond their original size. If MMAPv1 is part of a replica set with storage engines
// that do not have this limitation, replication could result in errors, so it is necessary to
// set a uniform rule here. Similarly, it is not sufficient to disallow growing records, because
// this happens when secondaries roll back an update that shrunk a record. Exactly replicating
// legacy MMAPv1 behavior would require padding shrunk documents on all storage engines. Instead
// forbid all size changes.
const auto oldSize = oldDoc.value().objsize();
if (_recordStore->isCapped() && oldSize != newDoc.objsize())
return {ErrorCodes::CannotGrowDocumentInCappedNamespace,
str::stream() << "Cannot change the size of a document in a capped collection: "
<< oldSize
<< " != "
<< newDoc.objsize()};
// At the end of this step, we will have a map of UpdateTickets, one per index, which
// represent the index updates needed to be done, based on the changes between oldDoc and
// newDoc.
OwnedPointerMap updateTickets;
if (indexesAffected) {
IndexCatalog::IndexIterator ii = _indexCatalog.getIndexIterator(opCtx, true);
while (ii.more()) {
IndexDescriptor* descriptor = ii.next();
IndexCatalogEntry* entry = ii.catalogEntry(descriptor);
IndexAccessMethod* iam = ii.accessMethod(descriptor);
InsertDeleteOptions options;
IndexCatalog::prepareInsertDeleteOptions(opCtx, descriptor, &options);
// The ticket is stored in 'updateTickets' immediately, before validateUpdate() can fail.
UpdateTicket* updateTicket = new UpdateTicket();
updateTickets.mutableMap()[descriptor] = updateTicket;
Status ret = iam->validateUpdate(opCtx,
oldDoc.value(),
newDoc,
oldLocation,
options,
updateTicket,
entry->getFilterExpression());
if (!ret.isOK()) {
return StatusWith(ret);
}
}
}
// 'this' is passed along with the update; presumably it is the notifier through which the record
// store invokes recordStoreGoingToUpdateInPlace() — confirm.
Status updateStatus = _recordStore->updateRecord(
opCtx, oldLocation, newDoc.objdata(), newDoc.objsize(), _enforceQuota(enforceQuota), this);
if (updateStatus == ErrorCodes::NeedsDocumentMove) {
return _updateDocumentWithMove(
opCtx, oldLocation, oldDoc, newDoc, enforceQuota, opDebug, args, sid);
} else if (!updateStatus.isOK()) {
return updateStatus;
}
// Object did not move. We update each index with each respective UpdateTicket.
if (indexesAffected) {
IndexCatalog::IndexIterator ii = _indexCatalog.getIndexIterator(opCtx, true);
while (ii.more()) {
IndexDescriptor* descriptor = ii.next();
IndexAccessMethod* iam = ii.accessMethod(descriptor);
int64_t keysInserted;
int64_t keysDeleted;
Status ret = iam->update(
opCtx, *updateTickets.mutableMap()[descriptor], &keysInserted, &keysDeleted);
if (!ret.isOK())
return StatusWith(ret);
if (opDebug) {
opDebug->keysInserted += keysInserted;
opDebug->keysDeleted += keysDeleted;
}
}
}
invariant(sid == opCtx->recoveryUnit()->getSnapshotId());
args->updatedDoc = newDoc;
getGlobalServiceContext()->getOpObserver()->onUpdate(opCtx, *args);
return {oldLocation};
}
// Fallback path of updateDocument() for when the record store cannot update in place: stores
// 'newDoc' as a brand-new record and removes the old one.
//
// Steps, in order: insert the new record; invalidate cursors on the old location; unindex and
// delete the old record; index the new record; notify the OpObserver; bump the move counter
// and, when 'opDebug' is non-null, its nmoved/keysInserted/keysDeleted counters.
//
// 'sid' is the snapshot id captured by the caller and is asserted to be unchanged before the
// OpObserver is notified. Returns the new record's location, or the first error.
StatusWith CollectionImpl::_updateDocumentWithMove(OperationContext* opCtx,
const RecordId& oldLocation,
const Snapshotted& oldDoc,
const BSONObj& newDoc,
bool enforceQuota,
OpDebug* opDebug,
OplogUpdateEntryArgs* args,
const SnapshotId& sid) {
// Insert new record.
StatusWith newLocation = _recordStore->insertRecord(
opCtx, newDoc.objdata(), newDoc.objsize(), _enforceQuota(enforceQuota));
if (!newLocation.isOK()) {
return newLocation;
}
invariant(newLocation.getValue() != oldLocation);
// Cursors positioned on the old location must be told that it is going away.
_cursorManager.invalidateDocument(opCtx, oldLocation, INVALIDATION_DELETION);
// Remove indexes for old record.
int64_t keysDeleted;
_indexCatalog.unindexRecord(opCtx, oldDoc.value(), oldLocation, true, &keysDeleted);
// Remove old record.
_recordStore->deleteRecord(opCtx, oldLocation);
std::vector bsonRecords;
BsonRecord bsonRecord = {newLocation.getValue(), &newDoc};
bsonRecords.push_back(bsonRecord);
// Add indexes for new record.
int64_t keysInserted;
Status status = _indexCatalog.indexRecords(opCtx, bsonRecords, &keysInserted);
if (!status.isOK()) {
return StatusWith(status);
}
invariant(sid == opCtx->recoveryUnit()->getSnapshotId());
args->updatedDoc = newDoc;
getGlobalServiceContext()->getOpObserver()->onUpdate(opCtx, *args);
moveCounter.increment();
if (opDebug) {
opDebug->nmoved++;
opDebug->keysInserted += keysInserted;
opDebug->keysDeleted += keysDeleted;
}
return newLocation;
}
// Notification that the record at 'loc' is about to be modified in place. Tells the cursor
// manager about the mutation and always returns OK.
Status CollectionImpl::recordStoreGoingToUpdateInPlace(OperationContext* opCtx,
                                                       const RecordId& loc) {
    // Cursors must learn about the mutation so that query results stay correct.
    _cursorManager.invalidateDocument(opCtx, loc, INVALIDATION_MUTATION);

    return Status::OK();
}
// Damage-based updates are available only when the collection has no validator and the record
// store itself supports them.
bool CollectionImpl::updateWithDamagesSupported() const {
    // Short-circuits: the record store is not consulted when a validator is present.
    return !_validator && _recordStore->updateWithDamagesSupported();
}
// Applies the byte-level 'damages' (sourced from 'damageSource') to the record at 'loc' whose
// current contents are 'oldRec'. Requires updateWithDamagesSupported(). On success stores the
// resulting document in 'args->updatedDoc' and notifies the OpObserver. Returns whatever the
// record store returned.
StatusWith CollectionImpl::updateDocumentWithDamages(
    OperationContext* opCtx,
    const RecordId& loc,
    const Snapshotted& oldRec,
    const char* damageSource,
    const mutablebson::DamageVector& damages,
    OplogUpdateEntryArgs* args) {
    dassert(opCtx->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX));
    invariant(oldRec.snapshotId() == opCtx->recoveryUnit()->getSnapshotId());
    invariant(updateWithDamagesSupported());

    // Cursors must learn about the mutation so that query results stay correct.
    _cursorManager.invalidateDocument(opCtx, loc, INVALIDATION_MUTATION);

    auto swUpdatedRec =
        _recordStore->updateWithDamages(opCtx, loc, oldRec.value(), damageSource, damages);
    if (!swUpdatedRec.isOK()) {
        return swUpdatedRec;
    }

    args->updatedDoc = swUpdatedRec.getValue().toBson();
    getGlobalServiceContext()->getOpObserver()->onUpdate(opCtx, *args);
    return swUpdatedRec;
}
// Decides whether the quota is to be enforced for a write. It is enforced only when the caller
// asked for it, the mmapv1 quota option is set, the collection is not in the "local" database,
// and the namespace is not special.
bool CollectionImpl::_enforceQuota(bool userEnforeQuota) const {
    // Conditions are checked in this order and evaluation stops at the first that fails.
    return userEnforeQuota && mmapv1GlobalOptions.quota && !(_ns.db() == "local") &&
        !_ns.isSpecial();
}
// A collection is capped exactly when it owns a capped-insert notifier (the constructor creates
// one only when the record store is capped).
bool CollectionImpl::isCapped() const {
    return _cappedNotifier != nullptr;
}
// Hands out a shared reference to the capped-insert notifier. Only valid on capped
// collections; calling it on any other collection trips an invariant.
std::shared_ptr CollectionImpl::getCappedInsertNotifier() const {
    invariant(isCapped());

    return _cappedNotifier;
}
// Number of records in the collection, as reported by the record store.
uint64_t CollectionImpl::numRecords(OperationContext* opCtx) const {
    // Pure delegation; no state is kept at this level.
    return _recordStore->numRecords(opCtx);
}
// Data size of the collection, as reported by the record store.
uint64_t CollectionImpl::dataSize(OperationContext* opCtx) const {
    // Pure delegation; no state is kept at this level.
    return _recordStore->dataSize(opCtx);
}
// Sums the space used (in bytes) by every index returned by the catalog's iterator and returns
// the total, unscaled. When 'details' is non-null, additionally appends one number per index,
// keyed by index name, holding that index's size divided by 'scale'.
uint64_t CollectionImpl::getIndexSize(OperationContext* opCtx, BSONObjBuilder* details, int scale) {
    IndexCatalog* const catalog = getIndexCatalog();

    uint64_t totalBytes = 0;
    for (IndexCatalog::IndexIterator it = catalog->getIndexIterator(opCtx, true); it.more();) {
        IndexDescriptor* const descriptor = it.next();
        IndexAccessMethod* const accessMethod = catalog->getIndex(descriptor);

        const long long indexBytes = accessMethod->getSpaceUsedBytes(opCtx);
        totalBytes += indexBytes;

        if (details) {
            // Only the per-index breakdown is scaled; the returned total is not.
            details->appendNumber(descriptor->indexName(), indexBytes / scale);
        }
    }
    return totalBytes;
}
/**
 * Removes all documents from the collection while keeping its index definitions.
 *
 * order will be:
 * 1) store index specs
 * 2) drop indexes
 * 3) truncate record store
 * 4) re-create indexes
 *
 * Returns the first error from the record store or from index re-creation, or OK.
 */
Status CollectionImpl::truncate(OperationContext* opCtx) {
    dassert(opCtx->lockState()->isCollectionLockedForMode(ns().toString(), MODE_X));
    BackgroundOperation::assertNoBgOpInProgForNs(ns());
    invariant(_indexCatalog.numIndexesInProgress(opCtx) == 0);

    // 1) store index specs
    vector indexSpecs;
    for (IndexCatalog::IndexIterator it = _indexCatalog.getIndexIterator(opCtx, false);
         it.more();) {
        const IndexDescriptor* descriptor = it.next();
        // Take an owned copy of each spec so it can be used after the index is dropped.
        indexSpecs.push_back(descriptor->infoObj().getOwned());
    }

    // 2) drop indexes
    _indexCatalog.dropAllIndexes(opCtx, true);
    _cursorManager.invalidateAll(opCtx, false, "collection truncated");

    // 3) truncate record store
    auto truncateStatus = _recordStore->truncate(opCtx);
    if (!truncateStatus.isOK()) {
        return truncateStatus;
    }

    // 4) re-create indexes
    for (const auto& spec : indexSpecs) {
        auto createStatus = _indexCatalog.createIndexOnEmptyCollection(opCtx, spec).getStatus();
        if (!createStatus.isOK()) {
            return createStatus;
        }
    }

    return Status::OK();
}
// Capped collections only: invalidates all cursors and then asks the record store to truncate
// relative to 'end', forwarding 'inclusive' unchanged. Requires that no background operation or
// index build is in progress on this namespace.
void CollectionImpl::cappedTruncateAfter(OperationContext* opCtx, RecordId end, bool inclusive) {
    dassert(opCtx->lockState()->isCollectionLockedForMode(ns().toString(), MODE_X));
    invariant(isCapped());

    BackgroundOperation::assertNoBgOpInProgForNs(ns());
    invariant(_indexCatalog.numIndexesInProgress(opCtx) == 0);

    // Cursors are invalidated before the record store drops any records.
    _cursorManager.invalidateAll(opCtx, false, "capped collection truncated");

    _recordStore->cappedTruncateAfter(opCtx, end, inclusive);
}
// Replaces the collection validator with 'validatorDoc'. The document is parsed first; on a
// parse failure nothing is changed and the error is returned. Otherwise the catalog entry is
// updated (keeping the current validation level and action) and then the in-memory validator
// and validator document are replaced.
Status CollectionImpl::setValidator(OperationContext* opCtx, BSONObj validatorDoc) {
    invariant(opCtx->lockState()->isCollectionLockedForMode(ns().toString(), MODE_X));

    // Make owned early so that the parsed match expression refers to the owned object.
    if (!validatorDoc.isOwned()) {
        validatorDoc = validatorDoc.getOwned();
    }

    auto swMatcher = parseValidator(validatorDoc);
    if (!swMatcher.isOK()) {
        return swMatcher.getStatus();
    }

    // Persist first, then swap in the in-memory state.
    _details->updateValidator(opCtx, validatorDoc, getValidationLevel(), getValidationAction());

    _validator = std::move(swMatcher.getValue());
    _validatorDoc = std::move(validatorDoc);
    return Status::OK();
}
// Maps a validation level string to its enum value. Accepts "off", "moderate" and "strict"; the
// empty string selects the default, which is strict. Anything else is a BadValue error.
auto CollectionImpl::parseValidationLevel(StringData newLevel) -> StatusWith {
    // "" is the default and means the same as "strict".
    if (newLevel == "" || newLevel == "strict") {
        return ValidationLevel::STRICT_V;
    }
    if (newLevel == "off") {
        return ValidationLevel::OFF;
    }
    if (newLevel == "moderate") {
        return ValidationLevel::MODERATE;
    }

    return Status(ErrorCodes::BadValue,
                  str::stream() << "invalid validation level: " << newLevel);
}
// Maps a validation action string to its enum value. Accepts "warn" and "error"; the empty
// string selects the default, which is error. Anything else is a BadValue error.
auto CollectionImpl::parseValidationAction(StringData newAction) -> StatusWith {
    // "" is the default and means the same as "error".
    if (newAction == "" || newAction == "error") {
        return ValidationAction::ERROR_V;
    }
    if (newAction == "warn") {
        return ValidationAction::WARN;
    }

    return Status(ErrorCodes::BadValue,
                  str::stream() << "invalid validation action: " << newAction);
}
// Returns the current validation level as the string form accepted by parseValidationLevel():
// "off", "moderate" or "strict".
StringData CollectionImpl::getValidationLevel() const {
    switch (_validationLevel) {
        case ValidationLevel::OFF:
            return "off";
        case ValidationLevel::MODERATE:
            return "moderate";
        case ValidationLevel::STRICT_V:
            return "strict";
    }
    // Every enumerator returns above.
    MONGO_UNREACHABLE;
}
// Returns the current validation action as the string form accepted by
// parseValidationAction(): "warn" or "error".
StringData CollectionImpl::getValidationAction() const {
    switch (_validationAction) {
        case ValidationAction::WARN:
            return "warn";
        case ValidationAction::ERROR_V:
            return "error";
    }
    // Every enumerator returns above.
    MONGO_UNREACHABLE;
}
// Changes the validation level to the one named by 'newLevel'. An unparsable level is returned
// as an error and changes nothing. Otherwise the in-memory level is updated and the catalog
// entry is rewritten with the current validator document, the new level and the current action.
Status CollectionImpl::setValidationLevel(OperationContext* opCtx, StringData newLevel) {
    invariant(opCtx->lockState()->isCollectionLockedForMode(ns().toString(), MODE_X));

    auto parsedLevel = parseValidationLevel(newLevel);
    if (!parsedLevel.isOK()) {
        return parsedLevel.getStatus();
    }

    // Update the member first: getValidationLevel() below must report the new level.
    _validationLevel = parsedLevel.getValue();
    _details->updateValidator(opCtx, _validatorDoc, getValidationLevel(), getValidationAction());

    return Status::OK();
}
// Changes the validation action to the one named by 'newAction'. An unparsable action is
// returned as an error and changes nothing. Otherwise the in-memory action is updated and the
// catalog entry is rewritten with the current validator document, the current level and the new
// action.
Status CollectionImpl::setValidationAction(OperationContext* opCtx, StringData newAction) {
    invariant(opCtx->lockState()->isCollectionLockedForMode(ns().toString(), MODE_X));

    auto parsedAction = parseValidationAction(newAction);
    if (!parsedAction.isOK()) {
        return parsedAction.getStatus();
    }

    // Update the member first: getValidationAction() below must report the new action.
    _validationAction = parsedAction.getValue();
    _details->updateValidator(opCtx, _validatorDoc, getValidationLevel(), getValidationAction());

    return Status::OK();
}
// Returns a non-owning pointer to the collection's default collator, or null when the
// collection was created without a collation (parseCollation() returns null for an empty spec).
const CollatorInterface* CollectionImpl::getDefaultCollator() const {
    const CollatorInterface* const defaultCollator = _collator.get();
    return defaultCollator;
}
namespace {
// 2^22.
// NOTE(review): presumably the number of slots in IndexKeyCountTable; the alias's template
// arguments are not visible here — confirm.
static const uint32_t kKeyCountTableSize = 1U << 22;
// Table type behind RecordStoreValidateAdaptor's '_ikc' member, which validate() indexes by the
// 32-bit hash of an index entry and uses as a per-slot counter.
using IndexKeyCountTable = std::array;
// Per-index validation results; RecordStoreValidateAdaptor::validate() looks entries up by
// index namespace string.
using ValidateResultsMap = std::map;
class RecordStoreValidateAdaptor : public ValidateAdaptor {
public:
RecordStoreValidateAdaptor(OperationContext* opCtx,
ValidateCmdLevel level,
IndexCatalog* ic,
ValidateResultsMap* irm)
: _opCtx(opCtx), _level(level), _indexCatalog(ic), _indexNsResultsMap(irm) {
_ikc = stdx::make_unique();
}
virtual Status validate(const RecordId& recordId, const RecordData& record, size_t* dataSize) {
BSONObj recordBson = record.toBson();
const Status status = validateBSON(
recordBson.objdata(), recordBson.objsize(), Validator::enabledBSONVersion());
if (status.isOK()) {
*dataSize = recordBson.objsize();
} else {
return status;
}
if (_level != kValidateFull || !_indexCatalog->haveAnyIndexes()) {
return status;
}
IndexCatalog::IndexIterator i = _indexCatalog->getIndexIterator(_opCtx, false);
while (i.more()) {
const IndexDescriptor* descriptor = i.next();
const string indexNs = descriptor->indexNamespace();
ValidateResults& results = (*_indexNsResultsMap)[indexNs];
if (!results.valid) {
// No point doing additional validation if the index is already invalid.
continue;
}
const IndexAccessMethod* iam = _indexCatalog->getIndex(descriptor);
if (descriptor->isPartial()) {
const IndexCatalogEntry* ice = _indexCatalog->getEntry(descriptor);
if (!ice->getFilterExpression()->matchesBSON(recordBson)) {
continue;
}
}
BSONObjSet documentKeySet = SimpleBSONObjComparator::kInstance.makeBSONObjSet();
// There's no need to compute the prefixes of the indexed fields that cause the
// index to be multikey when validating the index keys.
MultikeyPaths* multikeyPaths = nullptr;
iam->getKeys(recordBson,
IndexAccessMethod::GetKeysMode::kEnforceConstraints,
&documentKeySet,
multikeyPaths);
if (!descriptor->isMultikey(_opCtx) && documentKeySet.size() > 1) {
string msg = str::stream() << "Index " << descriptor->indexName()
<< " is not multi-key but has more than one"
<< " key in document " << recordId;
results.errors.push_back(msg);
results.valid = false;
}
uint32_t indexNsHash;
MurmurHash3_x86_32(indexNs.c_str(), indexNs.size(), 0, &indexNsHash);
for (const auto& key : documentKeySet) {
if (key.objsize() >= IndexKeyMaxSize) {
// Index keys >= 1024 bytes are not indexed.
_longKeys[indexNs]++;
continue;
}
uint32_t indexEntryHash = hashIndexEntry(key, recordId, indexNsHash);
uint64_t& indexEntryCount = (*_ikc)[indexEntryHash];
if (indexEntryCount != 0) {
indexEntryCount--;
dassert(indexEntryCount >= 0);
if (indexEntryCount == 0) {
_indexKeyCountTableNumEntries--;
}
} else {
_hasDocWithoutIndexEntry = true;
results.valid = false;
}
}
}
return status;
}
bool tooManyIndexEntries() const {
return _indexKeyCountTableNumEntries != 0;
}
bool tooFewIndexEntries() const {
return _hasDocWithoutIndexEntry;
}
/**
* Traverse the index to validate the entries and cache index keys for later use.
*/
void traverseIndex(const IndexAccessMethod* iam,
const IndexDescriptor* descriptor,
ValidateResults& results,
long long numKeys) {
auto indexNs = descriptor->indexNamespace();
_keyCounts[indexNs] = numKeys;
uint32_t indexNsHash;
MurmurHash3_x86_32(indexNs.c_str(), indexNs.size(), 0, &indexNsHash);
if (_level == kValidateFull) {
const auto& key = descriptor->keyPattern();
BSONObj prevIndexEntryKey;
bool isFirstEntry = true;
std::unique_ptr cursor = iam->newCursor(_opCtx, true);
// Seeking to BSONObj() is equivalent to seeking to the first entry of an index.
for (auto indexEntry = cursor->seek(BSONObj(), true); indexEntry;
indexEntry = cursor->next()) {
// Ensure that the index entries are in increasing or decreasing order.
if (!isFirstEntry && (indexEntry->key).woCompare(prevIndexEntryKey, key) < 0) {
if (results.valid) {
results.errors.push_back(
"one or more indexes are not in strictly ascending or descending "
"order");
}
results.valid = false;
}
isFirstEntry = false;
prevIndexEntryKey = indexEntry->key;
// Cache the index keys to cross-validate with documents later.
uint32_t keyHash = hashIndexEntry(indexEntry->key, indexEntry->loc, indexNsHash);
if ((*_ikc)[keyHash] == 0) {
_indexKeyCountTableNumEntries++;
}
(*_ikc)[keyHash]++;
}
}
}
void validateIndexKeyCount(IndexDescriptor* idx, int64_t numRecs, ValidateResults& results) {
const string indexNs = idx->indexNamespace();
long long numIndexedKeys = _keyCounts[indexNs];
long long numLongKeys = _longKeys[indexNs];
auto totalKeys = numLongKeys + numIndexedKeys;
bool hasTooFewKeys = false;
bool noErrorOnTooFewKeys = !failIndexKeyTooLong.load() && (_level != kValidateFull);
if (idx->isIdIndex() && totalKeys != numRecs) {
hasTooFewKeys = totalKeys < numRecs ? true : hasTooFewKeys;
string msg = str::stream() << "number of _id index entries (" << numIndexedKeys
<< ") does not match the number of documents in the index ("
<< numRecs - numLongKeys << ")";
if (noErrorOnTooFewKeys && (numIndexedKeys < numRecs)) {
results.warnings.push_back(msg);
} else {
results.errors.push_back(msg);
results.valid = false;
}
}
if (results.valid && !idx->isMultikey(_opCtx) && totalKeys > numRecs) {
string err = str::stream()
<< "index " << idx->indexName() << " is not multi-key, but has more entries ("
<< numIndexedKeys << ") than documents in the index (" << numRecs - numLongKeys
<< ")";
results.errors.push_back(err);
results.valid = false;
}
// Ignore any indexes with a special access method. If an access method name is given, the
// index may be a full text, geo or special index plugin with different semantics.
if (results.valid && !idx->isSparse() && !idx->isPartial() && !idx->isIdIndex() &&
idx->getAccessMethodName() == "" && totalKeys < numRecs) {
hasTooFewKeys = true;
string msg = str::stream() << "index " << idx->indexName()
<< " is not sparse or partial, but has fewer entries ("
<< numIndexedKeys << ") than documents in the index ("
<< numRecs - numLongKeys << ")";
if (noErrorOnTooFewKeys) {
results.warnings.push_back(msg);
} else {
results.errors.push_back(msg);
results.valid = false;
}
}
if ((_level != kValidateFull) && hasTooFewKeys) {
string warning = str::stream()
<< "index " << idx->indexName()
<< " has fewer keys than records. This may be the result of currently or "
"previously running the server with the failIndexKeyTooLong parameter set to "
"false. Please re-run the validate command with {full: true}";
results.warnings.push_back(warning);
}
}
private:
std::map _longKeys;
std::map _keyCounts;
std::unique_ptr _ikc;
uint32_t _indexKeyCountTableNumEntries = 0;
bool _hasDocWithoutIndexEntry = false;
OperationContext* _opCtx; // Not owned.
ValidateCmdLevel _level;
IndexCatalog* _indexCatalog; // Not owned.
ValidateResultsMap* _indexNsResultsMap; // Not owned.
uint32_t hashIndexEntry(const BSONObj& key, const RecordId& loc, uint32_t hash) {
// We're only using KeyString to get something hashable here, so version doesn't matter.
KeyString ks(KeyString::Version::V1, key, Ordering::make(BSONObj()), loc);
MurmurHash3_x86_32(ks.getTypeBits().getBuffer(), ks.getTypeBits().getSize(), hash, &hash);
MurmurHash3_x86_32(ks.getBuffer(), ks.getSize(), hash, &hash);
return hash % kKeyCountTableSize;
}
};
} // namespace
/**
 * Validates the collection's indexes and record store and reports the outcome through 'results'
 * and 'output'.
 *
 * Steps:
 *   1. Validate every index and, if it is valid, traverse it with the validate adaptor.
 *   2. If everything is valid so far, validate the record store (which also cross-checks
 *      documents against index entries for kValidateFull).
 *   3. If everything is still valid, compare per-index key counts with the number of records.
 *   4. Merge the per-index warnings/errors into 'results' and append "nIndexes", "keysPerIndex"
 *      and (for kValidateFull) "indexDetails" to 'output'.
 *
 * Returns the interruption status if the operation is interrupted. Any other DBException is
 * recorded as a validation error in 'results' and Status::OK() is returned.
 */
Status CollectionImpl::validate(OperationContext* opCtx,
                                ValidateCmdLevel level,
                                ValidateResults* results,
                                BSONObjBuilder* output) {
    dassert(opCtx->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IS));

    try {
        ValidateResultsMap indexNsResultsMap;
        auto indexValidator = stdx::make_unique<RecordStoreValidateAdaptor>(
            opCtx, level, &_indexCatalog, &indexNsResultsMap);

        BSONObjBuilder keysPerIndex;  // not using subObjStart to be exception safe
        IndexCatalog::IndexIterator indexIt = _indexCatalog.getIndexIterator(opCtx, false);

        // Validate Indexes.
        while (indexIt.more()) {
            opCtx->checkForInterrupt();
            const IndexDescriptor* descriptor = indexIt.next();
            const std::string& indexNs = descriptor->indexNamespace();
            log(LogComponent::kIndex) << "validating index " << indexNs << endl;

            IndexAccessMethod* iam = _indexCatalog.getIndex(descriptor);
            ValidateResults curIndexResults;
            int64_t numKeys = 0;
            iam->validate(opCtx, &numKeys, &curIndexResults);
            keysPerIndex.appendNumber(indexNs, static_cast<long long>(numKeys));

            if (curIndexResults.valid) {
                indexValidator->traverseIndex(iam, descriptor, curIndexResults, numKeys);
            } else {
                results->valid = false;
            }
            indexNsResultsMap[indexNs] = curIndexResults;
        }

        // Validate RecordStore and, if `level == kValidateFull`, cross validate indexes and
        // RecordStore.
        if (results->valid) {
            auto status =
                _recordStore->validate(opCtx, level, indexValidator.get(), results, output);
            // RecordStore::validate always returns Status::OK(). Errors are reported through
            // `results`.
            dassert(status.isOK());

            const std::string msg = "One or more indexes contain invalid index entries.";
            // when there's an index key/document mismatch, both `if` and `else if` statements
            // will be true. But if we only check tooFewIndexEntries(), we'll be able to see
            // which specific index is invalid.
            if (indexValidator->tooFewIndexEntries()) {
                // The error message can't be more specific because even though the index is
                // invalid, we won't know if the corruption occurred on the index entry or in
                // the document.
                results->errors.push_back(msg);
                results->valid = false;
            } else if (indexValidator->tooManyIndexEntries()) {
                for (auto& entry : indexNsResultsMap) {
                    // Marking all indexes as invalid since we don't know which one failed.
                    entry.second.valid = false;
                }
                results->errors.push_back(msg);
                results->valid = false;
            }
        }

        // Validate index key count.
        if (results->valid) {
            IndexCatalog::IndexIterator keyCountIt = _indexCatalog.getIndexIterator(opCtx, false);
            while (keyCountIt.more()) {
                IndexDescriptor* descriptor = keyCountIt.next();
                ValidateResults& curIndexResults = indexNsResultsMap[descriptor->indexNamespace()];
                if (curIndexResults.valid) {
                    indexValidator->validateIndexKeyCount(
                        descriptor, _recordStore->numRecords(opCtx), curIndexResults);
                }
            }
        }

        // Per-index details are only reported for a full validation.
        std::unique_ptr<BSONObjBuilder> indexDetails;
        if (level == kValidateFull) {
            indexDetails = stdx::make_unique<BSONObjBuilder>();
        }

        // Report index validation results.
        for (const auto& entry : indexNsResultsMap) {
            const std::string& indexNs = entry.first;
            const ValidateResults& vr = entry.second;

            if (!vr.valid) {
                results->valid = false;
            }

            if (indexDetails) {
                BSONObjBuilder bob(indexDetails->subobjStart(indexNs));
                bob.appendBool("valid", vr.valid);

                if (!vr.warnings.empty()) {
                    bob.append("warnings", vr.warnings);
                }

                if (!vr.errors.empty()) {
                    bob.append("errors", vr.errors);
                }
            }

            results->warnings.insert(
                results->warnings.end(), vr.warnings.begin(), vr.warnings.end());
            results->errors.insert(results->errors.end(), vr.errors.begin(), vr.errors.end());
        }

        output->append("nIndexes", _indexCatalog.numIndexesReady(opCtx));
        output->append("keysPerIndex", keysPerIndex.done());
        if (indexDetails) {
            output->append("indexDetails", indexDetails->done());
        }
    } catch (const DBException& e) {
        // Interruptions are propagated to the caller; everything else is a validation failure.
        if (ErrorCodes::isInterruption(ErrorCodes::Error(e.getCode()))) {
            return e.toStatus();
        }
        std::string err = str::stream() << "exception during index validation: " << e.toString();
        results->errors.push_back(err);
        results->valid = false;
    }

    return Status::OK();
}
/**
 * Touches the record store and/or every index of this collection.
 *
 * When 'touchData' is set, calls touch() on the record store and appends the object it built to
 * 'output' under "data". When 'touchIndexes' is set, calls touch() on the access method of each
 * index and appends {num: <total number of indexes>, millis: <elapsed time>} under "indexes".
 *
 * Returns the first non-OK status produced by any touch() call, otherwise Status::OK().
 */
Status CollectionImpl::touch(OperationContext* opCtx,
                             bool touchData,
                             bool touchIndexes,
                             BSONObjBuilder* output) const {
    if (touchData) {
        BSONObjBuilder dataStats;
        Status dataStatus = _recordStore->touch(opCtx, &dataStats);
        if (!dataStatus.isOK()) {
            return dataStatus;
        }
        output->append("data", dataStats.obj());
    }

    if (!touchIndexes) {
        return Status::OK();
    }

    // Measures the time spent touching all indexes.
    Timer indexTimer;

    IndexCatalog::IndexIterator indexIt = _indexCatalog.getIndexIterator(opCtx, false);
    while (indexIt.more()) {
        const IndexDescriptor* descriptor = indexIt.next();
        const IndexAccessMethod* accessMethod = _indexCatalog.getIndex(descriptor);
        Status indexStatus = accessMethod->touch(opCtx);
        if (!indexStatus.isOK()) {
            return indexStatus;
        }
    }

    output->append(
        "indexes",
        BSON("num" << _indexCatalog.numIndexesTotal(opCtx) << "millis" << indexTimer.millis()));

    return Status::OK();
}
} // namespace mongo