/**
* Copyright (C) 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage

#include "mongo/platform/basic.h"

#include "mongo/db/storage/mmap_v1/mmap_v1_database_catalog_entry.h"

#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "mongo/db/catalog/index_catalog_entry.h"
#include "mongo/db/index/2d_access_method.h"
#include "mongo/db/index/btree_access_method.h"
#include "mongo/db/index/fts_access_method.h"
#include "mongo/db/index/hash_access_method.h"
#include "mongo/db/index/haystack_access_method.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/index/s2_access_method.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/storage/mmap_v1/btree/btree_interface.h"
#include "mongo/db/storage/mmap_v1/catalog/namespace_details.h"
#include "mongo/db/storage/mmap_v1/catalog/namespace_details_collection_entry.h"
#include "mongo/db/storage/mmap_v1/catalog/namespace_details_rsv1_metadata.h"
#include "mongo/db/storage/mmap_v1/data_file.h"
#include "mongo/db/storage/mmap_v1/record_store_v1_capped.h"
#include "mongo/db/storage/mmap_v1/record_store_v1_simple.h"
#include "mongo/util/log.h"
namespace mongo {
// The file uses unqualified 'string' throughout (see e.g. dbSize declaration), so the
// second (duplicate) unique_ptr using-declaration is restored to std::string.
using std::string;
using std::unique_ptr;
namespace {
/**
* Declaration for the "newCollectionsUsePowerOf2Sizes" server parameter, which is now
* deprecated in 3.0.
* Note that:
* - setting to true performs a no-op.
* - setting to false will fail.
*/
class NewCollectionsUsePowerOf2SizesParameter : public ExportedServerParameter {
public:
NewCollectionsUsePowerOf2SizesParameter() :
ExportedServerParameter(ServerParameterSet::getGlobal(),
"newCollectionsUsePowerOf2Sizes",
&newCollectionsUsePowerOf2SizesFlag,
true,
true),
newCollectionsUsePowerOf2SizesFlag(true) {
}
virtual Status validate(const bool& potentialNewValue) {
if (!potentialNewValue) {
return Status(ErrorCodes::BadValue,
"newCollectionsUsePowerOf2Sizes cannot be set to false. "
"Use noPadding instead during createCollection.");
}
return Status::OK();
}
private:
// Unused, needed for server parameter.
bool newCollectionsUsePowerOf2SizesFlag;
} exportedNewCollectionsUsePowerOf2SizesParameter;
/**
 * Clamps a requested extent size to the limits supported by the extent manager.
 * Values inside [minSize, maxSize] are returned unchanged (narrowed to int).
 */
int _massageExtentSize(const ExtentManager* em, long long size) {
    if (size < em->minSize())
        return em->minSize();
    if (size > em->maxSize())
        return em->maxSize();
    // Safe narrowing: size is bounded above by em->maxSize(), which is an int.
    return static_cast<int>(size);
}
} // namespace
/**
 * RecoveryUnit::Change that tracks the insertion of a new entry into the _collections
 * cache, so the insertion can be undone if the unit of work rolls back.
 */
class MMAPV1DatabaseCatalogEntry::EntryInsertion : public RecoveryUnit::Change {
public:
    EntryInsertion(StringData ns, MMAPV1DatabaseCatalogEntry* entry)
        : _ns(ns.toString()), _entry(entry) {}

    // Nothing to do on commit; the cache entry simply stays.
    virtual void commit() {}

    // Undo the insertion by evicting the entry from the cache again.
    virtual void rollback() {
        _entry->_removeFromCache(NULL, _ns);
    }

private:
    const std::string _ns;
    MMAPV1DatabaseCatalogEntry* const _entry;
};
/**
 * RecoveryUnit::Change that tracks the removal of an entry from the _collections cache.
 * The removed Entry is kept alive until the change commits, so a rollback can simply
 * put it back in the cache.
 */
class MMAPV1DatabaseCatalogEntry::EntryRemoval : public RecoveryUnit::Change {
public:
    // Takes ownership of 'cachedEntry'; it is destroyed only if the removal commits.
    EntryRemoval(StringData ns,
                 MMAPV1DatabaseCatalogEntry* catalogEntry,
                 Entry* cachedEntry)
        : _ns(ns.toString()), _catalogEntry(catalogEntry), _cachedEntry(cachedEntry) {}

    // The removal is final; release the saved entry.
    virtual void commit() {
        delete _cachedEntry;
    }

