// collection.cpp
/**
* Copyright (C) 2013 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/catalog/collection.h"
#include "mongo/base/counter.h"
#include "mongo/base/owned_pointer_map.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/curop.h"
#include "mongo/db/catalog/collection_catalog_entry.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/concurrency/lock_mgr.h"
#include "mongo/db/catalog/index_create.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/structure/record_store_v1_capped.h"
#include "mongo/db/repl/repl_coordinator_global.h"
#include "mongo/db/auth/user_document_parser.h" // XXX-ANDY
namespace mongo {
// Renders the compaction settings in a human-readable form for logging.
std::string CompactOptions::toString() const {
    std::stringstream buf;

    buf << "paddingMode: ";
    if ( paddingMode == NONE ) {
        buf << "NONE";
    }
    else if ( paddingMode == PRESERVE ) {
        buf << "PRESERVE";
    }
    else if ( paddingMode == MANUAL ) {
        buf << "MANUAL (" << paddingBytes << " + ( doc * " << paddingFactor << ") )";
    }

    buf << " validateDocuments: " << validateDocuments;
    return buf.str();
}
// ----
// Wires the in-memory Collection to its on-disk catalog entry, record store,
// and owning database, then loads the index catalog. Does not create anything
// on disk.
Collection::Collection( OperationContext* txn,
const StringData& fullNS,
CollectionCatalogEntry* details,
RecordStore* recordStore,
Database* database )
: _ns( fullNS ),
_details( details ),
_recordStore( recordStore ),
_database( database ),
_infoCache( this ),
_indexCatalog( this ),
_cursorCache( fullNS ) {
// Sentinel checked by ok(); cleared in the destructor so use-after-free
// of a Collection trips verify( ok() ).
_magic = 1357924;
_indexCatalog.init(txn);
// Capped collections get notified before the record store evicts an old
// record, so cursors and indexes can be fixed up (see aboutToDeleteCapped).
if ( isCapped() )
_recordStore->setCappedDeleteCallback( this );
}
// Asserts the object is still valid (magic intact), then clears the sentinel
// so later use of the freed object is detectable.
Collection::~Collection() {
verify( ok() );
_magic = 0;
}
// Returns true when this namespace is expected to carry an _id index.
// Metadata namespaces, index sub-namespaces, and the local oplog are exempt.
bool Collection::requiresIdIndex() const {
    // Namespaces containing '$' (e.g. index namespaces): no indexes on indexes.
    if ( _ns.ns().find( '$' ) != string::npos )
        return false;

    // The database's own bookkeeping collections never get an _id index.
    if ( _ns == _database->_namespacesName ||
         _ns == _database->_indexesName ||
         _ns == _database->_profileName )
        return false;

    // The oplog in the "local" database is _id-less by design.
    if ( _ns.db() == "local" && _ns.coll().startsWith( "oplog." ) )
        return false;

    // Everything else — system collections included — requires an _id index.
    return true;
}
// Returns a record iterator over this collection, starting at 'start'
// (a null DiskLoc() means "from the beginning") in direction 'dir'.
// 'tailable' requests a capped-collection tailing iterator.
// NOTE(review): the caller presumably owns the returned iterator —
// confirm against RecordStore::getIterator's contract.
RecordIterator* Collection::getIterator( const DiskLoc& start, bool tailable,
const CollectionScanParams::Direction& dir) const {
invariant( ok() );
return _recordStore->getIterator( start, tailable, dir );
}
vector Collection::getManyIterators() const {
return _recordStore->getManyIterators();
}
int64_t Collection::countTableScan( const MatchExpression* expression ) {
scoped_ptr iterator( getIterator( DiskLoc(),
false,
CollectionScanParams::FORWARD ) );
int64_t count = 0;
while ( !iterator->isEOF() ) {
DiskLoc loc = iterator->getNext();
BSONObj obj = docFor( loc );
if ( expression->matchesBSON( obj ) )
count++;
}
return count;
}
// Materializes the document stored at 'loc' as a BSONObj.
// NOTE(review): whether the returned object copies or aliases the underlying
// record buffer depends on RecordData::toBson() — confirm before holding it
// across writes.
BSONObj Collection::docFor(const DiskLoc& loc) const {
return _recordStore->dataFor( loc ).toBson();
}
// Inserts a record whose bytes are produced by a DocWriter. Only legal on
// collections without indexes, since the raw payload is never indexed here.
// Fix: StatusWith lost its <DiskLoc> template argument (stripped from the
// source); also collapsed the redundant copy-and-return tail into a single
// return of 'loc' (both paths returned the same value).
StatusWith<DiskLoc> Collection::insertDocument( OperationContext* txn,
                                                const DocWriter* doc,
                                                bool enforceQuota ) {
    verify( _indexCatalog.numIndexesTotal() == 0 ); // eventually can implement, just not done

    StatusWith<DiskLoc> loc = _recordStore->insertRecord( txn,
                                                          doc,
                                                          enforceQuota
                                                          ? largestFileNumberInQuota()
                                                          : 0 );
    return loc;
}
// Validates and inserts a complete BSON document, updating all indexes via
// _insertDocument. Rejects documents lacking _id when an _id index exists.
// Fix: StatusWith lost its <DiskLoc> template arguments (stripped from the
// source); also corrected the "TOOD" typo in the comment.
StatusWith<DiskLoc> Collection::insertDocument( OperationContext* txn,
                                                const BSONObj& docToInsert,
                                                bool enforceQuota ) {
    if ( _indexCatalog.findIdIndex() ) {
        if ( docToInsert["_id"].eoo() ) {
            return StatusWith<DiskLoc>( ErrorCodes::InternalError,
                                        str::stream() << "Collection::insertDocument got "
                                        "document without _id for ns:" << _ns.ns() );
        }
    }

    if ( isCapped() ) {
        // TODO: old god not done
        // Pre-check for unique-index conflicts: a capped store may delete
        // old records to make room, which cannot be rolled back on failure.
        Status ret = _indexCatalog.checkNoIndexConflicts( docToInsert );
        if ( !ret.isOK() )
            return StatusWith<DiskLoc>( ret );
    }

    return _insertDocument( txn, docToInsert, enforceQuota );
}
// Insert path used during repair / bulk index builds: index entries go
// through the caller-supplied MultiIndexBlock instead of the live catalog,
// with duplicate checking disabled.
// Fix: StatusWith lost its <DiskLoc> template arguments (stripped source).
StatusWith<DiskLoc> Collection::insertDocument( OperationContext* txn,
                                                const BSONObj& doc,
                                                MultiIndexBlock& indexBlock ) {
    StatusWith<DiskLoc> loc = _recordStore->insertRecord( txn,
                                                          doc.objdata(),
                                                          doc.objsize(),
                                                          0 );
    if ( !loc.isOK() )
        return loc;

    InsertDeleteOptions indexOptions;
    indexOptions.logIfError = false;
    indexOptions.dupsAllowed = true; // in repair we should be doing no checking

    Status status = indexBlock.insert( doc, loc.getValue(), indexOptions );
    if ( !status.isOK() )
        return StatusWith<DiskLoc>( status );

    return loc;
}
// Core insert: writes the record, then adds index entries. On index failure
// for a normal collection the record is deleted again; for a capped
// collection rollback is impossible (older records may already have been
// evicted), so an InternalError is returned instead.
// Fix: StatusWith lost its <DiskLoc> template arguments (stripped source).
StatusWith<DiskLoc> Collection::_insertDocument( OperationContext* txn,
                                                 const BSONObj& docToInsert,
                                                 bool enforceQuota ) {

    // TODO: for now, capped logic lives inside NamespaceDetails, which is hidden
    //       under the RecordStore, this feels broken since that should be a
    //       collection access method probably

    StatusWith<DiskLoc> loc = _recordStore->insertRecord( txn,
                                                          docToInsert.objdata(),
                                                          docToInsert.objsize(),
                                                          enforceQuota ? largestFileNumberInQuota() : 0 );
    if ( !loc.isOK() )
        return loc;

    _infoCache.notifyOfWriteOp();

    try {
        _indexCatalog.indexRecord(txn, docToInsert, loc.getValue());
    }
    catch ( AssertionException& e ) {
        if ( isCapped() ) {
            return StatusWith<DiskLoc>( ErrorCodes::InternalError,
                                        str::stream() << "unexpected index insertion failure on"
                                        << " capped collection" << e.toString()
                                        << " - collection and its index will not match" );
        }

        // indexRecord takes care of rolling back indexes
        // so we just have to delete the main storage
        _recordStore->deleteRecord( txn, loc.getValue() );
        return StatusWith<DiskLoc>( e.toStatus( "insertDocument" ) );
    }

    return loc;
}
// Callback invoked by a capped RecordStore just before it reclaims the
// record at 'loc' to make room for new inserts (registered in the
// constructor via setCappedDeleteCallback). Advances cursors positioned on
// the document and removes its index entries; the record store itself
// performs the actual deletion afterwards.
Status Collection::aboutToDeleteCapped( OperationContext* txn, const DiskLoc& loc ) {
BSONObj doc = docFor( loc );
/* check if any cursors point to us. if so, advance them. */
_cursorCache.invalidateDocument(loc, INVALIDATION_DELETION);
// final 'false' is the noWarn flag (cf. deleteDocument): warnings stay on.
_indexCatalog.unindexRecord(txn, doc, loc, false);
return Status::OK();
}
// Removes the document at 'loc': unindexes it, deletes the record, and
// notifies caches/cursors. Refuses to delete from a capped collection
// unless 'cappedOK' is set. If 'deletedId' is non-null, the removed
// document's _id (when present) is written there.
void Collection::deleteDocument( OperationContext* txn,
                                 const DiskLoc& loc,
                                 bool cappedOK,
                                 bool noWarn,
                                 BSONObj* deletedId ) {
    // Capped collections only shrink via their cap unless explicitly allowed.
    if ( isCapped() && !cappedOK ) {
        log() << "failing remove on a capped ns " << _ns << endl;
        uasserted( 10089, "cannot remove from a capped collection" );
        return;
    }

    BSONObj toDelete = docFor( loc );

    // Hand the _id of the removed document back to the caller if requested.
    if ( deletedId ) {
        BSONElement idElt = toDelete["_id"];
        if ( idElt.type() ) {
            *deletedId = idElt.wrap();
        }
    }

    /* check if any cursors point to us. if so, advance them. */
    _cursorCache.invalidateDocument(loc, INVALIDATION_DELETION);

    _indexCatalog.unindexRecord(txn, toDelete, loc, noWarn);
    _recordStore->deleteRecord( txn, loc );
    _infoCache.notifyOfWriteOp();
}
// Counts documents relocated because an update outgrew the record's slot;
// surfaced in serverStatus as the "record.moves" metric.
// Fix: ServerStatusMetricField lost its <Counter64> template argument
// (stripped from the source).
Counter64 moveCounter;
ServerStatusMetricField<Counter64> moveCounterDisplay( "record.moves", &moveCounter );
// Replaces the document at 'oldLocation' with 'objNew'. The _id may not
// change. Index updates are pre-validated (duplicate-key check) before any
// write. If the record store moves the record, the old location is unindexed
// via the recordStoreGoingToMove callback and the document is re-indexed at
// its new home; otherwise the pre-validated index deltas are applied in place.
// Fix: StatusWith and OwnedPointerMap lost their template arguments
// (stripped from the source); restored to StatusWith<DiskLoc> and
// OwnedPointerMap<IndexDescriptor*,UpdateTicket>.
StatusWith<DiskLoc> Collection::updateDocument( OperationContext* txn,
                                                const DiskLoc& oldLocation,
                                                const BSONObj& objNew,
                                                bool enforceQuota,
                                                OpDebug* debug ) {
    BSONObj objOld = _recordStore->dataFor( oldLocation ).toBson();

    if ( objOld.hasElement( "_id" ) ) {
        BSONElement oldId = objOld["_id"];
        BSONElement newId = objNew["_id"];
        if ( oldId != newId )
            return StatusWith<DiskLoc>( ErrorCodes::InternalError,
                                        "in Collection::updateDocument _id mismatch",
                                        13596 );
    }

    if ( ns().coll() == "system.users" ) {
        // XXX - andy and spencer think this should go away now
        V2UserDocumentParser parser;
        Status s = parser.checkValidUserDocument(objNew);
        if ( !s.isOK() )
            return StatusWith<DiskLoc>( s );
    }

    /* duplicate key check. we descend the btree twice - once for this check, and once for the actual inserts, further
       below. that is suboptimal, but it's pretty complicated to do it the other way without rollbacks...
    */
    OwnedPointerMap<IndexDescriptor*,UpdateTicket> updateTickets;
    IndexCatalog::IndexIterator ii = _indexCatalog.getIndexIterator( true );
    while ( ii.more() ) {
        IndexDescriptor* descriptor = ii.next();
        IndexAccessMethod* iam = _indexCatalog.getIndex( descriptor );
        InsertDeleteOptions options;
        options.logIfError = false;
        // Unique (and _id) indexes forbid duplicates unless replication says
        // this index's constraint should be ignored on this node.
        options.dupsAllowed =
            !(KeyPattern::isIdKeyPattern(descriptor->keyPattern()) || descriptor->unique())
            || repl::getGlobalReplicationCoordinator()->shouldIgnoreUniqueIndex(descriptor);

        UpdateTicket* updateTicket = new UpdateTicket();
        updateTickets.mutableMap()[descriptor] = updateTicket;
        Status ret = iam->validateUpdate(objOld, objNew, oldLocation, options, updateTicket );
        if ( !ret.isOK() ) {
            return StatusWith<DiskLoc>( ret );
        }
    }

    // this can callback into Collection::recordStoreGoingToMove
    StatusWith<DiskLoc> newLocation = _recordStore->updateRecord( txn,
                                                                  oldLocation,
                                                                  objNew.objdata(),
                                                                  objNew.objsize(),
                                                                  enforceQuota ? largestFileNumberInQuota() : 0,
                                                                  this );
    if ( !newLocation.isOK() ) {
        return newLocation;
    }

    _infoCache.notifyOfWriteOp();

    if ( newLocation.getValue() != oldLocation ) {
        // Record moved: the old location was unindexed in the
        // recordStoreGoingToMove callback, so index the new location fully.
        if ( debug ) {
            if (debug->nmoved == -1) // default of -1 rather than 0
                debug->nmoved = 1;
            else
                debug->nmoved += 1;
        }

        _indexCatalog.indexRecord(txn, objNew, newLocation.getValue());

        return newLocation;
    }

    // In-place update: apply the index changes validated above.
    if ( debug )
        debug->keyUpdates = 0;

    ii = _indexCatalog.getIndexIterator( true );
    while ( ii.more() ) {
        IndexDescriptor* descriptor = ii.next();
        IndexAccessMethod* iam = _indexCatalog.getIndex( descriptor );

        int64_t updatedKeys;
        Status ret = iam->update(txn, *updateTickets.mutableMap()[descriptor], &updatedKeys);
        if ( !ret.isOK() )
            return StatusWith<DiskLoc>( ret );
        if ( debug )
            debug->keyUpdates += updatedKeys;
    }

    // Broadcast the mutation so that query results stay correct.
    _cursorCache.invalidateDocument(oldLocation, INVALIDATION_MUTATION);

    return newLocation;
}
// Callback from RecordStore::updateRecord (see updateDocument) fired when a
// record no longer fits in place and must move: bumps the "record.moves"
// metric, invalidates cursors at the old location, and removes index entries
// pointing at it. The caller re-indexes the document at its new location.
// 'oldSize' is currently unused.
Status Collection::recordStoreGoingToMove( OperationContext* txn,
const DiskLoc& oldLocation,
const char* oldBuffer,
size_t oldSize ) {
moveCounter.increment();
_cursorCache.invalidateDocument(oldLocation, INVALIDATION_DELETION);
// noWarn=true: missing index entries are expected noise during a move.
_indexCatalog.unindexRecord(txn, BSONObj(oldBuffer), oldLocation, true);
return Status::OK();
}
// Applies a set of in-place byte patches ("damages") to the record at 'loc'
// without moving it or touching indexes.
// Fixes: corrected the misspelled parameter name 'damangeSource' (callers
// bind positionally, so this is interface-compatible) and replaced the
// C-style cast — which silently stripped const — with a named cast.
Status Collection::updateDocumentWithDamages( OperationContext* txn,
                                              const DiskLoc& loc,
                                              const char* damageSource,
                                              const mutablebson::DamageVector& damages ) {
    // Broadcast the mutation so that query results stay correct.
    _cursorCache.invalidateDocument(loc, INVALIDATION_MUTATION);

    // Serialize concurrent in-place writers on this record: the DiskLoc's
    // leading bytes are reused as the lock-manager resource id.
    // NOTE(review): assumes sizeof(DiskLoc) >= sizeof(size_t) — confirm.
    ExclusiveResourceLock lk(txn->getTransaction(),
                             *reinterpret_cast<const size_t*>(&loc));

    return _recordStore->updateWithDamages( txn, loc, damageSource, damages );
}
// Returns the quota-enforcement file-count limit for this collection, or 0
// when quotas do not apply (quota disabled, "local" database, or special
// namespaces).
int Collection::largestFileNumberInQuota() const {
    const bool exempt = !storageGlobalParams.quota
        || _ns.db() == "local"
        || _ns.isSpecial();
    return exempt ? 0 : storageGlobalParams.quotaFiles;
}
// True when the underlying record store is capped (fixed-size, FIFO eviction).
bool Collection::isCapped() const {
return _recordStore->isCapped();
}
// Number of documents, as tracked by the record store.
uint64_t Collection::numRecords() const {
return _recordStore->numRecords();
}
// Total size of stored record data in bytes, as tracked by the record store.
uint64_t Collection::dataSize() const {
return _recordStore->dataSize();
}
/**
 * Empties the collection while preserving its indexes. Order will be:
 * 1) store index specs
 * 2) drop indexes
 * 3) truncate record store
 * 4) re-write indexes
 *
 * Fix: the index-spec vector lost its <BSONObj> template argument
 * (stripped from the source).
 */
