diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 00:22:50 -0400 |
---|---|---|
committer | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 10:56:02 -0400 |
commit | 9c2ed42daa8fbbef4a919c21ec564e2db55e8d60 (patch) | |
tree | 3814f79c10d7b490948d8cb7b112ac1dd41ceff1 /src/mongo/db/catalog/collection.cpp | |
parent | 01965cf52bce6976637ecb8f4a622aeb05ab256a (diff) | |
download | mongo-9c2ed42daa8fbbef4a919c21ec564e2db55e8d60.tar.gz |
SERVER-18579: Clang-Format - reformat code, no comment reflow
Diffstat (limited to 'src/mongo/db/catalog/collection.cpp')
-rw-r--r-- | src/mongo/db/catalog/collection.cpp | 1211 |
1 files changed, 588 insertions, 623 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp index 4be5fe65529..64ef00330d6 100644 --- a/src/mongo/db/catalog/collection.cpp +++ b/src/mongo/db/catalog/collection.cpp @@ -56,49 +56,45 @@ #include "mongo/db/storage/mmap_v1/mmap_v1_options.h" #include "mongo/db/storage/record_fetcher.h" -#include "mongo/db/auth/user_document_parser.h" // XXX-ANDY +#include "mongo/db/auth/user_document_parser.h" // XXX-ANDY #include "mongo/util/log.h" namespace mongo { namespace { - const auto bannedExpressionsInValidators = std::set<StringData>{ - "$geoNear", - "$near", - "$nearSphere", - "$text", - "$where", - }; - Status checkValidatorForBannedExpressions(const BSONObj& validator) { - for (auto field : validator) { - const auto name = field.fieldNameStringData(); - if (name[0] == '$' && bannedExpressionsInValidators.count(name)) { - return {ErrorCodes::InvalidOptions, - str::stream() << name << " is not allowed in collection validators"}; - } - - if (field.type() == Object || field.type() == Array) { - auto status = checkValidatorForBannedExpressions(field.Obj()); - if (!status.isOK()) - return status; - } +const auto bannedExpressionsInValidators = std::set<StringData>{ + "$geoNear", "$near", "$nearSphere", "$text", "$where", +}; +Status checkValidatorForBannedExpressions(const BSONObj& validator) { + for (auto field : validator) { + const auto name = field.fieldNameStringData(); + if (name[0] == '$' && bannedExpressionsInValidators.count(name)) { + return {ErrorCodes::InvalidOptions, + str::stream() << name << " is not allowed in collection validators"}; } - return Status::OK(); + if (field.type() == Object || field.type() == Array) { + auto status = checkValidatorForBannedExpressions(field.Obj()); + if (!status.isOK()) + return status; + } } + + return Status::OK(); +} } - using std::unique_ptr; - using std::endl; - using std::string; - using std::vector; +using std::unique_ptr; +using std::endl; +using std::string; +using std::vector; - using logger::LogComponent; +using logger::LogComponent; - std::string CompactOptions::toString() const { - std::stringstream ss; - ss << "paddingMode: "; - switch ( paddingMode ) { +std::string CompactOptions::toString() const { + std::stringstream ss; + ss << "paddingMode: "; + switch (paddingMode) { case NONE: ss << "NONE"; break; @@ -106,752 +102,721 @@ namespace { ss << "PRESERVE"; break; case MANUAL: - ss << "MANUAL (" << paddingBytes << " + ( doc * " << paddingFactor <<") )"; - } + ss << "MANUAL (" << paddingBytes << " + ( doc * " << paddingFactor << ") )"; + } - ss << " validateDocuments: " << validateDocuments; + ss << " validateDocuments: " << validateDocuments; - return ss.str(); - } + return ss.str(); +} - // - // CappedInsertNotifier - // +// +// CappedInsertNotifier +// - CappedInsertNotifier::CappedInsertNotifier() - : _cappedInsertCount(0) { - } +CappedInsertNotifier::CappedInsertNotifier() : _cappedInsertCount(0) {} - void CappedInsertNotifier::notifyOfInsert() { - stdx::lock_guard<stdx::mutex> lk(_cappedNewDataMutex); - _cappedInsertCount++; - _cappedNewDataNotifier.notify_all(); - } +void CappedInsertNotifier::notifyOfInsert() { + stdx::lock_guard<stdx::mutex> lk(_cappedNewDataMutex); + _cappedInsertCount++; + _cappedNewDataNotifier.notify_all(); +} - uint64_t CappedInsertNotifier::getCount() const { - stdx::lock_guard<stdx::mutex> lk(_cappedNewDataMutex); - return _cappedInsertCount; - } +uint64_t CappedInsertNotifier::getCount() const { + stdx::lock_guard<stdx::mutex> lk(_cappedNewDataMutex); + return _cappedInsertCount; +} - void CappedInsertNotifier::waitForInsert(uint64_t referenceCount, Microseconds timeout) const { - stdx::unique_lock<stdx::mutex> lk(_cappedNewDataMutex); +void CappedInsertNotifier::waitForInsert(uint64_t referenceCount, Microseconds timeout) const { + stdx::unique_lock<stdx::mutex> lk(_cappedNewDataMutex); - while (referenceCount == _cappedInsertCount) { - if (stdx::cv_status::timeout == _cappedNewDataNotifier.wait_for(lk, timeout)) { - return; - } + while (referenceCount == _cappedInsertCount) { + if (stdx::cv_status::timeout == _cappedNewDataNotifier.wait_for(lk, timeout)) { + return; } } +} - // ---- - - Collection::Collection( OperationContext* txn, - StringData fullNS, - CollectionCatalogEntry* details, - RecordStore* recordStore, - DatabaseCatalogEntry* dbce ) - : _ns( fullNS ), - _details( details ), - _recordStore( recordStore ), - _dbce( dbce ), - _infoCache( this ), - _indexCatalog( this ), - _validatorDoc(_details->getCollectionOptions(txn).validator.getOwned()), - _validator(uassertStatusOK(parseValidator(_validatorDoc))), - _cursorManager(fullNS), - _cappedNotifier(_recordStore->isCapped() ? new CappedInsertNotifier() : nullptr) { - _magic = 1357924; - _indexCatalog.init(txn); - if ( isCapped() ) - _recordStore->setCappedDeleteCallback( this ); - _infoCache.reset(txn); - } +// ---- + +Collection::Collection(OperationContext* txn, + StringData fullNS, + CollectionCatalogEntry* details, + RecordStore* recordStore, + DatabaseCatalogEntry* dbce) + : _ns(fullNS), + _details(details), + _recordStore(recordStore), + _dbce(dbce), + _infoCache(this), + _indexCatalog(this), + _validatorDoc(_details->getCollectionOptions(txn).validator.getOwned()), + _validator(uassertStatusOK(parseValidator(_validatorDoc))), + _cursorManager(fullNS), + _cappedNotifier(_recordStore->isCapped() ? new CappedInsertNotifier() : nullptr) { + _magic = 1357924; + _indexCatalog.init(txn); + if (isCapped()) + _recordStore->setCappedDeleteCallback(this); + _infoCache.reset(txn); +} - Collection::~Collection() { - verify( ok() ); - _magic = 0; - } +Collection::~Collection() { + verify(ok()); + _magic = 0; +} - bool Collection::requiresIdIndex() const { +bool Collection::requiresIdIndex() const { + if (_ns.ns().find('$') != string::npos) { + // no indexes on indexes + return false; + } - if ( _ns.ns().find( '$' ) != string::npos ) { - // no indexes on indexes + if (_ns.isSystem()) { + StringData shortName = _ns.coll().substr(_ns.coll().find('.') + 1); + if (shortName == "indexes" || shortName == "namespaces" || shortName == "profile") { return false; } - - if ( _ns.isSystem() ) { - StringData shortName = _ns.coll().substr( _ns.coll().find( '.' ) + 1 ); - if ( shortName == "indexes" || - shortName == "namespaces" || - shortName == "profile" ) { - return false; - } - } - - if ( _ns.db() == "local" ) { - if ( _ns.coll().startsWith( "oplog." ) ) - return false; - } - - if ( !_ns.isSystem() ) { - // non system collections definitely have an _id index - return true; - } - - - return true; - } - - std::unique_ptr<RecordCursor> Collection::getCursor(OperationContext* txn, bool forward) const { - dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IS)); - invariant( ok() ); - - return _recordStore->getCursor(txn, forward); } - vector<std::unique_ptr<RecordCursor>> Collection::getManyCursors(OperationContext* txn) const { - dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IS)); - - return _recordStore->getManyCursors(txn); + if (_ns.db() == "local") { + if (_ns.coll().startsWith("oplog.")) + return false; } - Snapshotted<BSONObj> Collection::docFor(OperationContext* txn, const RecordId& loc) const { - return Snapshotted<BSONObj>(txn->recoveryUnit()->getSnapshotId(), - _recordStore->dataFor( txn, loc ).releaseToBson()); + if (!_ns.isSystem()) { + // non system collections definitely have an _id index + return true; } - bool Collection::findDoc(OperationContext* txn, - const RecordId& loc, - Snapshotted<BSONObj>* out) const { - dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IS)); - RecordData rd; - if ( !_recordStore->findRecord( txn, loc, &rd ) ) - return false; - *out = Snapshotted<BSONObj>(txn->recoveryUnit()->getSnapshotId(), rd.releaseToBson()); - return true; - } + return true; +} - Status Collection::checkValidation(OperationContext* txn, const BSONObj& document) const { - if (!_validator) - return Status::OK(); +std::unique_ptr<RecordCursor> Collection::getCursor(OperationContext* txn, bool forward) const { + dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IS)); + invariant(ok()); - if (documentValidationDisabled(txn)) - return Status::OK(); + return _recordStore->getCursor(txn, forward); +} - if (_validator->matchesBSON(document)) - return Status::OK(); +vector<std::unique_ptr<RecordCursor>> Collection::getManyCursors(OperationContext* txn) const { + dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IS)); - return {ErrorCodes::DocumentValidationFailure, "Document failed validation"}; - } + return _recordStore->getManyCursors(txn); +} - StatusWith<std::unique_ptr<MatchExpression>> Collection::parseValidator( - const BSONObj& validator) const { - if (validator.isEmpty()) - return {nullptr}; +Snapshotted<BSONObj> Collection::docFor(OperationContext* txn, const RecordId& loc) const { + return Snapshotted<BSONObj>(txn->recoveryUnit()->getSnapshotId(), + _recordStore->dataFor(txn, loc).releaseToBson()); +} - if (ns().isSystem()) { - return {ErrorCodes::InvalidOptions, - "Document validators not allowed on system collections."}; - } +bool Collection::findDoc(OperationContext* txn, + const RecordId& loc, + Snapshotted<BSONObj>* out) const { + dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IS)); - if (ns().isOnInternalDb()) { - return {ErrorCodes::InvalidOptions, - str::stream() << "Document validators are not allowed on collections in" - << " the " << ns().db() << " database"}; - } + RecordData rd; + if (!_recordStore->findRecord(txn, loc, &rd)) + return false; + *out = Snapshotted<BSONObj>(txn->recoveryUnit()->getSnapshotId(), rd.releaseToBson()); + return true; +} - { - auto status = checkValidatorForBannedExpressions(validator); - if (!status.isOK()) - return status; - } +Status Collection::checkValidation(OperationContext* txn, const BSONObj& document) const { + if (!_validator) + return Status::OK(); - auto statusWithRawPtr = MatchExpressionParser::parse(validator); - if (!statusWithRawPtr.isOK()) - return statusWithRawPtr.getStatus(); + if (documentValidationDisabled(txn)) + return Status::OK(); - return {std::unique_ptr<MatchExpression>(statusWithRawPtr.getValue())}; - } + if (_validator->matchesBSON(document)) + return Status::OK(); - StatusWith<RecordId> Collection::insertDocument(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota) { - invariant(!_validator || documentValidationDisabled(txn)); - dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX)); - invariant( !_indexCatalog.haveAnyIndexes() ); // eventually can implement, just not done + return {ErrorCodes::DocumentValidationFailure, "Document failed validation"}; +} - StatusWith<RecordId> loc = _recordStore->insertRecord( txn, - doc, - _enforceQuota( enforceQuota ) ); - if ( !loc.isOK() ) - return loc; +StatusWith<std::unique_ptr<MatchExpression>> Collection::parseValidator( + const BSONObj& validator) const { + if (validator.isEmpty()) + return {nullptr}; - // we cannot call into the OpObserver here because the document being written is not present - // fortunately, this is currently only used for adding entries to the oplog. + if (ns().isSystem()) { + return {ErrorCodes::InvalidOptions, + "Document validators not allowed on system collections."}; + } - return StatusWith<RecordId>( loc ); + if (ns().isOnInternalDb()) { + return {ErrorCodes::InvalidOptions, + str::stream() << "Document validators are not allowed on collections in" + << " the " << ns().db() << " database"}; } - StatusWith<RecordId> Collection::insertDocument(OperationContext* txn, - const BSONObj& docToInsert, - bool enforceQuota, - bool fromMigrate) { - { - auto status = checkValidation(txn, docToInsert); - if (!status.isOK()) - return status; - } + { + auto status = checkValidatorForBannedExpressions(validator); + if (!status.isOK()) + return status; + } - const SnapshotId sid = txn->recoveryUnit()->getSnapshotId(); + auto statusWithRawPtr = MatchExpressionParser::parse(validator); + if (!statusWithRawPtr.isOK()) + return statusWithRawPtr.getStatus(); - if ( _indexCatalog.findIdIndex( txn ) ) { - if ( docToInsert["_id"].eoo() ) { - return StatusWith<RecordId>( ErrorCodes::InternalError, - str::stream() << "Collection::insertDocument got " - "document without _id for ns:" << _ns.ns() ); - } - } + return {std::unique_ptr<MatchExpression>(statusWithRawPtr.getValue())}; +} - StatusWith<RecordId> res = _insertDocument( txn, docToInsert, enforceQuota ); - invariant( sid == txn->recoveryUnit()->getSnapshotId() ); - if (res.isOK()) { - getGlobalServiceContext()->getOpObserver()->onInsert(txn, - ns(), - docToInsert, - fromMigrate); - - // If there is a notifier object and another thread is waiting on it, then we notify - // waiters of this document insert. Waiters keep a shared_ptr to '_cappedNotifier', so - // there are waiters if this Collection's shared_ptr is not unique. - if (_cappedNotifier && !_cappedNotifier.unique()) { - _cappedNotifier->notifyOfInsert(); - } - } +StatusWith<RecordId> Collection::insertDocument(OperationContext* txn, + const DocWriter* doc, + bool enforceQuota) { + invariant(!_validator || documentValidationDisabled(txn)); + dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX)); + invariant(!_indexCatalog.haveAnyIndexes()); // eventually can implement, just not done - return res; - } + StatusWith<RecordId> loc = _recordStore->insertRecord(txn, doc, _enforceQuota(enforceQuota)); + if (!loc.isOK()) + return loc; - StatusWith<RecordId> Collection::insertDocument(OperationContext* txn, - const BSONObj& doc, - MultiIndexBlock* indexBlock, - bool enforceQuota) { - { - auto status = checkValidation(txn, doc); - if (!status.isOK()) - return status; - } + // we cannot call into the OpObserver here because the document being written is not present + // fortunately, this is currently only used for adding entries to the oplog. - dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX)); + return StatusWith<RecordId>(loc); +} - StatusWith<RecordId> loc = _recordStore->insertRecord( txn, - doc.objdata(), - doc.objsize(), - _enforceQuota(enforceQuota) ); +StatusWith<RecordId> Collection::insertDocument(OperationContext* txn, + const BSONObj& docToInsert, + bool enforceQuota, + bool fromMigrate) { + { + auto status = checkValidation(txn, docToInsert); + if (!status.isOK()) + return status; + } - if ( !loc.isOK() ) - return loc; + const SnapshotId sid = txn->recoveryUnit()->getSnapshotId(); - Status status = indexBlock->insert( doc, loc.getValue() ); - if ( !status.isOK() ) - return StatusWith<RecordId>( status ); + if (_indexCatalog.findIdIndex(txn)) { + if (docToInsert["_id"].eoo()) { + return StatusWith<RecordId>(ErrorCodes::InternalError, + str::stream() + << "Collection::insertDocument got " + "document without _id for ns:" << _ns.ns()); + } + } - getGlobalServiceContext()->getOpObserver()->onInsert(txn, ns(), doc); + StatusWith<RecordId> res = _insertDocument(txn, docToInsert, enforceQuota); + invariant(sid == txn->recoveryUnit()->getSnapshotId()); + if (res.isOK()) { + getGlobalServiceContext()->getOpObserver()->onInsert(txn, ns(), docToInsert, fromMigrate); - // If there is a notifier object and another thread is waiting on it, then we notify waiters - // of this document insert. Waiters keep a shared_ptr to '_cappedNotifier', so there are - // waiters if this Collection's shared_ptr is not unique. + // If there is a notifier object and another thread is waiting on it, then we notify + // waiters of this document insert. Waiters keep a shared_ptr to '_cappedNotifier', so + // there are waiters if this Collection's shared_ptr is not unique. if (_cappedNotifier && !_cappedNotifier.unique()) { _cappedNotifier->notifyOfInsert(); } - - return loc; } - StatusWith<RecordId> Collection::_insertDocument( OperationContext* txn, - const BSONObj& docToInsert, - bool enforceQuota ) { - dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX)); + return res; +} - // TODO: for now, capped logic lives inside NamespaceDetails, which is hidden - // under the RecordStore, this feels broken since that should be a - // collection access method probably +StatusWith<RecordId> Collection::insertDocument(OperationContext* txn, + const BSONObj& doc, + MultiIndexBlock* indexBlock, + bool enforceQuota) { + { + auto status = checkValidation(txn, doc); + if (!status.isOK()) + return status; + } - StatusWith<RecordId> loc = _recordStore->insertRecord( txn, - docToInsert.objdata(), - docToInsert.objsize(), - _enforceQuota( enforceQuota ) ); - if ( !loc.isOK() ) - return loc; + dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX)); - invariant( RecordId::min() < loc.getValue() ); - invariant( loc.getValue() < RecordId::max() ); + StatusWith<RecordId> loc = + _recordStore->insertRecord(txn, doc.objdata(), doc.objsize(), _enforceQuota(enforceQuota)); - _infoCache.notifyOfWriteOp(); + if (!loc.isOK()) + return loc; - Status s = _indexCatalog.indexRecord(txn, docToInsert, loc.getValue()); - if (!s.isOK()) - return StatusWith<RecordId>(s); + Status status = indexBlock->insert(doc, loc.getValue()); + if (!status.isOK()) + return StatusWith<RecordId>(status); - return loc; + getGlobalServiceContext()->getOpObserver()->onInsert(txn, ns(), doc); + + // If there is a notifier object and another thread is waiting on it, then we notify waiters + // of this document insert. Waiters keep a shared_ptr to '_cappedNotifier', so there are + // waiters if this Collection's shared_ptr is not unique. + if (_cappedNotifier && !_cappedNotifier.unique()) { + _cappedNotifier->notifyOfInsert(); } - Status Collection::aboutToDeleteCapped( OperationContext* txn, - const RecordId& loc, - RecordData data ) { + return loc; +} - /* check if any cursors point to us. if so, advance them. */ - _cursorManager.invalidateDocument(txn, loc, INVALIDATION_DELETION); +StatusWith<RecordId> Collection::_insertDocument(OperationContext* txn, + const BSONObj& docToInsert, + bool enforceQuota) { + dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX)); - BSONObj doc = data.releaseToBson(); - _indexCatalog.unindexRecord(txn, doc, loc, false); + // TODO: for now, capped logic lives inside NamespaceDetails, which is hidden + // under the RecordStore, this feels broken since that should be a + // collection access method probably - return Status::OK(); - } + StatusWith<RecordId> loc = _recordStore->insertRecord( + txn, docToInsert.objdata(), docToInsert.objsize(), _enforceQuota(enforceQuota)); + if (!loc.isOK()) + return loc; - void Collection::deleteDocument(OperationContext* txn, - const RecordId& loc, - bool cappedOK, - bool noWarn, - BSONObj* deletedId) { - if ( isCapped() && !cappedOK ) { - log() << "failing remove on a capped ns " << _ns << endl; - uasserted( 10089, "cannot remove from a capped collection" ); - return; - } + invariant(RecordId::min() < loc.getValue()); + invariant(loc.getValue() < RecordId::max()); - Snapshotted<BSONObj> doc = docFor(txn, loc); + _infoCache.notifyOfWriteOp(); - BSONElement e = doc.value()["_id"]; - BSONObj id; - if (e.type()) { - id = e.wrap(); - if (deletedId) { - *deletedId = e.wrap(); - } - } + Status s = _indexCatalog.indexRecord(txn, docToInsert, loc.getValue()); + if (!s.isOK()) + return StatusWith<RecordId>(s); - /* check if any cursors point to us. if so, advance them. */ - _cursorManager.invalidateDocument(txn, loc, INVALIDATION_DELETION); + return loc; +} - _indexCatalog.unindexRecord(txn, doc.value(), loc, noWarn); +Status Collection::aboutToDeleteCapped(OperationContext* txn, + const RecordId& loc, + RecordData data) { + /* check if any cursors point to us. if so, advance them. */ + _cursorManager.invalidateDocument(txn, loc, INVALIDATION_DELETION); - _recordStore->deleteRecord(txn, loc); + BSONObj doc = data.releaseToBson(); + _indexCatalog.unindexRecord(txn, doc, loc, false); - _infoCache.notifyOfWriteOp(); + return Status::OK(); +} - if (!id.isEmpty()) { - getGlobalServiceContext()->getOpObserver()->onDelete(txn, ns().ns(), id); - } +void Collection::deleteDocument( + OperationContext* txn, const RecordId& loc, bool cappedOK, bool noWarn, BSONObj* deletedId) { + if (isCapped() && !cappedOK) { + log() << "failing remove on a capped ns " << _ns << endl; + uasserted(10089, "cannot remove from a capped collection"); + return; } - Counter64 moveCounter; - ServerStatusMetricField<Counter64> moveCounterDisplay( "record.moves", &moveCounter ); - - StatusWith<RecordId> Collection::updateDocument( OperationContext* txn, - const RecordId& oldLocation, - const Snapshotted<BSONObj>& oldDoc, - const BSONObj& newDoc, - bool enforceQuota, - bool indexesAffected, - OpDebug* debug, - oplogUpdateEntryArgs& args) { - { - auto status = checkValidation(txn, newDoc); - if (!status.isOK()) - return status; - } + Snapshotted<BSONObj> doc = docFor(txn, loc); - dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX)); - invariant(oldDoc.snapshotId() == txn->recoveryUnit()->getSnapshotId()); - - SnapshotId sid = txn->recoveryUnit()->getSnapshotId(); - - BSONElement oldId = oldDoc.value()["_id"]; - if ( !oldId.eoo() && ( oldId != newDoc["_id"] ) ) - return StatusWith<RecordId>( ErrorCodes::InternalError, - "in Collection::updateDocument _id mismatch", - 13596 ); - - // At the end of this step, we will have a map of UpdateTickets, one per index, which - // represent the index updates needed to be done, based on the changes between oldDoc and - // newDoc. - OwnedPointerMap<IndexDescriptor*,UpdateTicket> updateTickets; - if ( indexesAffected ) { - IndexCatalog::IndexIterator ii = _indexCatalog.getIndexIterator( txn, true ); - while ( ii.more() ) { - IndexDescriptor* descriptor = ii.next(); - IndexCatalogEntry* entry = ii.catalogEntry(descriptor); - IndexAccessMethod* iam = ii.accessMethod( descriptor ); - - InsertDeleteOptions options; - options.logIfError = false; - options.dupsAllowed = - !(KeyPattern::isIdKeyPattern(descriptor->keyPattern()) || descriptor->unique()) - || repl::getGlobalReplicationCoordinator()->shouldIgnoreUniqueIndex(descriptor); - UpdateTicket* updateTicket = new UpdateTicket(); - updateTickets.mutableMap()[descriptor] = updateTicket; - Status ret = iam->validateUpdate(txn, - oldDoc.value(), - newDoc, - oldLocation, - options, - updateTicket, - entry->getFilterExpression()); - if ( !ret.isOK() ) { - return StatusWith<RecordId>( ret ); - } - } + BSONElement e = doc.value()["_id"]; + BSONObj id; + if (e.type()) { + id = e.wrap(); + if (deletedId) { + *deletedId = e.wrap(); } + } - // This can call back into Collection::recordStoreGoingToMove. If that happens, the old - // object is removed from all indexes. - StatusWith<RecordId> newLocation = _recordStore->updateRecord( txn, - oldLocation, - newDoc.objdata(), - newDoc.objsize(), - _enforceQuota( enforceQuota ), - this ); - - if ( !newLocation.isOK() ) { - return newLocation; - } + /* check if any cursors point to us. if so, advance them. */ + _cursorManager.invalidateDocument(txn, loc, INVALIDATION_DELETION); - // At this point, the old object may or may not still be indexed, depending on if it was - // moved. + _indexCatalog.unindexRecord(txn, doc.value(), loc, noWarn); - _infoCache.notifyOfWriteOp(); + _recordStore->deleteRecord(txn, loc); - // If the object did move, we need to add the new location to all indexes. - if ( newLocation.getValue() != oldLocation ) { + _infoCache.notifyOfWriteOp(); - if ( debug ) { - if (debug->nmoved == -1) // default of -1 rather than 0 - debug->nmoved = 1; - else - debug->nmoved += 1; - } + if (!id.isEmpty()) { + getGlobalServiceContext()->getOpObserver()->onDelete(txn, ns().ns(), id); + } +} + +Counter64 moveCounter; +ServerStatusMetricField<Counter64> moveCounterDisplay("record.moves", &moveCounter); + +StatusWith<RecordId> Collection::updateDocument(OperationContext* txn, + const RecordId& oldLocation, + const Snapshotted<BSONObj>& oldDoc, + const BSONObj& newDoc, + bool enforceQuota, + bool indexesAffected, + OpDebug* debug, + oplogUpdateEntryArgs& args) { + { + auto status = checkValidation(txn, newDoc); + if (!status.isOK()) + return status; + } + + dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX)); + invariant(oldDoc.snapshotId() == txn->recoveryUnit()->getSnapshotId()); - Status s = _indexCatalog.indexRecord(txn, newDoc, newLocation.getValue()); - if (!s.isOK()) - return StatusWith<RecordId>(s); - invariant( sid == txn->recoveryUnit()->getSnapshotId() ); - args.ns = ns().ns(); - getGlobalServiceContext()->getOpObserver()->onUpdate(txn, args); + SnapshotId sid = txn->recoveryUnit()->getSnapshotId(); - return newLocation; + BSONElement oldId = oldDoc.value()["_id"]; + if (!oldId.eoo() && (oldId != newDoc["_id"])) + return StatusWith<RecordId>( + ErrorCodes::InternalError, "in Collection::updateDocument _id mismatch", 13596); + + // At the end of this step, we will have a map of UpdateTickets, one per index, which + // represent the index updates needed to be done, based on the changes between oldDoc and + // newDoc. + OwnedPointerMap<IndexDescriptor*, UpdateTicket> updateTickets; + if (indexesAffected) { + IndexCatalog::IndexIterator ii = _indexCatalog.getIndexIterator(txn, true); + while (ii.more()) { + IndexDescriptor* descriptor = ii.next(); + IndexCatalogEntry* entry = ii.catalogEntry(descriptor); + IndexAccessMethod* iam = ii.accessMethod(descriptor); + + InsertDeleteOptions options; + options.logIfError = false; + options.dupsAllowed = + !(KeyPattern::isIdKeyPattern(descriptor->keyPattern()) || descriptor->unique()) || + repl::getGlobalReplicationCoordinator()->shouldIgnoreUniqueIndex(descriptor); + UpdateTicket* updateTicket = new UpdateTicket(); + updateTickets.mutableMap()[descriptor] = updateTicket; + Status ret = iam->validateUpdate(txn, + oldDoc.value(), + newDoc, + oldLocation, + options, + updateTicket, + entry->getFilterExpression()); + if (!ret.isOK()) { + return StatusWith<RecordId>(ret); + } } + } - // Object did not move. We update each index with each respective UpdateTicket. + // This can call back into Collection::recordStoreGoingToMove. If that happens, the old + // object is removed from all indexes. + StatusWith<RecordId> newLocation = _recordStore->updateRecord( + txn, oldLocation, newDoc.objdata(), newDoc.objsize(), _enforceQuota(enforceQuota), this); - if ( debug ) - debug->keyUpdates = 0; + if (!newLocation.isOK()) { + return newLocation; + } - if ( indexesAffected ) { - IndexCatalog::IndexIterator ii = _indexCatalog.getIndexIterator( txn, true ); - while ( ii.more() ) { - IndexDescriptor* descriptor = ii.next(); - IndexAccessMethod* iam = ii.accessMethod(descriptor); + // At this point, the old object may or may not still be indexed, depending on if it was + // moved. - int64_t updatedKeys; - Status ret = iam->update( - txn, *updateTickets.mutableMap()[descriptor], &updatedKeys); - if ( !ret.isOK() ) - return StatusWith<RecordId>( ret ); - if ( debug ) - debug->keyUpdates += updatedKeys; - } + _infoCache.notifyOfWriteOp(); + + // If the object did move, we need to add the new location to all indexes. + if (newLocation.getValue() != oldLocation) { + if (debug) { + if (debug->nmoved == -1) // default of -1 rather than 0 + debug->nmoved = 1; + else + debug->nmoved += 1; } - invariant( sid == txn->recoveryUnit()->getSnapshotId() ); + Status s = _indexCatalog.indexRecord(txn, newDoc, newLocation.getValue()); + if (!s.isOK()) + return StatusWith<RecordId>(s); + invariant(sid == txn->recoveryUnit()->getSnapshotId()); args.ns = ns().ns(); getGlobalServiceContext()->getOpObserver()->onUpdate(txn, args); return newLocation; } - Status Collection::recordStoreGoingToMove( OperationContext* txn, - const RecordId& oldLocation, - const char* oldBuffer, - size_t oldSize ) { - moveCounter.increment(); - _cursorManager.invalidateDocument(txn, oldLocation, INVALIDATION_DELETION); - _indexCatalog.unindexRecord(txn, BSONObj(oldBuffer), oldLocation, true); - return Status::OK(); - } + // Object did not move. We update each index with each respective UpdateTicket. - Status Collection::recordStoreGoingToUpdateInPlace( OperationContext* txn, - const RecordId& loc ) { - // Broadcast the mutation so that query results stay correct. - _cursorManager.invalidateDocument(txn, loc, INVALIDATION_MUTATION); - return Status::OK(); + if (debug) + debug->keyUpdates = 0; + + if (indexesAffected) { + IndexCatalog::IndexIterator ii = _indexCatalog.getIndexIterator(txn, true); + while (ii.more()) { + IndexDescriptor* descriptor = ii.next(); + IndexAccessMethod* iam = ii.accessMethod(descriptor); + + int64_t updatedKeys; + Status ret = iam->update(txn, *updateTickets.mutableMap()[descriptor], &updatedKeys); + if (!ret.isOK()) + return StatusWith<RecordId>(ret); + if (debug) + debug->keyUpdates += updatedKeys; + } } + invariant(sid == txn->recoveryUnit()->getSnapshotId()); + args.ns = ns().ns(); + getGlobalServiceContext()->getOpObserver()->onUpdate(txn, args); - bool Collection::updateWithDamagesSupported() const { - if (_validator) - return false; + return newLocation; +} - return _recordStore->updateWithDamagesSupported(); - } +Status Collection::recordStoreGoingToMove(OperationContext* txn, + const RecordId& oldLocation, + const char* oldBuffer, + size_t oldSize) { + moveCounter.increment(); + _cursorManager.invalidateDocument(txn, oldLocation, INVALIDATION_DELETION); + _indexCatalog.unindexRecord(txn, BSONObj(oldBuffer), oldLocation, true); + return Status::OK(); +} - Status Collection::updateDocumentWithDamages( OperationContext* txn, - const RecordId& loc, - const Snapshotted<RecordData>& oldRec, - const char* damageSource, - const mutablebson::DamageVector& damages, - oplogUpdateEntryArgs& args) { - dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX)); - invariant(oldRec.snapshotId() == txn->recoveryUnit()->getSnapshotId()); - invariant(updateWithDamagesSupported()); - - // Broadcast the mutation so that query results stay correct. - _cursorManager.invalidateDocument(txn, loc, INVALIDATION_MUTATION); - - Status status = - _recordStore->updateWithDamages(txn, loc, oldRec.value(), damageSource, damages); - - if (status.isOK()) { - args.ns = ns().ns(); - getGlobalServiceContext()->getOpObserver()->onUpdate(txn, args); - } - return status; - } +Status Collection::recordStoreGoingToUpdateInPlace(OperationContext* txn, const RecordId& loc) { + // Broadcast the mutation so that query results stay correct. + _cursorManager.invalidateDocument(txn, loc, INVALIDATION_MUTATION); + return Status::OK(); +} - bool Collection::_enforceQuota( bool userEnforeQuota ) const { - if ( !userEnforeQuota ) - return false; - if ( !mmapv1GlobalOptions.quota ) - return false; +bool Collection::updateWithDamagesSupported() const { + if (_validator) + return false; - if ( _ns.db() == "local" ) - return false; + return _recordStore->updateWithDamagesSupported(); +} - if ( _ns.isSpecial() ) - return false; +Status Collection::updateDocumentWithDamages(OperationContext* txn, + const RecordId& loc, + const Snapshotted<RecordData>& oldRec, + const char* damageSource, + const mutablebson::DamageVector& damages, + oplogUpdateEntryArgs& args) { + dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX)); + invariant(oldRec.snapshotId() == txn->recoveryUnit()->getSnapshotId()); + invariant(updateWithDamagesSupported()); - return true; - } + // Broadcast the mutation so that query results stay correct. + _cursorManager.invalidateDocument(txn, loc, INVALIDATION_MUTATION); - bool Collection::isCapped() const { - return _cappedNotifier.get(); - } + Status status = + _recordStore->updateWithDamages(txn, loc, oldRec.value(), damageSource, damages); - std::shared_ptr<CappedInsertNotifier> Collection::getCappedInsertNotifier() const { - invariant(isCapped()); - return _cappedNotifier; + if (status.isOK()) { + args.ns = ns().ns(); + getGlobalServiceContext()->getOpObserver()->onUpdate(txn, args); } + return status; +} - uint64_t Collection::numRecords( OperationContext* txn ) const { - return _recordStore->numRecords( txn ); - } +bool Collection::_enforceQuota(bool userEnforeQuota) const { + if (!userEnforeQuota) + return false; - uint64_t Collection::dataSize( OperationContext* txn ) const { - return _recordStore->dataSize( txn ); - } + if (!mmapv1GlobalOptions.quota) + return false; + + if (_ns.db() == "local") + return false; - uint64_t Collection::getIndexSize(OperationContext* opCtx, - BSONObjBuilder* details, - int scale) { + if (_ns.isSpecial()) + return false; - IndexCatalog* idxCatalog = getIndexCatalog(); + return true; +} - IndexCatalog::IndexIterator ii = idxCatalog->getIndexIterator(opCtx, true); +bool Collection::isCapped() const { + return _cappedNotifier.get(); +} - uint64_t totalSize = 0; +std::shared_ptr<CappedInsertNotifier> Collection::getCappedInsertNotifier() const { + invariant(isCapped()); + return _cappedNotifier; +} - while (ii.more()) { - IndexDescriptor* d = ii.next(); - IndexAccessMethod* iam = idxCatalog->getIndex(d); +uint64_t Collection::numRecords(OperationContext* txn) const { + return _recordStore->numRecords(txn); +} - long long ds = iam->getSpaceUsedBytes(opCtx); +uint64_t Collection::dataSize(OperationContext* txn) const { + return _recordStore->dataSize(txn); +} - totalSize += ds; - if (details) { - details->appendNumber(d->indexName(), ds / scale); - } - } +uint64_t Collection::getIndexSize(OperationContext* opCtx, BSONObjBuilder* details, int scale) { + IndexCatalog* idxCatalog = getIndexCatalog(); - return totalSize; - } + IndexCatalog::IndexIterator ii = idxCatalog->getIndexIterator(opCtx, true); - /** - * order will be: - * 1) store index specs - * 2) drop indexes - * 3) truncate record store - * 4) re-write indexes - */ - Status Collection::truncate(OperationContext* txn) { - dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_X)); - massert( 17445, "index build in progress", _indexCatalog.numIndexesInProgress( txn ) == 0 ); - - // 1) store index specs - vector<BSONObj> indexSpecs; - { - IndexCatalog::IndexIterator ii = _indexCatalog.getIndexIterator( txn, false ); - while ( ii.more() ) { - const IndexDescriptor* idx = ii.next(); - indexSpecs.push_back( idx->infoObj().getOwned() ); - } - } + uint64_t totalSize = 0; - // 2) drop indexes - Status status = _indexCatalog.dropAllIndexes(txn, true); - if ( !status.isOK() ) - return status; - _cursorManager.invalidateAll(false, "collection truncated"); - _infoCache.reset( txn ); + while (ii.more()) { + IndexDescriptor* d = ii.next(); + IndexAccessMethod* iam = idxCatalog->getIndex(d); - // 3) truncate record store - status = _recordStore->truncate(txn); - if ( !status.isOK() ) - return status; + long long ds = iam->getSpaceUsedBytes(opCtx); - // 4) re-create indexes - for ( size_t i = 0; i < indexSpecs.size(); i++ ) { - status = _indexCatalog.createIndexOnEmptyCollection(txn, indexSpecs[i]); - if ( !status.isOK() ) - return status; + totalSize += ds; + if (details) { + details->appendNumber(d->indexName(), ds / scale); } + } - return Status::OK(); + return totalSize; +} + +/** + * order will be: + * 1) store index specs + * 2) drop indexes + * 3) truncate record store + * 4) re-write indexes + */ +Status Collection::truncate(OperationContext* txn) { + dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_X)); + massert(17445, "index build in progress", _indexCatalog.numIndexesInProgress(txn) == 0); + + // 1) store index specs + vector<BSONObj> indexSpecs; + { + IndexCatalog::IndexIterator ii = _indexCatalog.getIndexIterator(txn, false); + while (ii.more()) { + const IndexDescriptor* idx = ii.next(); + indexSpecs.push_back(idx->infoObj().getOwned()); + } } - void Collection::temp_cappedTruncateAfter(OperationContext* txn, - RecordId end, - bool inclusive) { - dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX)); - invariant( isCapped() ); + // 2) drop indexes + Status status = _indexCatalog.dropAllIndexes(txn, true); + if (!status.isOK()) + return status; + _cursorManager.invalidateAll(false, "collection truncated"); + _infoCache.reset(txn); - _cursorManager.invalidateAll(false, "capped collection truncated"); - _recordStore->temp_cappedTruncateAfter( txn, end, inclusive ); + // 3) truncate record store + status = _recordStore->truncate(txn); + if (!status.isOK()) + return status; + + // 4) re-create indexes + for (size_t i = 0; i < indexSpecs.size(); i++) { + status = _indexCatalog.createIndexOnEmptyCollection(txn, indexSpecs[i]); + if (!status.isOK()) + return status; } - Status Collection::setValidator(OperationContext* txn, BSONObj validatorDoc) { - invariant(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_X)); + return Status::OK(); +} - // Make owned early so that the parsed match expression refers to the owned object. - if (!validatorDoc.isOwned()) validatorDoc = validatorDoc.getOwned(); +void Collection::temp_cappedTruncateAfter(OperationContext* txn, RecordId end, bool inclusive) { + dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX)); + invariant(isCapped()); - auto statusWithMatcher = parseValidator(validatorDoc); - if (!statusWithMatcher.isOK()) - return statusWithMatcher.getStatus(); + _cursorManager.invalidateAll(false, "capped collection truncated"); + _recordStore->temp_cappedTruncateAfter(txn, end, inclusive); +} - _details->updateValidator(txn, validatorDoc); +Status Collection::setValidator(OperationContext* txn, BSONObj validatorDoc) { + invariant(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_X)); - _validator = std::move(statusWithMatcher.getValue()); - _validatorDoc = std::move(validatorDoc); - return Status::OK(); - } + // Make owned early so that the parsed match expression refers to the owned object. + if (!validatorDoc.isOwned()) + validatorDoc = validatorDoc.getOwned(); - namespace { - class MyValidateAdaptor : public ValidateAdaptor { - public: - virtual ~MyValidateAdaptor(){} - - virtual Status validate( const RecordData& record, size_t* dataSize ) { - BSONObj obj = record.toBson(); - const Status status = validateBSON(obj.objdata(), obj.objsize()); - if ( status.isOK() ) - *dataSize = obj.objsize(); - return Status::OK(); - } + auto statusWithMatcher = parseValidator(validatorDoc); + if (!statusWithMatcher.isOK()) + return statusWithMatcher.getStatus(); - }; + _details->updateValidator(txn, validatorDoc); + + _validator = std::move(statusWithMatcher.getValue()); + _validatorDoc = std::move(validatorDoc); + return Status::OK(); +} + +namespace { +class MyValidateAdaptor : public ValidateAdaptor { +public: + virtual ~MyValidateAdaptor() {} + + virtual Status validate(const RecordData& record, size_t* dataSize) { + BSONObj obj = record.toBson(); + const Status status = validateBSON(obj.objdata(), obj.objsize()); + if (status.isOK()) + *dataSize = obj.objsize(); + return Status::OK(); } +}; +} - Status Collection::validate( OperationContext* txn, - bool full, bool scanData, - ValidateResults* results, BSONObjBuilder* output ){ - dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IS)); +Status Collection::validate(OperationContext* txn, + bool full, + bool scanData, + ValidateResults* results, + BSONObjBuilder* output) { + dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IS)); - MyValidateAdaptor adaptor; - Status status = _recordStore->validate( txn, full, scanData, &adaptor, results, output ); - if ( !status.isOK() ) - return status; + MyValidateAdaptor adaptor; + Status status = _recordStore->validate(txn, full, scanData, &adaptor, results, output); + if (!status.isOK()) + return status; - { // indexes - output->append("nIndexes", _indexCatalog.numIndexesReady( txn ) ); - int idxn = 0; - try { - // Only applicable when 'full' validation is requested. - std::unique_ptr<BSONObjBuilder> indexDetails(full ? new BSONObjBuilder() : NULL); - BSONObjBuilder indexes; // not using subObjStart to be exception safe - - IndexCatalog::IndexIterator i = _indexCatalog.getIndexIterator(txn, false); - while( i.more() ) { - const IndexDescriptor* descriptor = i.next(); - log(LogComponent::kIndex) << "validating index " << descriptor->indexNamespace() << endl; - IndexAccessMethod* iam = _indexCatalog.getIndex( descriptor ); - invariant( iam ); - - std::unique_ptr<BSONObjBuilder> bob( - indexDetails.get() ? new BSONObjBuilder( - indexDetails->subobjStart(descriptor->indexNamespace())) : - NULL); - - int64_t keys; - iam->validate(txn, full, &keys, bob.get()); - indexes.appendNumber(descriptor->indexNamespace(), - static_cast<long long>(keys)); - - if (bob) { - BSONObj obj = bob->done(); - BSONElement valid = obj["valid"]; - if (valid.ok() && !valid.trueValue()) { - results->valid = false; - } + { // indexes + output->append("nIndexes", _indexCatalog.numIndexesReady(txn)); + int idxn = 0; + try { + // Only applicable when 'full' validation is requested. + std::unique_ptr<BSONObjBuilder> indexDetails(full ? new BSONObjBuilder() : NULL); + BSONObjBuilder indexes; // not using subObjStart to be exception safe + + IndexCatalog::IndexIterator i = _indexCatalog.getIndexIterator(txn, false); + while (i.more()) { + const IndexDescriptor* descriptor = i.next(); + log(LogComponent::kIndex) << "validating index " << descriptor->indexNamespace() + << endl; + IndexAccessMethod* iam = _indexCatalog.getIndex(descriptor); + invariant(iam); + + std::unique_ptr<BSONObjBuilder> bob( + indexDetails.get() ? new BSONObjBuilder(indexDetails->subobjStart( + descriptor->indexNamespace())) + : NULL); + + int64_t keys; + iam->validate(txn, full, &keys, bob.get()); + indexes.appendNumber(descriptor->indexNamespace(), static_cast<long long>(keys)); + + if (bob) { + BSONObj obj = bob->done(); + BSONElement valid = obj["valid"]; + if (valid.ok() && !valid.trueValue()) { + results->valid = false; } - idxn++; - } - - output->append("keysPerIndex", indexes.done()); - if (indexDetails.get()) { - output->append("indexDetails", indexDetails->done()); } + idxn++; } - catch ( DBException& exc ) { - string err = str::stream() << - "exception during index validate idxn "<< - BSONObjBuilder::numStr(idxn) << - ": " << exc.toString(); - results->errors.push_back( err ); - results->valid = false; + + output->append("keysPerIndex", indexes.done()); + if (indexDetails.get()) { + output->append("indexDetails", indexDetails->done()); } + } catch (DBException& exc) { + string err = str::stream() << "exception during index validate idxn " + << BSONObjBuilder::numStr(idxn) << ": " << exc.toString(); + results->errors.push_back(err); + results->valid = false; } - - return Status::OK(); } - Status Collection::touch( OperationContext* txn, - bool touchData, bool touchIndexes, - BSONObjBuilder* output ) const { - if ( touchData ) { - BSONObjBuilder b; - Status status = _recordStore->touch( txn, &b ); - if ( !status.isOK() ) - return status; - output->append( "data", b.obj() ); - } + return Status::OK(); +} - if ( touchIndexes ) { - Timer t; - IndexCatalog::IndexIterator ii = _indexCatalog.getIndexIterator( txn, false ); - while ( ii.more() ) { - const IndexDescriptor* desc = ii.next(); - const IndexAccessMethod* iam = _indexCatalog.getIndex( desc ); - Status status = iam->touch( txn ); - if ( !status.isOK() ) - return status; - } +Status Collection::touch(OperationContext* txn, + bool touchData, + bool touchIndexes, + BSONObjBuilder* output) const { + if (touchData) { + BSONObjBuilder b; + Status status = _recordStore->touch(txn, &b); + if (!status.isOK()) + return status; + output->append("data", b.obj()); + } - output->append( "indexes", BSON( "num" << _indexCatalog.numIndexesTotal( txn ) << - "millis" << t.millis() ) ); + if (touchIndexes) { + Timer t; + IndexCatalog::IndexIterator ii = _indexCatalog.getIndexIterator(txn, false); + while (ii.more()) { + const IndexDescriptor* desc = ii.next(); + const IndexAccessMethod* iam = _indexCatalog.getIndex(desc); + Status status = iam->touch(txn); + if (!status.isOK()) + return status; } - return Status::OK(); + output->append("indexes", + BSON("num" << _indexCatalog.numIndexesTotal(txn) << "millis" << t.millis())); } + return Status::OK(); +} } |