    // Restore the saved entry in the cache.
    virtual void rollback() {
        _catalogEntry->_collections[_ns] = _cachedEntry;
    }

private:
    const std::string _ns;
    MMAPV1DatabaseCatalogEntry* const _catalogEntry;
    Entry* const _cachedEntry;
};
/**
 * Opens (or creates) the on-disk structures for a database: the .ns file, the extent
 * manager's data files, and the in-memory collection cache. Requires the database to be
 * locked in MODE_X. Throws on failure; partially-created files are intentionally left
 * behind, because a subsequent openDB retries this same path.
 *
 * NOTE(review): the 'transient' parameter is unused in this constructor body -- confirm
 * whether it is consumed by an initializer elsewhere.
 */
MMAPV1DatabaseCatalogEntry::MMAPV1DatabaseCatalogEntry( OperationContext* txn,
                                                        StringData name,
                                                        StringData path,
                                                        bool directoryPerDB,
                                                        bool transient )
    : DatabaseCatalogEntry( name ),
      _path( path.toString() ),
      _namespaceIndex(_path, name.toString()),
      _extentManager(name, path, directoryPerDB) {

    invariant(txn->lockState()->isDbLockedForMode(name, MODE_X));

    try {
        // First init the .ns file. If this fails, we may leak the .ns file, but this is OK
        // because subsequent openDB will go through this code path again.
        _namespaceIndex.init(txn);

        // Initialize the extent manager. This will create the first data file (.0) if needed
        // and if this fails we would leak the .ns file above. Leaking the .ns or .0 file is
        // acceptable, because subsequent openDB calls will exercise the code path again.
        Status s = _extentManager.init(txn);
        if (!s.isOK()) {
            msgasserted(16966, str::stream() << "_extentManager.init failed: " << s.toString());
        }

        // This is the actual loading of the on-disk structures into cache.
        _init( txn );
    }
    catch (const DBException& dbe) {
        // Log and rethrow; the caller owns recovery.
        warning() << "database " << path << " " << name
                  << " could not be opened due to DBException " << dbe.getCode() << ": "
                  << dbe.what();
        throw;
    }
    catch (const std::exception& e) {
        warning() << "database " << path << " " << name
                  << " could not be opened " << e.what();
        throw;
    }
}
MMAPV1DatabaseCatalogEntry::~MMAPV1DatabaseCatalogEntry() {
    // The cache owns its Entry objects; free them all before dropping the map.
    for (auto& mapping : _collections) {
        delete mapping.second;
    }
    _collections.clear();
}
// Forward declaration; the implementation lives in repair_database.cpp.
intmax_t dbSize( const string& database ); // from repair_database.cpp

/**
 * Returns the total on-disk size of this database's files, in bytes.
 */
int64_t MMAPV1DatabaseCatalogEntry::sizeOnDisk( OperationContext* opCtx ) const {
    // Explicitly narrow/convert intmax_t to the int64_t return type.
    return static_cast<int64_t>( dbSize( name() ) );
}
// Evicts 'ns' from the _collections cache. When a RecoveryUnit is supplied, the cached
// Entry is handed to an EntryRemoval change so the eviction can be rolled back;
// otherwise the Entry is deleted immediately. A miss is a no-op.
void MMAPV1DatabaseCatalogEntry::_removeFromCache(RecoveryUnit* ru,
                                                  StringData ns) {
    CollectionMap::iterator it = _collections.find(ns.toString());
    if (it == _collections.end()) {
        return;
    }

    if (ru) {
        // Defer destruction of the cached entry until the removal commits.
        ru->registerChange(new EntryRemoval(ns, this, it->second));
    } else {
        delete it->second;
    }
    _collections.erase(it);
}
/**
 * Drops a collection: evicts the cache entry (rollback-aware), removes its
 * system.namespaces record, frees its extents, and removes it from the .ns hashtable.
 * Requires the collection X lock and that all indexes were already dropped.
 * Returns NamespaceNotFound if 'ns' does not exist.
 */
Status MMAPV1DatabaseCatalogEntry::dropCollection(OperationContext* txn, StringData ns) {
    invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
    // Evict the cached Entry first; registered with the recovery unit so it can roll back.
    _removeFromCache(txn->recoveryUnit(), ns);

    NamespaceDetails* details = _namespaceIndex.details( ns );

    if ( !details ) {
        return Status( ErrorCodes::NamespaceNotFound, str::stream() << "ns not found: " << ns );
    }

    // The caller must have dropped all indexes before getting here.
    invariant( details->nIndexes == 0 ); // TODO: delete instead?
    invariant( details->indexBuildsInProgress == 0 ); // TODO: delete instead?

    _removeNamespaceFromNamespaceCollection( txn, ns );

    // free extents
    if( !details->firstExtent.isNull() ) {
        _extentManager.freeExtents(txn, details->firstExtent, details->lastExtent);
        // Invalidate the extent pointers through the recovery unit's write intents.
        *txn->recoveryUnit()->writing( &details->firstExtent ) = DiskLoc().setInvalid();
        *txn->recoveryUnit()->writing( &details->lastExtent ) = DiskLoc().setInvalid();
    }

    // remove from the catalog hashtable
    _namespaceIndex.kill_ns( txn, ns );

    return Status::OK();
}
/**
 * Renames a collection and all of its indexes. First moves the collection namespace
 * itself, then, for every system.indexes spec that referenced the old namespace:
 * inserts a rewritten spec pointing at 'toNS', repoints the IndexDetails info DiskLoc,
 * renames the index's underlying btree namespace, and deletes the stale spec.
 */
