// collection.cpp /** * Copyright (C) 2013-2014 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage #include "mongo/platform/basic.h" #include "mongo/db/catalog/collection.h" #include #include "mongo/base/counter.h" #include "mongo/base/owned_pointer_map.h" #include "mongo/db/clientcursor.h" #include "mongo/db/commands/server_status_metric.h" #include "mongo/db/curop.h" #include "mongo/db/catalog/collection_catalog_entry.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/index_create.h" #include "mongo/db/index/index_access_method.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/storage/mmap_v1/mmap_v1_options.h" #include "mongo/db/storage/record_fetcher.h" #include "mongo/db/auth/user_document_parser.h" // XXX-ANDY #include "mongo/util/log.h" namespace mongo { using boost::scoped_ptr; using std::endl; using std::string; using std::vector; using logger::LogComponent; std::string CompactOptions::toString() const { std::stringstream ss; ss << "paddingMode: "; switch ( paddingMode ) { case NONE: ss << "NONE"; break; case PRESERVE: ss << "PRESERVE"; break; case MANUAL: ss << "MANUAL (" << paddingBytes << " + ( doc * " << paddingFactor <<") )"; } ss << " validateDocuments: " << validateDocuments; return ss.str(); } // ---- Collection::Collection( OperationContext* txn, const StringData& fullNS, CollectionCatalogEntry* details, RecordStore* recordStore, Database* database ) : _ns( fullNS ), _details( details ), _recordStore( recordStore ), _database( database ), _infoCache( this ), _indexCatalog( this ), _cursorManager( fullNS ) { _magic = 1357924; _indexCatalog.init(txn); if ( isCapped() ) _recordStore->setCappedDeleteCallback( this ); _infoCache.reset(txn); } Collection::~Collection() { verify( ok() ); _magic = 0; } bool Collection::requiresIdIndex() const { if ( _ns.ns().find( '$' ) != string::npos ) { // no indexes on indexes return false; } if ( _ns.isSystem() ) { StringData shortName = _ns.coll().substr( _ns.coll().find( '.' ) + 1 ); if ( shortName == "indexes" || shortName == "namespaces" || shortName == "profile" ) { return false; } } if ( _ns.db() == "local" ) { if ( _ns.coll().startsWith( "oplog." ) ) return false; } if ( !_ns.isSystem() ) { // non system collections definitely have an _id index return true; } return true; } RecordIterator* Collection::getIterator( OperationContext* txn, const RecordId& start, const CollectionScanParams::Direction& dir) const { invariant( ok() ); return _recordStore->getIterator( txn, start, dir ); } vector Collection::getManyIterators( OperationContext* txn ) const { return _recordStore->getManyIterators(txn); } int64_t Collection::countTableScan( OperationContext* txn, const MatchExpression* expression ) { scoped_ptr iterator( getIterator( txn, RecordId(), CollectionScanParams::FORWARD ) ); int64_t count = 0; while ( !iterator->isEOF() ) { RecordId loc = iterator->getNext(); BSONObj obj = docFor( txn, loc ); if ( expression->matchesBSON( obj ) ) count++; } return count; } BSONObj Collection::docFor(OperationContext* txn, const RecordId& loc) const { return _recordStore->dataFor( txn, loc ).releaseToBson(); } bool Collection::findDoc(OperationContext* txn, const RecordId& loc, BSONObj* out) const { RecordData rd; if ( !_recordStore->findRecord( txn, loc, &rd ) ) return false; *out = rd.releaseToBson(); return true; } StatusWith Collection::insertDocument( OperationContext* txn, const DocWriter* doc, bool enforceQuota ) { invariant( !_indexCatalog.haveAnyIndexes() ); // eventually can implement, just not done StatusWith loc = _recordStore->insertRecord( txn, doc, _enforceQuota( enforceQuota ) ); if ( !loc.isOK() ) return loc; return StatusWith( loc ); } StatusWith Collection::insertDocument( OperationContext* txn, const BSONObj& docToInsert, bool enforceQuota ) { uint64_t txnId = txn->recoveryUnit()->getMyTransactionCount(); if ( _indexCatalog.findIdIndex( txn ) ) { if ( docToInsert["_id"].eoo() ) { return StatusWith( ErrorCodes::InternalError, str::stream() << "Collection::insertDocument got " "document without _id for ns:" << _ns.ns() ); } } StatusWith res = _insertDocument( txn, docToInsert, enforceQuota ); invariant( txnId == txn->recoveryUnit()->getMyTransactionCount() ); return res; } StatusWith Collection::insertDocument( OperationContext* txn, const BSONObj& doc, MultiIndexBlock* indexBlock, bool enforceQuota ) { StatusWith loc = _recordStore->insertRecord( txn, doc.objdata(), doc.objsize(), _enforceQuota(enforceQuota) ); if ( !loc.isOK() ) return loc; Status status = indexBlock->insert( doc, loc.getValue() ); if ( !status.isOK() ) return StatusWith( status ); return loc; } RecordFetcher* Collection::documentNeedsFetch( OperationContext* txn, const RecordId& loc ) const { return _recordStore->recordNeedsFetch( txn, loc ); } StatusWith Collection::_insertDocument( OperationContext* txn, const BSONObj& docToInsert, bool enforceQuota ) { // TODO: for now, capped logic lives inside NamespaceDetails, which is hidden // under the RecordStore, this feels broken since that should be a // collection access method probably StatusWith loc = _recordStore->insertRecord( txn, docToInsert.objdata(), docToInsert.objsize(), _enforceQuota( enforceQuota ) ); if ( !loc.isOK() ) return loc; invariant( RecordId::min() < loc.getValue() ); invariant( loc.getValue() < RecordId::max() ); _infoCache.notifyOfWriteOp(); Status s = _indexCatalog.indexRecord(txn, docToInsert, loc.getValue()); if (!s.isOK()) return StatusWith(s); return loc; } Status Collection::aboutToDeleteCapped( OperationContext* txn, const RecordId& loc ) { BSONObj doc = docFor( txn, loc ); /* check if any cursors point to us. if so, advance them. */ _cursorManager.invalidateDocument(txn, loc, INVALIDATION_DELETION); _indexCatalog.unindexRecord(txn, doc, loc, false); return Status::OK(); } void Collection::deleteDocument( OperationContext* txn, const RecordId& loc, bool cappedOK, bool noWarn, BSONObj* deletedId ) { if ( isCapped() && !cappedOK ) { log() << "failing remove on a capped ns " << _ns << endl; uasserted( 10089, "cannot remove from a capped collection" ); return; } BSONObj doc = docFor( txn, loc ); if ( deletedId ) { BSONElement e = doc["_id"]; if ( e.type() ) { *deletedId = e.wrap(); } } /* check if any cursors point to us. if so, advance them. */ _cursorManager.invalidateDocument(txn, loc, INVALIDATION_DELETION); _indexCatalog.unindexRecord(txn, doc, loc, noWarn); _recordStore->deleteRecord( txn, loc ); _infoCache.notifyOfWriteOp(); } Counter64 moveCounter; ServerStatusMetricField moveCounterDisplay( "record.moves", &moveCounter ); StatusWith Collection::updateDocument( OperationContext* txn, const RecordId& oldLocation, const BSONObj& objOld, const BSONObj& objNew, bool enforceQuota, bool indexesAffected, OpDebug* debug ) { uint64_t txnId = txn->recoveryUnit()->getMyTransactionCount(); BSONElement oldId = objOld["_id"]; if ( !oldId.eoo() && ( oldId != objNew["_id"] ) ) return StatusWith( ErrorCodes::InternalError, "in Collection::updateDocument _id mismatch", 13596 ); // At the end of this step, we will have a map of UpdateTickets, one per index, which // represent the index updates needed to be done, based on the changes between objOld and // objNew. OwnedPointerMap updateTickets; if ( indexesAffected ) { IndexCatalog::IndexIterator ii = _indexCatalog.getIndexIterator( txn, true ); while ( ii.more() ) { IndexDescriptor* descriptor = ii.next(); IndexAccessMethod* iam = _indexCatalog.getIndex( descriptor ); InsertDeleteOptions options; options.logIfError = false; options.dupsAllowed = !(KeyPattern::isIdKeyPattern(descriptor->keyPattern()) || descriptor->unique()) || repl::getGlobalReplicationCoordinator()->shouldIgnoreUniqueIndex(descriptor); UpdateTicket* updateTicket = new UpdateTicket(); updateTickets.mutableMap()[descriptor] = updateTicket; Status ret = iam->validateUpdate( txn, objOld, objNew, oldLocation, options, updateTicket ); if ( !ret.isOK() ) { return StatusWith( ret ); } } } // This can call back into Collection::recordStoreGoingToMove. If that happens, the old // object is removed from all indexes. StatusWith newLocation = _recordStore->updateRecord( txn, oldLocation, objNew.objdata(), objNew.objsize(), _enforceQuota( enforceQuota ), this ); if ( !newLocation.isOK() ) { return newLocation; } // At this point, the old object may or may not still be indexed, depending on if it was // moved. _infoCache.notifyOfWriteOp(); // If the object did move, we need to add the new location to all indexes. if ( newLocation.getValue() != oldLocation ) { if ( debug ) { if (debug->nmoved == -1) // default of -1 rather than 0 debug->nmoved = 1; else debug->nmoved += 1; } Status s = _indexCatalog.indexRecord(txn, objNew, newLocation.getValue()); if (!s.isOK()) return StatusWith(s); invariant( txnId == txn->recoveryUnit()->getMyTransactionCount() ); return newLocation; } // Object did not move. We update each index with each respective UpdateTicket. if ( debug ) debug->keyUpdates = 0; if ( indexesAffected ) { IndexCatalog::IndexIterator ii = _indexCatalog.getIndexIterator( txn, true ); while ( ii.more() ) { IndexDescriptor* descriptor = ii.next(); IndexAccessMethod* iam = _indexCatalog.getIndex( descriptor ); int64_t updatedKeys; Status ret = iam->update( txn, *updateTickets.mutableMap()[descriptor], &updatedKeys); if ( !ret.isOK() ) return StatusWith( ret ); if ( debug ) debug->keyUpdates += updatedKeys; } } // Broadcast the mutation so that query results stay correct. _cursorManager.invalidateDocument(txn, oldLocation, INVALIDATION_MUTATION); invariant( txnId == txn->recoveryUnit()->getMyTransactionCount() ); return newLocation; } Status Collection::recordStoreGoingToMove( OperationContext* txn, const RecordId& oldLocation, const char* oldBuffer, size_t oldSize ) { moveCounter.increment(); _cursorManager.invalidateDocument(txn, oldLocation, INVALIDATION_DELETION); _indexCatalog.unindexRecord(txn, BSONObj(oldBuffer), oldLocation, true); return Status::OK(); } Status Collection::updateDocumentWithDamages( OperationContext* txn, const RecordId& loc, const RecordData& oldRec, const char* damageSource, const mutablebson::DamageVector& damages ) { // Broadcast the mutation so that query results stay correct. _cursorManager.invalidateDocument(txn, loc, INVALIDATION_MUTATION); return _recordStore->updateWithDamages( txn, loc, oldRec, damageSource, damages ); } bool Collection::_enforceQuota( bool userEnforeQuota ) const { if ( !userEnforeQuota ) return false; if ( !mmapv1GlobalOptions.quota ) return false; if ( _ns.db() == "local" ) return false; if ( _ns.isSpecial() ) return false; return true; } bool Collection::isCapped() const { return _recordStore->isCapped(); } uint64_t Collection::numRecords( OperationContext* txn ) const { return _recordStore->numRecords( txn ); } uint64_t Collection::dataSize( OperationContext* txn ) const { return _recordStore->dataSize( txn ); } uint64_t Collection::getIndexSize(OperationContext* opCtx, BSONObjBuilder* details, int scale) { IndexCatalog* idxCatalog = getIndexCatalog(); IndexCatalog::IndexIterator ii = idxCatalog->getIndexIterator(opCtx, true); uint64_t totalSize = 0; while (ii.more()) { IndexDescriptor* d = ii.next(); IndexAccessMethod* iam = idxCatalog->getIndex(d); long long ds = iam->getSpaceUsedBytes(opCtx); totalSize += ds; if (details) { details->appendNumber(d->indexName(), ds / scale); } } return totalSize; } /** * order will be: * 1) store index specs * 2) drop indexes * 3) truncate record store * 4) re-write indexes */ Status Collection::truncate(OperationContext* txn) { massert( 17445, "index build in progress", _indexCatalog.numIndexesInProgress( txn ) == 0 ); // 1) store index specs vector indexSpecs; { IndexCatalog::IndexIterator ii = _indexCatalog.getIndexIterator( txn, false ); while ( ii.more() ) { const IndexDescriptor* idx = ii.next(); indexSpecs.push_back( idx->infoObj().getOwned() ); } } // 2) drop indexes Status status = _indexCatalog.dropAllIndexes(txn, true); if ( !status.isOK() ) return status; _cursorManager.invalidateAll( false ); _infoCache.reset( txn ); // 3) truncate record store status = _recordStore->truncate(txn); if ( !status.isOK() ) return status; // 4) re-create indexes for ( size_t i = 0; i < indexSpecs.size(); i++ ) { status = _indexCatalog.createIndexOnEmptyCollection(txn, indexSpecs[i]); if ( !status.isOK() ) return status; } return Status::OK(); } void Collection::temp_cappedTruncateAfter(OperationContext* txn, RecordId end, bool inclusive) { invariant( isCapped() ); _recordStore->temp_cappedTruncateAfter( txn, end, inclusive ); } namespace { class MyValidateAdaptor : public ValidateAdaptor { public: virtual ~MyValidateAdaptor(){} virtual Status validate( const RecordData& record, size_t* dataSize ) { BSONObj obj = record.toBson(); const Status status = validateBSON(obj.objdata(), obj.objsize()); if ( status.isOK() ) *dataSize = obj.objsize(); return Status::OK(); } }; } Status Collection::validate( OperationContext* txn, bool full, bool scanData, ValidateResults* results, BSONObjBuilder* output ){ MyValidateAdaptor adaptor; Status status = _recordStore->validate( txn, full, scanData, &adaptor, results, output ); if ( !status.isOK() ) return status; { // indexes output->append("nIndexes", _indexCatalog.numIndexesReady( txn ) ); int idxn = 0; try { // Only applicable when 'full' validation is requested. boost::scoped_ptr indexDetails(full ? new BSONObjBuilder() : NULL); BSONObjBuilder indexes; // not using subObjStart to be exception safe IndexCatalog::IndexIterator i = _indexCatalog.getIndexIterator(txn, false); while( i.more() ) { const IndexDescriptor* descriptor = i.next(); log(LogComponent::kIndex) << "validating index " << descriptor->indexNamespace() << endl; IndexAccessMethod* iam = _indexCatalog.getIndex( descriptor ); invariant( iam ); boost::scoped_ptr bob( indexDetails.get() ? new BSONObjBuilder( indexDetails->subobjStart(descriptor->indexNamespace())) : NULL); int64_t keys; iam->validate(txn, full, &keys, bob.get()); indexes.appendNumber(descriptor->indexNamespace(), static_cast(keys)); idxn++; } output->append("keysPerIndex", indexes.done()); if (indexDetails.get()) { output->append("indexDetails", indexDetails->done()); } } catch ( DBException& exc ) { string err = str::stream() << "exception during index validate idxn "<< BSONObjBuilder::numStr(idxn) << ": " << exc.toString(); results->errors.push_back( err ); results->valid = false; } } return Status::OK(); } Status Collection::touch( OperationContext* txn, bool touchData, bool touchIndexes, BSONObjBuilder* output ) const { if ( touchData ) { BSONObjBuilder b; Status status = _recordStore->touch( txn, &b ); output->append( "data", b.obj() ); if ( !status.isOK() ) return status; } if ( touchIndexes ) { Timer t; IndexCatalog::IndexIterator ii = _indexCatalog.getIndexIterator( txn, false ); while ( ii.more() ) { const IndexDescriptor* desc = ii.next(); const IndexAccessMethod* iam = _indexCatalog.getIndex( desc ); Status status = iam->touch( txn ); if ( !status.isOK() ) return status; } output->append( "indexes", BSON( "num" << _indexCatalog.numIndexesTotal( txn ) << "millis" << t.millis() ) ); } return Status::OK(); } }