Status Collection::truncate(OperationContext* txn) {
    massert( 17445, "index build in progress", _indexCatalog.numIndexesInProgress() == 0 );

    // 1) store index specs
    vector<BSONObj> indexSpecs;
    {
        IndexCatalog::IndexIterator ii = _indexCatalog.getIndexIterator( false );
        while ( ii.more() ) {
            const IndexDescriptor* idx = ii.next();
            // getOwned(): the spec must outlive the index we are about to drop.
            indexSpecs.push_back( idx->infoObj().getOwned() );
        }
    }

    // 2) drop indexes
    Status status = _indexCatalog.dropAllIndexes(txn, true);
    if ( !status.isOK() )
        return status;
    _cursorCache.invalidateAll( false );
    _infoCache.reset();

    // 3) truncate record store
    status = _recordStore->truncate(txn);
    if ( !status.isOK() )
        return status;

    // 4) re-create indexes
    for ( size_t i = 0; i < indexSpecs.size(); i++ ) {
        status = _indexCatalog.createIndex(txn, indexSpecs[i], false);
        if ( !status.isOK() )
            return status;
    }

    return Status::OK();
}
// Deletes all records after 'end' (inclusive or not) in a capped collection.
// Fix: the cast lost its target type (stripped from the source); restored to
// the capped V1 record store — invariant(isCapped()) guards the downcast,
// matching the record_store_v1_capped.h include.
void Collection::temp_cappedTruncateAfter(OperationContext* txn,
                                          DiskLoc end,
                                          bool inclusive) {
    invariant( isCapped() );
    reinterpret_cast<CappedRecordStoreV1*>(
        _recordStore)->temp_cappedTruncateAfter( txn, end, inclusive );
}
namespace {
    // Per-record callback for RecordStore::validate: checks that a record's
    // payload is structurally valid BSON and reports its size.
    class MyValidateAdaptor : public ValidateAdaptor {
    public:
        virtual ~MyValidateAdaptor(){}