Status MMAPV1DatabaseCatalogEntry::renameCollection( OperationContext* txn,
                                                     StringData fromNS,
                                                     StringData toNS,
                                                     bool stayTemp ) {
    Status s = _renameSingleNamespace( txn, fromNS, toNS, stayTemp );
    if ( !s.isOK() )
        return s;

    NamespaceDetails* details = _namespaceIndex.details( toNS );
    invariant( details );

    RecordStoreV1Base* systemIndexRecordStore = _getIndexRecordStore();
    auto cursor = systemIndexRecordStore->getCursor(txn);
    while (auto record = cursor->next()) {
        BSONObj oldIndexSpec = record->data.releaseToBson();
        if ( fromNS != oldIndexSpec["ns"].valuestrsafe() )
            continue;

        // Copy the spec, replacing its "ns" field with the new namespace.
        BSONObj newIndexSpec;
        {
            BSONObjBuilder b;
            BSONObjIterator i( oldIndexSpec );
            while( i.more() ) {
                BSONElement e = i.next();
                if ( strcmp( e.fieldName(), "ns" ) != 0 )
                    b.append( e );
                else
                    b << "ns" << toNS;
            }
            newIndexSpec = b.obj();
        }

        StatusWith<RecordId> newIndexSpecLoc =
            systemIndexRecordStore->insertRecord( txn,
                                                  newIndexSpec.objdata(),
                                                  newIndexSpec.objsize(),
                                                  false );
        if ( !newIndexSpecLoc.isOK() )
            return newIndexSpecLoc.getStatus();

        const string& indexName = oldIndexSpec.getStringField( "name" );

        {
            // fix IndexDetails pointer
            NamespaceDetailsCollectionCatalogEntry ce( toNS,
                                                       details,
                                                       _getNamespaceRecordStore(),
                                                       systemIndexRecordStore,
                                                       this );
            int indexI = ce._findIndexNumber( txn, indexName );

            IndexDetails& indexDetails = details->idx(indexI);
            *txn->recoveryUnit()->writing(&indexDetails.info) =
                DiskLoc::fromRecordId(newIndexSpecLoc.getValue());
        }

        {
            // move underlying namespace
            string oldIndexNs = IndexDescriptor::makeIndexNamespace( fromNS, indexName );
            string newIndexNs = IndexDescriptor::makeIndexNamespace( toNS, indexName );

            Status s = _renameSingleNamespace( txn, oldIndexNs, newIndexNs, false );
            if ( !s.isOK() )
                return s;
        }

        // Drop the old spec that still points at fromNS.
        systemIndexRecordStore->deleteRecord( txn, record->id );
    }

    return Status::OK();
}
/**
 * Renames a single namespace (collection or index btree) in the .ns file and fixes up
 * the corresponding system.namespaces record. On success the cache entry for 'toNS' is
 * created and the entry for 'fromNS' has been evicted. Fails without destructive side
 * effects when 'fromNS' does not exist or 'toNS' already does.
 */
Status MMAPV1DatabaseCatalogEntry::_renameSingleNamespace( OperationContext* txn,
                                                           StringData fromNS,
                                                           StringData toNS,
                                                           bool stayTemp ) {
    // some sanity checking
    NamespaceDetails* fromDetails = _namespaceIndex.details( fromNS );
    if ( !fromDetails )
        return Status( ErrorCodes::BadValue, "from namespace doesn't exist" );

    if ( _namespaceIndex.details( toNS ) )
        return Status( ErrorCodes::BadValue, "to namespace already exists" );

    // Evict the source from the cache (rollback-aware through the recovery unit).
    _removeFromCache(txn->recoveryUnit(), fromNS);

    // at this point, we haven't done anything destructive yet
    // ----
    // actually start moving
    // ----

    // this could throw, but if it does we're ok
    _namespaceIndex.add_ns( txn, toNS, fromDetails );
    NamespaceDetails* toDetails = _namespaceIndex.details( toNS );

    try {
        toDetails->copyingFrom(txn,
                               toNS,
                               _namespaceIndex,
                               fromDetails); // fixes extraOffset
    }
    catch( DBException& ) {
        // could end up here if .ns is full - if so try to clean up / roll back a little
        _namespaceIndex.kill_ns( txn, toNS );
        throw;
    }

    // at this point, code .ns stuff moved
    _namespaceIndex.kill_ns( txn, fromNS );
    fromDetails = NULL;

    // fix system.namespaces
    BSONObj newSpec;
    RecordId oldSpecLocation;
    {
        BSONObj oldSpec;
        {
            // Find the system.namespaces record whose "name" matches the source namespace.
            RecordStoreV1Base* rs = _getNamespaceRecordStore();
            auto cursor = rs->getCursor(txn);
            while (auto record = cursor->next()) {
                BSONObj entry = record->data.releaseToBson();
                if ( fromNS == entry["name"].String() ) {
                    oldSpecLocation = record->id;
                    oldSpec = entry.getOwned();
                    break;
                }
            }
        }
        invariant( !oldSpec.isEmpty() );
        invariant( !oldSpecLocation.isNull() );

        // Rebuild the stored options: "create" is rewritten to the new namespace, and
        // "temp" is dropped unless stayTemp was requested.
        BSONObjBuilder b;
        BSONObjIterator i( oldSpec.getObjectField( "options" ) );
        while( i.more() ) {
            BSONElement e = i.next();
            if ( strcmp( e.fieldName(), "create" ) != 0 ) {
                if (stayTemp || (strcmp(e.fieldName(), "temp") != 0))
                    b.append( e );
            }
            else {
                b << "create" << toNS;
            }
        }
        newSpec = b.obj();
    }

    // Insert the record under the new name, then delete the old record.
    _addNamespaceToNamespaceCollection( txn, toNS, newSpec.isEmpty() ? 0 : &newSpec );
    _getNamespaceRecordStore()->deleteRecord( txn, oldSpecLocation );

    // Create and cache the in-memory entry for the new name. The change is registered
    // with the recovery unit before the entry exists, so a rollback evicts it again.
    Entry*& entry = _collections[toNS.toString()];
    invariant( entry == NULL );
    txn->recoveryUnit()->registerChange(new EntryInsertion(toNS, this));
    entry = new Entry();
    _insertInCache(txn, toNS, entry);

    return Status::OK();
}
/**
 * Appends mmapv1-specific statistics (file size, .ns size, extent freelist stats, and
 * the data file version) to 'output'. Sizes are divided by 'scale'.
 */
void MMAPV1DatabaseCatalogEntry::appendExtraStats( OperationContext* opCtx,
                                                   BSONObjBuilder* output,
                                                   double scale ) const {
    if ( isEmpty() ) {
        output->appendNumber( "fileSize", 0 );
    }
    else {
        output->appendNumber( "fileSize", _extentManager.fileSize() / scale );
        // nsSizeMB is reported as a whole number of megabytes.
        output->appendNumber( "nsSizeMB", static_cast<int>( _namespaceIndex.fileLength() /
                                                            ( 1024 * 1024 ) ) );

        int freeListSize = 0;
        int64_t freeListSpace = 0;
        _extentManager.freeListStats(opCtx, &freeListSize, &freeListSpace);

        BSONObjBuilder extentFreeList( output->subobjStart( "extentFreeList" ) );
        extentFreeList.append( "num", freeListSize );
        extentFreeList.appendNumber( "totalSize",
                                     static_cast<long long>( freeListSpace / scale ) );
        extentFreeList.done();

        {
            const DataFileVersion version = _extentManager.getFileFormat(opCtx);

            BSONObjBuilder dataFileVersion( output->subobjStart( "dataFileVersion" ) );
            dataFileVersion.append( "major", version.majorRaw() );
            dataFileVersion.append( "minor", version.minorRaw() );
            dataFileVersion.done();
        }
    }
}
// Reports whether the data files still carry the pre-2.4 index format flag.
bool MMAPV1DatabaseCatalogEntry::isOlderThan24( OperationContext* opCtx ) const {
    // No data files -> nothing old on disk.
    if (_extentManager.numFiles() == 0) {
        return false;
    }

    const DataFileVersion version = _extentManager.getFileFormat(opCtx);
    invariant(version.isCompatibleWithCurrentCode());
    return !version.is24IndexClean();
}
// Stamps the data file header with the 2.4-clean index flag, if not already set.
void MMAPV1DatabaseCatalogEntry::markIndexSafe24AndUp( OperationContext* opCtx ) {
    // No data files -> nothing to stamp.
    if (_extentManager.numFiles() == 0) {
        return;
    }

    DataFileVersion version = _extentManager.getFileFormat(opCtx);
    invariant(version.isCompatibleWithCurrentCode());

    if (version.is24IndexClean()) {
        return;  // Already stamped; avoid rewriting the file header.
    }

    version.setIs24IndexClean();
    _extentManager.setFileFormat(opCtx, version);
}
// Reports whether the on-disk data file version can be handled by this binary.
bool MMAPV1DatabaseCatalogEntry::currentFilesCompatible( OperationContext* opCtx ) const {
    // A database with no data files has nothing that could be incompatible.
    if (_extentManager.numFiles() == 0) {
        return true;
    }

    const auto header = _extentManager.getOpenFile(0)->getHeader();
    return header->version.isCompatibleWithCurrentCode();
}
void MMAPV1DatabaseCatalogEntry::getCollectionNamespaces( std::list* tofill ) const {
_namespaceIndex.getCollectionNamespaces( tofill );
}
// Adds 'ns' to the .ns file if it is not already present; otherwise does nothing.
void MMAPV1DatabaseCatalogEntry::_ensureSystemCollection(OperationContext* txn,
                                                         StringData ns) {
    if (_namespaceIndex.details(ns)) {
        return;  // Already exists.
    }
    _namespaceIndex.add_ns( txn, ns, DiskLoc(), false );
}
/**
 * Loads the on-disk catalog into the in-memory _collections cache. Performs the legacy
 * $freelist upgrade, stamps the 2.8-freelist flag, creates system.namespaces and
 * system.indexes (bootstrapped by hand since they describe each other), and then caches
 * an Entry for every namespace (collections first, their indexes appended as found).
 */