        virtual Status validate( const RecordData& record, size_t* dataSize ) {
            BSONObj obj = record.toBson();
            const Status status = validateBSON(obj.objdata(), obj.objsize());
            if ( status.isOK() )
                *dataSize = obj.objsize();
            // Fix: the computed validation status was discarded and OK was
            // returned unconditionally — leaving *dataSize unset for corrupt
            // records while still reporting them valid. Propagate the result.
            return status;
        }
    };
}
// Runs storage-level validation over the record store and every ready index,
// appending machine-readable results to 'output' and problems to 'results'.
// An exception while validating an index marks the collection invalid but
// still returns OK so partial results reach the caller.
// Fix: static_cast lost its <long long> template argument (stripped source);
// restored for the appendNumber call.
Status Collection::validate( OperationContext* txn,
                             bool full, bool scanData,
                             ValidateResults* results, BSONObjBuilder* output ){
    MyValidateAdaptor adaptor;
    Status status = _recordStore->validate( txn, full, scanData, &adaptor, results, output );
    if ( !status.isOK() )
        return status;

    { // indexes
        output->append("nIndexes", _indexCatalog.numIndexesReady() );
        int idxn = 0;
        try  {
            BSONObjBuilder indexes; // not using subObjStart to be exception safe
            IndexCatalog::IndexIterator i = _indexCatalog.getIndexIterator(false);
            while( i.more() ) {
                const IndexDescriptor* descriptor = i.next();
                log() << "validating index " << descriptor->indexNamespace() << endl;
                IndexAccessMethod* iam = _indexCatalog.getIndex( descriptor );
                invariant( iam );

                int64_t keys;
                iam->validate(&keys);
                indexes.appendNumber(descriptor->indexNamespace(),
                                     static_cast<long long>(keys));
                idxn++;
            }

            output->append("keysPerIndex", indexes.done());
        }
        catch ( DBException& exc ) {
            // Record which index blew up; the overall call still succeeds so
            // the partial report is delivered.
            string err = str::stream() <<
                "exception during index validate idxn "<<
                BSONObjBuilder::numStr(idxn) <<
                ": " << exc.toString();
            results->errors.push_back( err );
            results->valid = false;
        }
    }

    return Status::OK();
}
// Pre-warms the collection into memory: optionally touches the data pages
// and/or every index, reporting per-phase statistics into 'output'.
Status Collection::touch( OperationContext* txn,
                          bool touchData, bool touchIndexes,
                          BSONObjBuilder* output ) const {
    if ( touchData ) {
        BSONObjBuilder dataStats;
        Status status = _recordStore->touch( txn, &dataStats );
        output->append( "data", dataStats.obj() );
        if ( !status.isOK() )
            return status;
    }

    if ( touchIndexes ) {
        Timer timer;
        IndexCatalog::IndexIterator it = _indexCatalog.getIndexIterator( false );
        while ( it.more() ) {
            const IndexDescriptor* descriptor = it.next();
            const IndexAccessMethod* accessMethod = _indexCatalog.getIndex( descriptor );
            Status status = accessMethod->touch( txn );
            if ( !status.isOK() )
                return status;
        }

        output->append( "indexes", BSON( "num" << _indexCatalog.numIndexesTotal() <<
                                         "millis" << timer.millis() ) );
    }

    return Status::OK();
}
}