void MMAPV1DatabaseCatalogEntry::_init(OperationContext* txn) {
    WriteUnitOfWork wunit(txn);

    // Upgrade freelist
    const NamespaceString oldFreeList(name(), "$freelist");
    NamespaceDetails* freeListDetails = _namespaceIndex.details(oldFreeList.ns());
    if (freeListDetails) {
        if (!freeListDetails->firstExtent.isNull()) {
            _extentManager.freeExtents(txn,
                                       freeListDetails->firstExtent,
                                       freeListDetails->lastExtent);
        }

        _namespaceIndex.kill_ns(txn, oldFreeList.ns());
    }

    DataFileVersion version = _extentManager.getFileFormat(txn);
    if (version.isCompatibleWithCurrentCode() && !version.mayHave28Freelist()) {
        // Any DB that can be opened and written to gets this flag set.
        version.setMayHave28Freelist();
        _extentManager.setFileFormat(txn, version);
    }

    const NamespaceString nsi(name(), "system.indexes");
    const NamespaceString nsn(name(), "system.namespaces");

    bool isSystemNamespacesGoingToBeNew = _namespaceIndex.details(nsn.toString()) == NULL;
    bool isSystemIndexesGoingToBeNew = _namespaceIndex.details(nsi.toString()) == NULL;

    _ensureSystemCollection(txn, nsn.toString());
    _ensureSystemCollection(txn, nsi.toString());

    // Register cache-insertion changes so a rollback evicts the entries created below.
    if (isSystemNamespacesGoingToBeNew) {
        txn->recoveryUnit()->registerChange(new EntryInsertion(nsn.toString(), this));
    }
    if (isSystemIndexesGoingToBeNew) {
        txn->recoveryUnit()->registerChange(new EntryInsertion(nsi.toString(), this));
    }

    Entry*& indexEntry = _collections[nsi.toString()];
    Entry*& nsEntry = _collections[nsn.toString()];

    NamespaceDetails* const indexDetails = _namespaceIndex.details(nsi.toString());
    NamespaceDetails* const nsDetails = _namespaceIndex.details(nsn.toString());

    // order has to be:
    // 1) ns rs
    // 2) i rs
    // 3) catalog entries

    if (!nsEntry) {
        nsEntry = new Entry();

        NamespaceDetailsRSV1MetaData* md = new NamespaceDetailsRSV1MetaData(nsn.toString(),
                                                                            nsDetails);
        nsEntry->recordStore.reset(new SimpleRecordStoreV1(txn,
                                                           nsn.toString(),
                                                           md,
                                                           &_extentManager,
                                                           false));
    }

    if (!indexEntry) {
        indexEntry = new Entry();

        NamespaceDetailsRSV1MetaData* md =
            new NamespaceDetailsRSV1MetaData(nsi.toString(), indexDetails);
        indexEntry->recordStore.reset(new SimpleRecordStoreV1(txn,
                                                              nsi.toString(),
                                                              md,
                                                              &_extentManager,
                                                              true));
    }

    if (isSystemIndexesGoingToBeNew) {
        _addNamespaceToNamespaceCollection(txn, nsi.toString(), NULL);
    }

    if (!nsEntry->catalogEntry) {
        nsEntry->catalogEntry.reset(
            new NamespaceDetailsCollectionCatalogEntry(nsn.toString(),
                                                       nsDetails,
                                                       nsEntry->recordStore.get(),
                                                       indexEntry->recordStore.get(),
                                                       this));
    }

    if (!indexEntry->catalogEntry) {
        indexEntry->catalogEntry.reset(
            new NamespaceDetailsCollectionCatalogEntry(nsi.toString(),
                                                       indexDetails,
                                                       nsEntry->recordStore.get(),
                                                       indexEntry->recordStore.get(),
                                                       this));
    }

    wunit.commit();

    // Now put everything in the cache of namespaces. None of the operations below do any
    // transactional operations.
    std::list<std::string> namespaces;
    _namespaceIndex.getCollectionNamespaces(&namespaces);

    for (std::list<std::string>::const_iterator i = namespaces.begin();
         i != namespaces.end(); // we add to the list in the loop so can't cache end().
         i++) {
        const std::string& ns = *i;
        Entry*& entry = _collections[ns];

        // The two cases where entry is not null is for system.indexes and system.namespaces,
        // which we manually instantiated above. It is OK to skip these two collections,
        // because they don't have indexes on them anyway.
        if (entry) {
            continue;
        }

        entry = new Entry();
        _insertInCache(txn, ns, entry);

        // Add the indexes on this namespace to the list of namespaces to load.
        std::vector<std::string> indexNames;
        entry->catalogEntry->getAllIndexes(txn, &indexNames);

        for (size_t i = 0; i < indexNames.size(); i++) {
            namespaces.push_back(IndexDescriptor::makeIndexNamespace(ns, indexNames[i]));
        }
    }
}
/**
 * Creates a new collection: records it in system.namespaces and the .ns hashtable,
 * applies user flags and capped settings, caches the in-memory Entry, and optionally
 * pre-allocates extents. Returns NamespaceExists if 'ns' already exists.
 */
Status MMAPV1DatabaseCatalogEntry::createCollection( OperationContext* txn,
                                                     StringData ns,
                                                     const CollectionOptions& options,
                                                     bool allocateDefaultSpace ) {
    if ( _namespaceIndex.details( ns ) ) {
        return Status( ErrorCodes::NamespaceExists,
                       str::stream() << "namespace already exists: " << ns );
    }

    BSONObj optionsAsBSON = options.toBSON();
    _addNamespaceToNamespaceCollection( txn, ns, &optionsAsBSON );

    _namespaceIndex.add_ns( txn, ns, DiskLoc(), options.capped );
    NamespaceDetails* details = _namespaceIndex.details(ns);

    // Set the flags.
    NamespaceDetailsRSV1MetaData(ns, details).replaceUserFlags(txn, options.flags);

    if (options.capped && options.cappedMaxDocs > 0) {
        txn->recoveryUnit()->writingInt( details->maxDocsInCapped ) = options.cappedMaxDocs;
    }

    // Create and cache the in-memory entry. The change is registered with the recovery
    // unit before the entry exists, so a rollback evicts it again.
    Entry*& entry = _collections[ns.toString()];
    invariant( !entry );
    txn->recoveryUnit()->registerChange(new EntryInsertion(ns, this));
    entry = new Entry();
    _insertInCache(txn, ns, entry);

    if ( allocateDefaultSpace ) {
        RecordStoreV1Base* rs = _getRecordStore( ns );
        if ( options.initialNumExtents > 0 ) {
            // NOTE(review): each extent is sized from options.cappedSize even when the
            // collection is not capped -- confirm this is intended.
            int size = _massageExtentSize( &_extentManager, options.cappedSize );
            for ( int i = 0; i < options.initialNumExtents; i++ ) {
                rs->increaseStorageSize( txn, size, false );
            }
        }
        else if ( !options.initialExtentSizes.empty() ) {
            // Explicit per-extent sizes were requested; clamp each one.
            for ( size_t i = 0; i < options.initialExtentSizes.size(); i++ ) {
                int size = options.initialExtentSizes[i];
                size = _massageExtentSize( &_extentManager, size );
                rs->increaseStorageSize( txn, size, false );
            }
        }
        else if ( options.capped ) {
            // normal
            do {
                // Must do this at least once, otherwise we leave the collection with no
                // extents, which is invalid.
                int sz = _massageExtentSize( &_extentManager,
                                             options.cappedSize - rs->storageSize(txn) );
                // Round down to a multiple of 256 bytes.
                sz &= 0xffffff00;
                rs->increaseStorageSize( txn, sz, false );
            } while( rs->storageSize(txn) < options.cappedSize );
        }
        else {
            rs->increaseStorageSize( txn, _extentManager.initialSize( 128 ), false );
        }
    }

    return Status::OK();
}
/**
 * Creates the namespace backing an index btree. Records it in system.namespaces and the
 * .ns hashtable, then caches the in-memory Entry. The namespace must not already exist.
 */
void MMAPV1DatabaseCatalogEntry::createNamespaceForIndex(OperationContext* txn,
                                                         StringData name) {
    // This is a simplified form of createCollection.
    invariant(!_namespaceIndex.details(name));

    _addNamespaceToNamespaceCollection(txn, name, NULL);
    _namespaceIndex.add_ns(txn, name, DiskLoc(), false);

    // Create and cache the in-memory entry. The change is registered with the recovery
    // unit before the entry exists, so a rollback evicts it again.
    Entry*& entry = _collections[name.toString()];
    invariant( !entry );
    txn->recoveryUnit()->registerChange(new EntryInsertion(name, this));
    entry = new Entry();
    _insertInCache(txn, name, entry);
}
// Returns the cached catalog entry for 'ns', or NULL when the namespace is not cached.
CollectionCatalogEntry* MMAPV1DatabaseCatalogEntry::getCollectionCatalogEntry(
        StringData ns ) const {
    CollectionMap::const_iterator found = _collections.find( ns.toString() );
    if (found == _collections.end()) {
        return NULL;
    }

    // A cached Entry always carries a catalog entry.
    invariant( found->second->catalogEntry.get() );
    return found->second->catalogEntry.get();
}
/**
 * Populates 'entry' with the catalog entry and record store for an existing namespace.
 * The namespace must already be present in the .ns file. Capped collections get a
 * CappedRecordStoreV1, everything else a SimpleRecordStoreV1.
 */
void MMAPV1DatabaseCatalogEntry::_insertInCache(OperationContext* txn,
                                                StringData ns,
                                                Entry* entry) {
    NamespaceDetails* details = _namespaceIndex.details(ns);
    invariant(details);

    entry->catalogEntry.reset(
        new NamespaceDetailsCollectionCatalogEntry(ns,
                                                   details,
                                                   _getNamespaceRecordStore(),
                                                   _getIndexRecordStore(),
                                                   this));

    // The metadata is owned by the record store created below (md.release()).
    unique_ptr<NamespaceDetailsRSV1MetaData> md(new NamespaceDetailsRSV1MetaData(ns, details));
    const NamespaceString nss(ns);

    if (details->isCapped) {
        entry->recordStore.reset(new CappedRecordStoreV1(txn,
                                                         NULL,
                                                         ns,
                                                         md.release(),
                                                         &_extentManager,
                                                         nss.coll() == "system.indexes"));
    }
    else {
        entry->recordStore.reset(new SimpleRecordStoreV1(txn,
                                                         ns,
                                                         md.release(),
                                                         &_extentManager,
                                                         nss.coll() == "system.indexes"));
    }
}
// Public accessor; returns the cached record store for 'ns', or NULL when not cached.
RecordStore* MMAPV1DatabaseCatalogEntry::getRecordStore( StringData ns ) const {
    return _getRecordStore( ns );
}
// Looks up the cached V1 record store for 'ns'; returns NULL on a cache miss.
RecordStoreV1Base* MMAPV1DatabaseCatalogEntry::_getRecordStore( StringData ns ) const {
    CollectionMap::const_iterator found = _collections.find( ns.toString() );
    if (found == _collections.end()) {
        return NULL;
    }

    // A cached Entry always carries a record store.
    invariant( found->second->recordStore.get() );
    return found->second->recordStore.get();
}
/**
 * Creates the access method for an index, dispatching on the index plugin name stored
 * in its descriptor. The empty string selects the plain btree. The returned access
 * method owns the underlying sorted-data interface. Fasserts when the plugin name is
 * unknown.
 */
IndexAccessMethod* MMAPV1DatabaseCatalogEntry::getIndex( OperationContext* txn,
                                                         const CollectionCatalogEntry* collection,
                                                         IndexCatalogEntry* entry ) {
    const string& type = entry->descriptor()->getAccessMethodName();

    RecordStoreV1Base* rs = _getRecordStore(entry->descriptor()->indexNamespace());
    invariant(rs);

    std::unique_ptr<SortedDataInterface> btree(
        getMMAPV1Interface(entry->headManager(),
                           rs,
                           &rs->savedCursors,
                           entry->ordering(),
                           entry->descriptor()->indexNamespace(),
                           entry->descriptor()->version()));

    if (IndexNames::HASHED == type)
        return new HashAccessMethod( entry, btree.release() );

    if (IndexNames::GEO_2DSPHERE == type)
        return new S2AccessMethod( entry, btree.release() );

    if (IndexNames::TEXT == type)
        return new FTSAccessMethod( entry, btree.release() );

    if (IndexNames::GEO_HAYSTACK == type)
        return new HaystackAccessMethod( entry, btree.release() );

    if ("" == type)
        return new BtreeAccessMethod( entry, btree.release() );

    if (IndexNames::GEO_2D == type)
        return new TwoDAccessMethod( entry, btree.release() );

    log() << "Can't find index for keyPattern " << entry->descriptor()->keyPattern();
    fassertFailed(17489);
}
// Returns the record store for <db>.system.indexes, which _init() always creates, so
// the cache lookup must succeed.
RecordStoreV1Base* MMAPV1DatabaseCatalogEntry::_getIndexRecordStore() {
    const NamespaceString nss( name(), "system.indexes" );
    Entry* const cached = _collections[nss.toString()];
    invariant( cached );
    return cached->recordStore.get();
}
// Returns the record store for <db>.system.namespaces, which _init() always creates, so
// the cache lookup must succeed.
RecordStoreV1Base* MMAPV1DatabaseCatalogEntry::_getNamespaceRecordStore() const {
    const NamespaceString nss( name(), "system.namespaces" );
    CollectionMap::const_iterator found = _collections.find( nss.toString() );
    invariant( found != _collections.end() );
    return found->second->recordStore.get();
}
/**
 * Inserts a record {name: <ns>[, options: <options>]} into system.namespaces.
 * No-op for system.namespaces itself, which is never listed in the catalog.
 * Fasserts on insertion failure.
 */
void MMAPV1DatabaseCatalogEntry::_addNamespaceToNamespaceCollection(OperationContext* txn,
                                                                    StringData ns,
                                                                    const BSONObj* options) {
    if (nsToCollectionSubstring(ns) == "system.namespaces") {
        // system.namespaces holds all the others, so it is not explicitly listed in the catalog.
        return;
    }

    BSONObjBuilder b;
    b.append("name", ns);
    if (options && !options->isEmpty()) {
        b.append("options", *options);
    }

    const BSONObj obj = b.done();

    RecordStoreV1Base* rs = _getNamespaceRecordStore();
    invariant( rs );

    StatusWith<RecordId> loc = rs->insertRecord( txn, obj.objdata(), obj.objsize(), false );
    massertStatusOK( loc.getStatus() );
}
// Deletes the system.namespaces record whose "name" field equals 'ns'. No-op for
// system.namespaces itself or when no matching record is found.
void MMAPV1DatabaseCatalogEntry::_removeNamespaceFromNamespaceCollection(
        OperationContext* txn,
        StringData ns ) {
    if ( nsToCollectionSubstring( ns ) == "system.namespaces" ) {
        // system.namespaces holds all the others, so it is not explicitly listed in the catalog.
        return;
    }

    RecordStoreV1Base* rs = _getNamespaceRecordStore();
    invariant( rs );

    // Scan for the matching record and delete it.
    auto cursor = rs->getCursor(txn);
    while (auto record = cursor->next()) {
        const BSONObj entry = record->data.releaseToBson();
        const BSONElement nameElem = entry["name"];
        if ( nameElem.type() == String && nameElem.String() == ns ) {
            rs->deleteRecord( txn, record->id );
            break;
        }
    }
}
// Returns the stored options for 'ns' by scanning system.namespaces. Default-constructed
// options are returned for system.namespaces itself and for unknown namespaces.
CollectionOptions MMAPV1DatabaseCatalogEntry::getCollectionOptions( OperationContext* txn,
                                                                    StringData ns ) const {
    if ( nsToCollectionSubstring( ns ) == "system.namespaces" ) {
        return CollectionOptions();
    }

    RecordStoreV1Base* rs = _getNamespaceRecordStore();
    invariant( rs );

    auto cursor = rs->getCursor(txn);
    while (auto record = cursor->next()) {
        const BSONObj entry = record->data.releaseToBson();
        const BSONElement nameElem = entry["name"];
        if ( nameElem.type() != String || nameElem.String() != ns ) {
            continue;
        }

        // Found the namespace; parse its "options" subdocument if present.
        CollectionOptions options;
        if ( entry["options"].isABSONObj() ) {
            const Status status = options.parse( entry["options"].Obj() );
            fassert( 18523, status );
        }
        return options;
    }

    return CollectionOptions();
}
} // namespace mongo