summaryrefslogtreecommitdiff
path: root/src/mongo/db/storage/rocks
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2014-10-11 00:14:30 -0400
committerEliot Horowitz <eliot@10gen.com>2014-10-11 00:14:30 -0400
commit59b2e9e90bd6ae25e69bb980df418b9ed614e943 (patch)
tree787ebda07fdf33e2d97749289e8739e377b899be /src/mongo/db/storage/rocks
parent31a2bc4a955e5ba7751c56652ed3c85dc6991b24 (diff)
downloadmongo-59b2e9e90bd6ae25e69bb980df418b9ed614e943.tar.gz
SERVER-14352: move rocks engine to kv interface
Diffstat (limited to 'src/mongo/db/storage/rocks')
-rw-r--r--src/mongo/db/storage/rocks/SConscript59
-rw-r--r--src/mongo/db/storage/rocks/rocks_collection_catalog_entry.cpp228
-rw-r--r--src/mongo/db/storage/rocks/rocks_collection_catalog_entry.h119
-rw-r--r--src/mongo/db/storage/rocks/rocks_database_catalog_entry.cpp111
-rw-r--r--src/mongo/db/storage/rocks/rocks_database_catalog_entry.h97
-rw-r--r--src/mongo/db/storage/rocks/rocks_database_catalog_entry_fake.cpp42
-rw-r--r--src/mongo/db/storage/rocks/rocks_database_catalog_entry_mongod.cpp89
-rw-r--r--src/mongo/db/storage/rocks/rocks_engine.cpp667
-rw-r--r--src/mongo/db/storage/rocks/rocks_engine.h165
-rw-r--r--src/mongo/db/storage/rocks/rocks_engine_test.cpp338
-rw-r--r--src/mongo/db/storage/rocks/rocks_init.cpp3
-rw-r--r--src/mongo/db/storage/rocks/rocks_record_store.cpp356
-rw-r--r--src/mongo/db/storage/rocks/rocks_record_store.h44
-rw-r--r--src/mongo/db/storage/rocks/rocks_record_store_test.cpp759
-rw-r--r--src/mongo/db/storage/rocks/rocks_recovery_unit.cpp64
-rw-r--r--src/mongo/db/storage/rocks/rocks_recovery_unit.h18
-rw-r--r--src/mongo/db/storage/rocks/rocks_sorted_data_impl.cpp183
-rw-r--r--src/mongo/db/storage/rocks/rocks_sorted_data_impl.h7
-rw-r--r--src/mongo/db/storage/rocks/rocks_sorted_data_impl_harness_test.cpp70
-rw-r--r--src/mongo/db/storage/rocks/rocks_sorted_data_impl_test.cpp965
20 files changed, 753 insertions, 3631 deletions
diff --git a/src/mongo/db/storage/rocks/SConscript b/src/mongo/db/storage/rocks/SConscript
index 93a4887908b..e79adc48bba 100644
--- a/src/mongo/db/storage/rocks/SConscript
+++ b/src/mongo/db/storage/rocks/SConscript
@@ -6,8 +6,6 @@ if has_option("rocksdb"):
env.Library(
target= 'storage_rocks_base',
source= [
- 'rocks_collection_catalog_entry.cpp',
- 'rocks_database_catalog_entry.cpp',
'rocks_engine.cpp',
'rocks_record_store.cpp',
'rocks_recovery_unit.cpp',
@@ -30,58 +28,43 @@ if has_option("rocksdb"):
env.Library(
target= 'storage_rocks',
source= [
- 'rocks_database_catalog_entry_mongod.cpp',
'rocks_init.cpp'
],
LIBDEPS= [
- 'storage_rocks_base'
+ 'storage_rocks_base',
+ '$BUILD_DIR/mongo/db/storage/kv/kv_engine'
]
)
- env.Library(
- target= 'storage_rocks_fake',
- source= [
- 'rocks_database_catalog_entry_fake.cpp',
- ],
- LIBDEPS= [
- 'storage_rocks_base'
- ]
- )
env.CppUnitTest(
- target='storage_rocks_engine_test',
- source=['rocks_engine_test.cpp',
- ],
- LIBDEPS=[
- 'storage_rocks_fake'
+ target='storage_rocks_sorted_data_impl_test',
+ source=['rocks_sorted_data_impl_test.cpp'
+ ],
+ LIBDEPS=[
+ 'storage_rocks_base',
+ '$BUILD_DIR/mongo/db/storage/sorted_data_interface_test_harness'
]
- )
+ )
- env.CppUnitTest(
- target='storage_rocks_record_store_test',
- source=['rocks_record_store_test.cpp',
- ],
- LIBDEPS=[
- 'storage_rocks_fake'
- ]
- )
env.CppUnitTest(
- target='storage_rocks_sorted_data_impl_test',
- source=['rocks_sorted_data_impl_test.cpp',
- ],
- LIBDEPS=[
- 'storage_rocks_fake'
+ target='storage_rocks_record_store_test',
+ source=['rocks_record_store_test.cpp'
+ ],
+ LIBDEPS=[
+ 'storage_rocks_base',
+ '$BUILD_DIR/mongo/db/storage/record_store_test_harness'
]
- )
-
+ )
env.CppUnitTest(
- target='storage_rocks_sorted_data_impl_harness_test',
- source=['rocks_sorted_data_impl_harness_test.cpp'
+ target='storage_rocks_engine_test',
+ source=['rocks_engine_test.cpp'
],
LIBDEPS=[
- 'storage_rocks_fake',
- '$BUILD_DIR/mongo/db/storage/sorted_data_interface_test_harness'
+ 'storage_rocks_base',
+ '$BUILD_DIR/mongo/db/storage/kv/kv_engine_test_harness'
]
)
+
diff --git a/src/mongo/db/storage/rocks/rocks_collection_catalog_entry.cpp b/src/mongo/db/storage/rocks/rocks_collection_catalog_entry.cpp
deleted file mode 100644
index 102e9e14b39..00000000000
--- a/src/mongo/db/storage/rocks/rocks_collection_catalog_entry.cpp
+++ /dev/null
@@ -1,228 +0,0 @@
-// rocks_collection_catalog_entry.cpp
-
-/**
- * Copyright (C) 2014 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/db/storage/rocks/rocks_collection_catalog_entry.h"
-
-#include <rocksdb/db.h>
-
-#include "mongo/db/index/index_descriptor.h"
-#include "mongo/db/storage/rocks/rocks_engine.h"
-
-namespace mongo {
-
- static const int _maxAllowedIndexes = 64; // for compatability for now, could be higher
-
- /**
- * bson schema
- * { ns: <name for sanity>,
- * options : <bson spec>,
- * indexes : [ { spec : <bson spec>,
- * ready: <bool>,
- * head: DiskLoc,
- * multikey: <bool> } ]
- * }
- */
-
- RocksCollectionCatalogEntry::RocksCollectionCatalogEntry( RocksEngine* engine,
- const StringData& ns )
- : BSONCollectionCatalogEntry( ns ),
- _engine( engine ),
- _metaDataKey( string( "metadata-" ) + ns.toString() ) { }
-
- int RocksCollectionCatalogEntry::getMaxAllowedIndexes() const {
- return _maxAllowedIndexes;
- }
-
- BSONObj RocksCollectionCatalogEntry::getOtherIndexSpec( const StringData& indexName,
- rocksdb::DB* db ) const {
- MetaData md = _getMetaData( db );
-
- int offset = md.findIndexOffset( indexName );
- invariant( offset >= 0 );
- return md.indexes[offset].spec.getOwned();
- }
-
- bool RocksCollectionCatalogEntry::setIndexIsMultikey(OperationContext* txn,
- const StringData& indexName,
- bool multikey ) {
- boost::mutex::scoped_lock lk( _metaDataMutex );
- MetaData md = _getMetaData_inlock();
-
- int offset = md.findIndexOffset( indexName );
- invariant( offset >= 0 );
- if ( md.indexes[offset].multikey == multikey )
- return false;
- md.indexes[offset].multikey = multikey;
- _putMetaData_inlock( md );
- return true;
- }
-
- void RocksCollectionCatalogEntry::setIndexHead( OperationContext* txn,
- const StringData& indexName,
- const DiskLoc& newHead ) {
- boost::mutex::scoped_lock lk( _metaDataMutex );
- MetaData md = _getMetaData_inlock();
-
- int offset = md.findIndexOffset( indexName );
- invariant( offset >= 0 );
- md.indexes[offset].head = newHead;
- _putMetaData_inlock( md );
- }
-
- void RocksCollectionCatalogEntry::indexBuildSuccess( OperationContext* txn,
- const StringData& indexName ) {
- boost::mutex::scoped_lock lk( _metaDataMutex );
- MetaData md = _getMetaData_inlock();
-
- int offset = md.findIndexOffset( indexName );
- invariant( offset >= 0 );
- md.indexes[offset].ready = true;
- _putMetaData_inlock( md );
- }
-
- Status RocksCollectionCatalogEntry::removeIndex( OperationContext* txn,
- const StringData& indexName ) {
- boost::mutex::scoped_lock lk( _metaDataMutex );
-
- _removeIndexFromMetaData_inlock( indexName );
-
- // drop the actual index in rocksdb
- rocksdb::ColumnFamilyHandle* cfh = _engine->getIndexColumnFamilyNoCreate( ns().ns(),
- indexName );
-
- // Note: this invalidates cfh. Do not use after this call
- _engine->removeColumnFamily( &cfh, indexName, ns().ns() );
- invariant( cfh == nullptr );
-
- return Status::OK();
- }
-
- void RocksCollectionCatalogEntry::removeIndexWithoutDroppingCF( OperationContext* txn,
- const StringData& indexName ) {
- boost::mutex::scoped_lock lk( _metaDataMutex );
- _removeIndexFromMetaData_inlock( indexName );
- }
-
- Status RocksCollectionCatalogEntry::prepareForIndexBuild( OperationContext* txn,
- const IndexDescriptor* spec ) {
- boost::mutex::scoped_lock lk( _metaDataMutex );
- MetaData md = _getMetaData_inlock();
-
- md.indexes.push_back( IndexMetaData( spec->infoObj(), false, DiskLoc(), false ) );
- _putMetaData_inlock( md );
- return Status::OK();
- }
-
- /* Updates the expireAfterSeconds field of the given index to the value in newExpireSecs.
- * The specified index must already contain an expireAfterSeconds field, and the value in
- * that field and newExpireSecs must both be numeric.
- */
- void RocksCollectionCatalogEntry::updateTTLSetting( OperationContext* txn,
- const StringData& indexName,
- long long newExpireSeconds ) {
- invariant( !"ttl settings change not supported in rocks yet" );
- }
-
- void RocksCollectionCatalogEntry::createMetaData() {
- string result;
- // XXX not using a snapshot here
- rocksdb::Status status = _engine->getDB()->Get( rocksdb::ReadOptions(),
- _metaDataKey,
- &result );
- invariant( status.IsNotFound() );
-
- MetaData md;
- md.ns = ns();
-
- BSONObj obj = md.toBSON();
- status = _engine->getDB()->Put( rocksdb::WriteOptions(),
- _metaDataKey,
- rocksdb::Slice( obj.objdata(),
- obj.objsize() ) );
- invariant( status.ok() );
- }
-
-
- void RocksCollectionCatalogEntry::dropMetaData() {
- rocksdb::Status status = _engine->getDB()->Delete( rocksdb::WriteOptions(), _metaDataKey );
- invariant( status.ok() );
- }
-
- RocksCollectionCatalogEntry::MetaData RocksCollectionCatalogEntry::_getMetaData( OperationContext* txn ) const {
- return _getMetaData( _engine->getDB() );
- }
-
- RocksCollectionCatalogEntry::MetaData RocksCollectionCatalogEntry::_getMetaData(
- rocksdb::DB* db ) const {
- invariant( db );
- boost::mutex::scoped_lock lk( _metaDataMutex );
- return _getMetaData_inlock( db );
- }
-
- RocksCollectionCatalogEntry::MetaData RocksCollectionCatalogEntry::_getMetaData_inlock() const {
- return _getMetaData_inlock( _engine->getDB() );
- }
-
- // The metadata in a column family with a specific name. This method reads from that column
- // family
- RocksCollectionCatalogEntry::MetaData RocksCollectionCatalogEntry::_getMetaData_inlock(
- rocksdb::DB* db ) const {
- string result;
- // XXX not using a snapshot here
- rocksdb::Status status = db->Get( rocksdb::ReadOptions(), _metaDataKey, &result );
- invariant( !status.IsNotFound() );
- invariant( status.ok() );
-
- MetaData md;
- md.parse( BSONObj( result.c_str() ) );
- return md;
- }
-
- void RocksCollectionCatalogEntry::_removeIndexFromMetaData_inlock(
- const StringData& indexName ) {
- MetaData md = _getMetaData_inlock();
-
- // remove info from meta data
- invariant( md.eraseIndex( indexName ) );
- _putMetaData_inlock( md );
- }
-
- void RocksCollectionCatalogEntry::_putMetaData_inlock( const MetaData& in ) {
- // XXX: this should probably be done via the RocksRecoveryUnit.
- // TODO move into recovery unit
- BSONObj obj = in.toBSON();
- rocksdb::Status status = _engine->getDB()->Put( rocksdb::WriteOptions(),
- _metaDataKey,
- rocksdb::Slice( obj.objdata(),
- obj.objsize() ) );
- invariant( status.ok() );
- }
-
-}
diff --git a/src/mongo/db/storage/rocks/rocks_collection_catalog_entry.h b/src/mongo/db/storage/rocks/rocks_collection_catalog_entry.h
deleted file mode 100644
index 46aababd201..00000000000
--- a/src/mongo/db/storage/rocks/rocks_collection_catalog_entry.h
+++ /dev/null
@@ -1,119 +0,0 @@
-// rocks_collection_catalog_entry.h
-
-/**
- * Copyright (C) 2014 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/catalog/collection_catalog_entry.h"
-#include "mongo/db/storage/bson_collection_catalog_entry.h"
-
-namespace rocksdb {
- class DB;
-}
-
-namespace mongo {
-
- class RocksEngine;
-
- class RocksCollectionCatalogEntry : public BSONCollectionCatalogEntry {
- public:
- RocksCollectionCatalogEntry( RocksEngine* engine, const StringData& ns );
-
- virtual ~RocksCollectionCatalogEntry(){}
-
- // ------- indexes ----------
-
- virtual int getMaxAllowedIndexes() const;
-
- virtual bool setIndexIsMultikey(OperationContext* txn,
- const StringData& indexName,
- bool multikey = true);
-
- virtual void setIndexHead( OperationContext* txn,
- const StringData& indexName,
- const DiskLoc& newHead );
-
- virtual Status removeIndex( OperationContext* txn,
- const StringData& indexName );
-
- void removeIndexWithoutDroppingCF( OperationContext* txn,
- const StringData& indexName );
-
- virtual Status prepareForIndexBuild( OperationContext* txn,
- const IndexDescriptor* spec );
-
- virtual void indexBuildSuccess( OperationContext* txn,
- const StringData& indexName );
-
- /* Updates the expireAfterSeconds field of the given index to the value in newExpireSecs.
- * The specified index must already contain an expireAfterSeconds field, and the value in
- * that field and newExpireSecs must both be numeric.
- */
- virtual void updateTTLSetting( OperationContext* txn,
- const StringData& idxName,
- long long newExpireSeconds );
-
- // ------ internal api
-
- BSONObj getOtherIndexSpec( const StringData& idxName, rocksdb::DB* db ) const;
-
- // called once when collection is created.
- void createMetaData();
-
- // when collection is dropped, call this
- // all indexes have to be dropped first.
- void dropMetaData();
-
- const string metaDataKey() { return _metaDataKey; }
-
- protected:
- virtual MetaData _getMetaData( OperationContext* txn ) const;
-
- private:
- MetaData _getMetaData( rocksdb::DB* db ) const;
-
- MetaData _getMetaData_inlock() const;
- MetaData _getMetaData_inlock( rocksdb::DB* db ) const;
-
- void _removeIndexFromMetaData_inlock( const StringData& indexName );
-
- void _putMetaData_inlock( const MetaData& in );
-
- RocksEngine* _engine; // not owned
-
- // the name of the column family which holds the metadata.
- const string _metaDataKey;
-
- // lock which must be acquired before calling _getMetaData_inlock(). Protects the metadata
- // stored in the metadata column family.
- // Locking order RocksEngine::_entryMapMutex before _metaDataMutex
- mutable boost::mutex _metaDataMutex;
- };
-
-}
diff --git a/src/mongo/db/storage/rocks/rocks_database_catalog_entry.cpp b/src/mongo/db/storage/rocks/rocks_database_catalog_entry.cpp
deleted file mode 100644
index 7c074a1529f..00000000000
--- a/src/mongo/db/storage/rocks/rocks_database_catalog_entry.cpp
+++ /dev/null
@@ -1,111 +0,0 @@
-// rocks_database_catalog_entry.cpp
-
-/**
- * Copyright (C) 2014 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/db/storage/rocks/rocks_database_catalog_entry.h"
-
-#include <rocksdb/options.h>
-
-#include "mongo/db/jsobj.h"
-#include "mongo/db/storage/rocks/rocks_collection_catalog_entry.h"
-#include "mongo/db/storage/rocks/rocks_engine.h"
-#include "mongo/db/storage/rocks/rocks_record_store.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-
- RocksDatabaseCatalogEntry::RocksDatabaseCatalogEntry( RocksEngine* engine,
- const StringData& dbname )
- : DatabaseCatalogEntry( dbname ), _engine( engine ) {
- }
-
- bool RocksDatabaseCatalogEntry::exists() const {
- return !isEmpty();
- }
-
- bool RocksDatabaseCatalogEntry::isEmpty() const {
- // TODO(XXX): make this fast
- std::list<std::string> lst;
- getCollectionNamespaces( &lst );
- return lst.empty();
- }
-
- void RocksDatabaseCatalogEntry::appendExtraStats( OperationContext* opCtx,
- BSONObjBuilder* out,
- double scale ) const {
- // put some useful stats here
- out->append( "note", "RocksDatabaseCatalogEntry should put some database level stats in" );
- }
-
- bool RocksDatabaseCatalogEntry::currentFilesCompatible( OperationContext* opCtx ) const {
- error() << "RocksDatabaseCatalogEntry::currentFilesCompatible not done";
- return true;
- }
-
- void RocksDatabaseCatalogEntry::getCollectionNamespaces( std::list<std::string>* out ) const {
- _engine->getCollectionNamespaces( name(), out );
- }
-
- CollectionCatalogEntry* RocksDatabaseCatalogEntry::getCollectionCatalogEntry(
- OperationContext* txn,
- const StringData& ns ) const {
- RocksEngine::Entry* entry = _engine->getEntry( ns );
- if ( !entry )
- return NULL;
- return entry->collectionEntry.get();
- }
-
- RecordStore* RocksDatabaseCatalogEntry::getRecordStore( OperationContext* txn,
- const StringData& ns ) {
- RocksEngine::Entry* entry = _engine->getEntry( ns );
- if ( !entry )
- return NULL;
- return entry->recordStore.get();
- }
-
- Status RocksDatabaseCatalogEntry::createCollection( OperationContext* txn,
- const StringData& ns,
- const CollectionOptions& options,
- bool allocateDefaultSpace ) {
- return _engine->createCollection( txn, ns, options );
- }
-
- Status RocksDatabaseCatalogEntry::renameCollection( OperationContext* txn,
- const StringData& fromNS,
- const StringData& toNS,
- bool stayTemp ) {
- return Status( ErrorCodes::InternalError,
- "RocksDatabaseCatalogEntry doesn't support rename yet" );
- }
-
- Status RocksDatabaseCatalogEntry::dropCollection( OperationContext* opCtx,
- const StringData& ns ) {
- return _engine->dropCollection( opCtx, ns );
- }
-}
diff --git a/src/mongo/db/storage/rocks/rocks_database_catalog_entry.h b/src/mongo/db/storage/rocks/rocks_database_catalog_entry.h
deleted file mode 100644
index 95b4375b1e8..00000000000
--- a/src/mongo/db/storage/rocks/rocks_database_catalog_entry.h
+++ /dev/null
@@ -1,97 +0,0 @@
-// rocks_database_catalog_entry.h
-
-/**
-* Copyright (C) 2014 MongoDB Inc.
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License, version 3,
-* as published by the Free Software Foundation.
-*
-* This program is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see <http://www.gnu.org/licenses/>.
-*
-* As a special exception, the copyright holders give permission to link the
-* code of portions of this program with the OpenSSL library under certain
-* conditions as described in each individual source file and distribute
-* linked combinations including the program with the OpenSSL library. You
-* must comply with the GNU Affero General Public License in all respects for
-* all of the code used other than as permitted herein. If you modify file(s)
-* with this exception, you may extend this exception to your version of the
-* file(s), but you are not obligated to do so. If you do not wish to do so,
-* delete this exception statement from your version. If you delete this
-* exception statement from all source files in the program, then also delete
-* it in the license file.
-*/
-
-#pragma once
-
-#include "mongo/db/catalog/database_catalog_entry.h"
-
-namespace mongo {
-
- class RocksEngine;
-
- /**
- * this class acts as a thin layer over RocksEngine,
- * and does and stores nothing.
- */
- class RocksDatabaseCatalogEntry : public DatabaseCatalogEntry {
- public:
- RocksDatabaseCatalogEntry( RocksEngine* engine, const StringData& dbname );
-
- virtual bool exists() const;
- virtual bool isEmpty() const;
-
- virtual void appendExtraStats( OperationContext* opCtx,
- BSONObjBuilder* out,
- double scale ) const;
-
- // these are hacks :(
- virtual bool isOlderThan24( OperationContext* opCtx ) const { return false; }
- virtual void markIndexSafe24AndUp( OperationContext* opCtx ) {}
-
- /**
- * @return true if current files on disk are compatibile with the current version.
- * if we return false, then an upgrade will be required
- */
- virtual bool currentFilesCompatible( OperationContext* opCtx ) const;
-
- // ----
-
- virtual void getCollectionNamespaces( std::list<std::string>* out ) const;
-
- // The DatabaseCatalogEntry owns this, do not delete
- virtual CollectionCatalogEntry* getCollectionCatalogEntry( OperationContext* txn,
- const StringData& ns ) const;
-
- // The DatabaseCatalogEntry owns this, do not delete
- virtual RecordStore* getRecordStore( OperationContext* txn,
- const StringData& ns );
-
- // Ownership passes to caller
- virtual IndexAccessMethod* getIndex( OperationContext* txn,
- const CollectionCatalogEntry* collection,
- IndexCatalogEntry* index );
-
- virtual Status createCollection( OperationContext* txn,
- const StringData& ns,
- const CollectionOptions& options,
- bool allocateDefaultSpace );
-
- virtual Status renameCollection( OperationContext* txn,
- const StringData& fromNS,
- const StringData& toNS,
- bool stayTemp );
-
- virtual Status dropCollection( OperationContext* opCtx,
- const StringData& ns );
-
- private:
- RocksEngine* _engine;
- };
-}
diff --git a/src/mongo/db/storage/rocks/rocks_database_catalog_entry_fake.cpp b/src/mongo/db/storage/rocks/rocks_database_catalog_entry_fake.cpp
deleted file mode 100644
index e33e0f928a8..00000000000
--- a/src/mongo/db/storage/rocks/rocks_database_catalog_entry_fake.cpp
+++ /dev/null
@@ -1,42 +0,0 @@
-// rocks_database_catalog_entry_fake.cpp
-
-/**
- * Copyright (C) 2014 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/db/storage/rocks/rocks_database_catalog_entry.h"
-
-#include "mongo/util/assert_util.h"
-
-namespace mongo {
-
- IndexAccessMethod* RocksDatabaseCatalogEntry::getIndex( OperationContext* txn,
- const CollectionCatalogEntry* collection,
- IndexCatalogEntry* index ) {
- invariant( !"so sad, no indexes with rocks" );
- }
-}
diff --git a/src/mongo/db/storage/rocks/rocks_database_catalog_entry_mongod.cpp b/src/mongo/db/storage/rocks/rocks_database_catalog_entry_mongod.cpp
deleted file mode 100644
index 92b720ea0ef..00000000000
--- a/src/mongo/db/storage/rocks/rocks_database_catalog_entry_mongod.cpp
+++ /dev/null
@@ -1,89 +0,0 @@
-// rocks_database_catalog_entry_mongod.cpp
-
-/**
- * Copyright (C) 2014 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/db/storage/rocks/rocks_database_catalog_entry.h"
-
-#include <boost/optional.hpp>
-#include <rocksdb/db.h>
-
-#include "mongo/db/index/2d_access_method.h"
-#include "mongo/db/index/btree_access_method.h"
-#include "mongo/db/index/fts_access_method.h"
-#include "mongo/db/index/hash_access_method.h"
-#include "mongo/db/index/haystack_access_method.h"
-#include "mongo/db/index/index_access_method.h"
-#include "mongo/db/index/index_descriptor.h"
-#include "mongo/db/index/s2_access_method.h"
-#include "mongo/db/storage/rocks/rocks_sorted_data_impl.h"
-#include "mongo/db/storage/rocks/rocks_collection_catalog_entry.h"
-#include "mongo/db/storage/rocks/rocks_engine.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-
- IndexAccessMethod* RocksDatabaseCatalogEntry::getIndex( OperationContext* txn,
- const CollectionCatalogEntry* collection,
- IndexCatalogEntry* index ) {
- const IndexDescriptor* desc = index->descriptor();
- const Ordering order( Ordering::make( desc->keyPattern() ) );
-
- rocksdb::ColumnFamilyHandle* cf = _engine->getIndexColumnFamily( collection->ns().ns(),
- desc->indexName(),
- order );
-
- std::auto_ptr<RocksSortedDataImpl> raw( new RocksSortedDataImpl( _engine->getDB(),
- cf,
- order ) );
-
- const string& type = index->descriptor()->getAccessMethodName();
-
- if ("" == type)
- return new BtreeAccessMethod( index, raw.release() );
-
- if (IndexNames::HASHED == type)
- return new HashAccessMethod( index, raw.release() );
-
- if (IndexNames::GEO_2DSPHERE == type)
- return new S2AccessMethod( index, raw.release() );
-
- if (IndexNames::TEXT == type)
- return new FTSAccessMethod( index, raw.release() );
-
- if (IndexNames::GEO_HAYSTACK == type)
- return new HaystackAccessMethod( index, raw.release() );
-
- if (IndexNames::GEO_2D == type)
- return new TwoDAccessMethod( index, raw.release() );
-
- log() << "Can't find index for keyPattern " << index->descriptor()->keyPattern();
- // TODO make this fassert
- return NULL;
- }
-}
diff --git a/src/mongo/db/storage/rocks/rocks_engine.cpp b/src/mongo/db/storage/rocks/rocks_engine.cpp
index f89b75cdcc6..2a09879602a 100644
--- a/src/mongo/db/storage/rocks/rocks_engine.cpp
+++ b/src/mongo/db/storage/rocks/rocks_engine.cpp
@@ -44,153 +44,143 @@
#include <rocksdb/utilities/write_batch_with_index.h>
#include "mongo/db/catalog/collection_options.h"
+#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/storage/rocks/rocks_collection_catalog_entry.h"
-#include "mongo/db/storage/rocks/rocks_database_catalog_entry.h"
#include "mongo/db/storage/rocks/rocks_record_store.h"
#include "mongo/db/storage/rocks/rocks_recovery_unit.h"
#include "mongo/db/storage/rocks/rocks_sorted_data_impl.h"
#include "mongo/util/log.h"
-#define ROCKS_TRACE if(0) log()
+#define ROCKS_TRACE log()
#define ROCKS_STATUS_OK( s ) if ( !( s ).ok() ) { error() << "rocks error: " << ( s ).ToString(); \
invariant( false ); }
namespace mongo {
- RocksEngine::RocksEngine( const std::string& path ) : _path( path ), _defaultHandle( NULL ) {
- // TODO make this more fine-grained?
- boost::mutex::scoped_lock lk( _entryMapMutex );
- std::vector<rocksdb::ColumnFamilyDescriptor> families;
-
- vector<string> familyNames = _listFamilyNames( path );
-
- // Create the shared collection comparator
- _collectionComparator.reset( RocksRecordStore::newRocksCollectionComparator() );
-
- if ( !familyNames.empty() ) {
- // go through and create RocksCollectionCatalogEntries for all non-indexes
- _createNonIndexCatalogEntries( familyNames );
-
- // Create a mapping from index names to the Ordering object for each index.
- // These Ordering objects will be used to create RocksIndexEntryComparators to be used
- // with each column family representing a namespace
- map<string, Ordering> indexOrderings = _createIndexOrderings( familyNames, path );
-
- // get ColumnFamilyDescriptors for all the column families
- families = _createCfds( familyNames, indexOrderings );
+ // TODO make create/drop operations support rollback?
+
+ RocksEngine::RocksEngine(const std::string& path)
+ : _path(path), _collectionComparator(RocksRecordStore::newRocksCollectionComparator()) {
+
+ auto columnFamilyNames = _loadColumnFamilies(); // vector of column family names
+ std::unordered_map<std::string, Ordering> orderings; // column family name -> Ordering
+ std::set<std::string> collections; // set of collection names
+
+ if (columnFamilyNames.empty()) { // new DB
+ columnFamilyNames.push_back(rocksdb::kDefaultColumnFamilyName);
+ } else { // existing DB
+ // open DB in read-only mode to load metadata
+ rocksdb::DB* dbReadOnly;
+ auto s = rocksdb::DB::OpenForReadOnly(dbOptions(), path, &dbReadOnly);
+ ROCKS_STATUS_OK(s);
+ auto itr = dbReadOnly->NewIterator(rocksdb::ReadOptions());
+ orderings = _loadOrderingMetaData(itr);
+ collections = _loadCollections(itr);
+ delete itr;
+ delete dbReadOnly;
}
- rocksdb::DB* dbPtr;
+ std::vector<rocksdb::ColumnFamilyDescriptor> columnFamilies;
+ std::set<std::string> toDropColumnFamily;
- // If there are no column families, then just open the database and return
- if ( families.empty() ) {
- rocksdb::Status s = rocksdb::DB::Open( dbOptions(), path, &dbPtr );
- _db.reset( dbPtr );
- ROCKS_STATUS_OK( s );
-
- _defaultHandle = _db->DefaultColumnFamily();
- return;
+ for (const auto& cf : columnFamilyNames) {
+ if (cf == rocksdb::kDefaultColumnFamilyName) {
+ columnFamilies.emplace_back();
+ continue;
+ }
+ auto orderings_iter = orderings.find(cf);
+ auto collections_iter = collections.find(cf);
+ bool isIndex = orderings_iter != orderings.end();
+ bool isCollection = collections_iter != collections.end();
+ invariant(!isIndex || !isCollection);
+ if (isIndex) {
+ columnFamilies.emplace_back(cf, _indexOptions(orderings_iter->second));
+ } else if (isCollection) {
+ columnFamilies.emplace_back(cf, _collectionOptions());
+ } else {
+ // this can happen because write and createColumnFamily are not atomic
+ toDropColumnFamily.insert(cf);
+ columnFamilies.emplace_back(cf, _collectionOptions());
+ }
}
- // Open the database, getting handles for every column family
std::vector<rocksdb::ColumnFamilyHandle*> handles;
-
- rocksdb::Status s = rocksdb::DB::Open( dbOptions(), path, families, &handles, &dbPtr );
- ROCKS_STATUS_OK( s );
- _db.reset( dbPtr );
-
- invariant( handles.size() == families.size() );
-
- _defaultHandle = _db->DefaultColumnFamily();
-
- // Create an Entry object for every ColumnFamilyHandle
- _createEntries( families, handles );
+ rocksdb::DB* db;
+ auto s = rocksdb::DB::Open(dbOptions(), path, columnFamilies, &handles, &db);
+ ROCKS_STATUS_OK(s);
+ invariant(handles.size() == columnFamilies.size());
+ for (size_t i = 0; i < handles.size(); ++i) {
+ if (toDropColumnFamily.find(columnFamilies[i].name) != toDropColumnFamily.end()) {
+ db->DropColumnFamily(handles[i]);
+ delete handles[i];
+ } else if (columnFamilyNames[i] == rocksdb::kDefaultColumnFamilyName) {
+ // we will not be needing this
+ delete handles[i];
+ } else {
+ _identColumnFamilyMap[columnFamilies[i].name].reset(handles[i]);
+ }
+ }
+ _db.reset(db);
}
- RocksEngine::~RocksEngine() {
- }
+ RocksEngine::~RocksEngine() {}
- RecoveryUnit* RocksEngine::newRecoveryUnit( OperationContext* opCtx ) {
- /* TODO change to false when unit of work hooked up*/
+ RecoveryUnit* RocksEngine::newRecoveryUnit() {
+ // TODO change this to false once higher level code explicitly commits every transaction
return new RocksRecoveryUnit(_db.get(), true);
}
- void RocksEngine::listDatabases( std::vector<std::string>* out ) const {
- std::set<std::string> dbs;
-
- // TODO: make this faster
- boost::mutex::scoped_lock lk( _entryMapMutex );
- for ( EntryMap::const_iterator i = _entryMap.begin(); i != _entryMap.end(); ++i ) {
- const StringData& ns = i->first;
- if ( dbs.insert( nsToDatabase( ns ) ).second )
- out->push_back( nsToDatabase( ns ) );
+ Status RocksEngine::createRecordStore(OperationContext* opCtx, const StringData& ident,
+ const CollectionOptions& options) {
+ if (_existsColumnFamily(ident)) {
+ return Status::OK();
}
- }
-
- DatabaseCatalogEntry* RocksEngine::getDatabaseCatalogEntry( OperationContext* opCtx,
- const StringData& db ) {
- boost::mutex::scoped_lock lk( _dbCatalogMapMutex );
-
- boost::shared_ptr<RocksDatabaseCatalogEntry>& dbce = _dbCatalogMap[db.toString()];
- if ( !dbce ) {
- dbce = boost::make_shared<RocksDatabaseCatalogEntry>( this, db );
+ _db->Put(rocksdb::WriteOptions(), "collection-" + ident.toString(), rocksdb::Slice());
+ return _createColumnFamily(_collectionOptions(), ident);
+ }
+
+ RecordStore* RocksEngine::getRecordStore(OperationContext* opCtx, const StringData& ns,
+ const StringData& ident,
+ const CollectionOptions& options) {
+ auto columnFamily = _getColumnFamily(ident);
+ if (options.capped) {
+ return new RocksRecordStore(
+ ns, ident, _db.get(), columnFamily, true,
+ options.cappedSize ? options.cappedSize : 4096, // default size
+ options.cappedMaxDocs ? options.cappedMaxDocs : -1);
+ } else {
+ return new RocksRecordStore(ns, ident, _db.get(), columnFamily);
}
+ }
- return dbce.get();
+ Status RocksEngine::dropRecordStore(OperationContext* opCtx, const StringData& ident) {
+ _db->Delete(rocksdb::WriteOptions(), "collection-" + ident.toString());
+ return _dropColumnFamily(ident);
}
- int RocksEngine::flushAllFiles( bool sync ) {
- boost::mutex::scoped_lock lk( _entryMapMutex );
- for ( EntryMap::const_iterator i = _entryMap.begin(); i != _entryMap.end(); ++i ) {
- if ( i->second->cfHandle )
- _db->Flush( rocksdb::FlushOptions(), i->second->cfHandle.get() );
+ Status RocksEngine::createSortedDataInterface(OperationContext* opCtx, const StringData& ident,
+ const IndexDescriptor* desc) {
+ if (_existsColumnFamily(ident)) {
+ return Status::OK();
}
- return _entryMap.size();
- }
+ auto keyPattern = desc->keyPattern();
- Status RocksEngine::repairDatabase( OperationContext* tnx,
- const std::string& dbName,
- bool preserveClonedFilesOnFailure,
- bool backupOriginalFiles ) {
- // TODO implement
- return Status::OK();
+ _db->Put(rocksdb::WriteOptions(), "indexordering-" + ident.toString(),
+ rocksdb::Slice(keyPattern.objdata(), keyPattern.objsize()));
+ return _createColumnFamily(_indexOptions(Ordering::make(keyPattern)), ident);
}
- void RocksEngine::cleanShutdown(OperationContext* txn) {
- // no locking here because this is only called while single-threaded.
- dynamic_cast<RocksRecoveryUnit*>(txn->recoveryUnit())->destroy();
- _entryMap = EntryMap();
- _dbCatalogMap = DbCatalogMap();
- _collectionComparator.reset();
- _db.reset();
+ SortedDataInterface* RocksEngine::getSortedDataInterface(OperationContext* opCtx,
+ const StringData& ident,
+ const IndexDescriptor* desc) {
+ return new RocksSortedDataImpl(_db.get(), _getColumnFamily(ident),
+ Ordering::make(desc->keyPattern()));
}
- Status RocksEngine::closeDatabase( OperationContext* txn, const StringData& db ) {
- boost::mutex::scoped_lock lk( _dbCatalogMapMutex );
- _dbCatalogMap.erase( db.toString() );
- return Status::OK();
- }
-
- Status RocksEngine::dropDatabase( OperationContext* txn, const StringData& db ) {
- const string prefix = db.toString() + ".";
- boost::mutex::scoped_lock lk( _entryMapMutex );
- vector<string> toDrop;
-
- // TODO don't iterate through everything
- for (EntryMap::const_iterator it = _entryMap.begin(); it != _entryMap.end(); ++it ) {
- const StringData& ns = it->first;
- if ( ns.startsWith( prefix ) ) {
- toDrop.push_back( ns.toString() );
- }
- }
-
- for (vector<string>::const_iterator it = toDrop.begin(); it != toDrop.end(); ++it ) {
- _dropCollection_inlock( txn, *it );
- }
-
- return closeDatabase( txn, db );
+ Status RocksEngine::dropSortedDataInterface(OperationContext* opCtx, const StringData& ident) {
+ _db->Delete(rocksdb::WriteOptions(), "indexordering-" + ident.toString());
+ return _dropColumnFamily(ident);
}
// non public api
@@ -201,209 +191,94 @@ namespace mongo {
return options;
}
- const RocksEngine::Entry* RocksEngine::getEntry( const StringData& ns ) const {
- boost::mutex::scoped_lock lk( _entryMapMutex );
- EntryMap::const_iterator i = _entryMap.find( ns );
- if ( i == _entryMap.end() )
- return NULL;
- return i->second.get();
- }
-
- RocksEngine::Entry* RocksEngine::getEntry( const StringData& ns ) {
- boost::mutex::scoped_lock lk( _entryMapMutex );
- EntryMap::const_iterator i = _entryMap.find( ns );
- if ( i == _entryMap.end() )
- return NULL;
- return i->second.get();
+ bool RocksEngine::_existsColumnFamily(const StringData& ident) {
+ boost::mutex::scoped_lock lk(_identColumnFamilyMapMutex);
+ return _identColumnFamilyMap.find(ident) != _identColumnFamilyMap.end();
}
- rocksdb::ColumnFamilyHandle* RocksEngine::getIndexColumnFamilyNoCreate(
- const StringData& ns,
- const StringData& indexName ) {
- ROCKS_TRACE << "getIndexColumnFamilyWithoutCreate " << ns << "$" << indexName;
-
- boost::mutex::scoped_lock lk( _entryMapMutex );
- return _getIndexColumnFamilyNoCreate_inlock( ns, indexName );
- }
-
- rocksdb::ColumnFamilyHandle* RocksEngine::getIndexColumnFamily( const StringData& ns,
- const StringData& indexName,
- const Ordering& order ) {
- ROCKS_TRACE << "getIndexColumnFamily " << ns << "$" << indexName;
-
- boost::mutex::scoped_lock lk( _entryMapMutex );
- rocksdb::ColumnFamilyHandle* existing_cf = _getIndexColumnFamilyNoCreate_inlock(
- ns,
- indexName );
- if (existing_cf)
- return existing_cf;
- // if we get here, then the column family doesn't exist, so we need to create it
- return _createIndexColumnFamily_inlock(ns, indexName, order);
+ Status RocksEngine::_createColumnFamily(const rocksdb::ColumnFamilyOptions& options,
+ const StringData& ident) {
+ rocksdb::ColumnFamilyHandle* cf;
+ auto s = _db->CreateColumnFamily(options, ident.toString(), &cf);
+ if (!s.ok()) {
+ return toMongoStatus(s);
+ }
+ boost::mutex::scoped_lock lk(_identColumnFamilyMapMutex);
+ _identColumnFamilyMap[ident].reset(cf);
+ return Status::OK();
}
- rocksdb::ColumnFamilyHandle* RocksEngine::_getIndexColumnFamilyNoCreate_inlock(
- const StringData& ns,
- const StringData& indexName ) {
- EntryMap::const_iterator i = _entryMap.find( ns );
- if ( i == _entryMap.end() )
- return NULL;
- shared_ptr<Entry> entry = i->second;
-
+ Status RocksEngine::_dropColumnFamily(const StringData& ident) {
+ boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily;
{
- rocksdb::ColumnFamilyHandle* handle = entry->indexNameToCF[indexName].get();
- if ( handle )
- return handle;
+ boost::mutex::scoped_lock lk(_identColumnFamilyMapMutex);
+ auto cf_iter = _identColumnFamilyMap.find(ident);
+ if (cf_iter == _identColumnFamilyMap.end()) {
+ return Status(ErrorCodes::InternalError, "Not found");
+ }
+ columnFamily = cf_iter->second;
+ _identColumnFamilyMap.erase(cf_iter);
}
- return NULL;
+ auto s = _db->DropColumnFamily(columnFamily.get());
+ return toMongoStatus(s);
}
- rocksdb::ColumnFamilyHandle* RocksEngine::_createIndexColumnFamily_inlock(
- const StringData& ns,
- const StringData& indexName,
- const Ordering& order ) {
- EntryMap::const_iterator i = _entryMap.find( ns );
- if ( i == _entryMap.end() )
- return NULL;
- shared_ptr<Entry> entry = i->second;
-
- const string fullName = ns.toString() + string("$") + indexName.toString();
- rocksdb::ColumnFamilyHandle* cf = NULL;
-
- typedef boost::shared_ptr<const rocksdb::Comparator> SharedComparatorPtr;
- SharedComparatorPtr& comparator = entry->indexNameToComparator[indexName];
-
- comparator.reset( RocksSortedDataImpl::newRocksComparator( order ) );
- invariant( comparator );
-
- rocksdb::ColumnFamilyOptions options;
- options.comparator = comparator.get();
-
- rocksdb::Status status = _db->CreateColumnFamily( options, fullName, &cf );
- ROCKS_STATUS_OK( status );
- invariant( cf != NULL);
- entry->indexNameToCF[indexName].reset( cf );
- return cf;
- }
-
- void RocksEngine::removeColumnFamily( rocksdb::ColumnFamilyHandle** cfh,
- const StringData& indexName,
- const StringData& ns ) {
- boost::mutex::scoped_lock lk( _entryMapMutex );
- _removeColumnFamily_inlock( cfh, indexName, ns );
- }
-
- void RocksEngine::_removeColumnFamily_inlock( rocksdb::ColumnFamilyHandle** cfh,
- const StringData& indexName,
- const StringData& ns ) {
- EntryMap::const_iterator i = _entryMap.find( ns );
- const rocksdb::Status s = _db->DropColumnFamily( *cfh );
- invariant( s.ok() );
- if ( i != _entryMap.end() ) {
- i->second->indexNameToCF.erase(indexName);
+ boost::shared_ptr<rocksdb::ColumnFamilyHandle> RocksEngine::_getColumnFamily(
+ const StringData& ident) {
+ {
+ boost::mutex::scoped_lock lk(_identColumnFamilyMapMutex);
+ auto cf_iter = _identColumnFamilyMap.find(ident);
+ invariant(cf_iter != _identColumnFamilyMap.end());
+ return cf_iter->second;
}
- *cfh = NULL;
}
- void RocksEngine::getCollectionNamespaces( const StringData& dbName,
- std::list<std::string>* out ) const {
- const string prefix = dbName.toString() + ".";
- boost::mutex::scoped_lock lk( _entryMapMutex );
- for (EntryMap::const_iterator i = _entryMap.begin(); i != _entryMap.end(); ++i ) {
- const StringData& ns = i->first;
- if ( !ns.startsWith( prefix ) )
- continue;
- out->push_back( i->first );
+ std::unordered_map<std::string, Ordering> RocksEngine::_loadOrderingMetaData(
+ rocksdb::Iterator* itr) {
+ const rocksdb::Slice kOrderingPrefix("ordering-");
+ std::unordered_map<std::string, Ordering> orderings;
+ for (itr->Seek(kOrderingPrefix); itr->Valid(); itr->Next()) {
+ rocksdb::Slice key(itr->key());
+ if (!key.starts_with(kOrderingPrefix)) {
+ break;
+ }
+ key.remove_prefix(kOrderingPrefix.size());
+ std::string value(itr->value().ToString());
+ orderings.insert({key.ToString(), Ordering::make(BSONObj(value.c_str()))});
}
+ ROCKS_STATUS_OK(itr->status());
+ return orderings;
}
-
- Status RocksEngine::createCollection( OperationContext* txn,
- const StringData& ns,
- const CollectionOptions& options ) {
-
- ROCKS_TRACE << "RocksEngine::createCollection: " << ns;
-
- boost::mutex::scoped_lock lk( _entryMapMutex );
- if ( _entryMap.find( ns ) != _entryMap.end() )
- return Status( ErrorCodes::NamespaceExists, "collection already exists" );
-
- if (ns.toString().find('$') != string::npos ) {
- return Status( ErrorCodes::BadValue, "invalid character in namespace" );
+ std::set<std::string> RocksEngine::_loadCollections(rocksdb::Iterator* itr) {
+ const rocksdb::Slice kCollectionPrefix("collection-");
+ std::set<std::string> collections;
+ for (itr->Seek(kCollectionPrefix); itr->Valid() ; itr->Next()) {
+ rocksdb::Slice key(itr->key());
+ if (!key.starts_with(kCollectionPrefix)) {
+ break;
+ }
+ key.remove_prefix(kCollectionPrefix.size());
+ collections.insert(key.ToString());
}
-
- boost::shared_ptr<Entry> entry( new Entry() );
-
- rocksdb::ColumnFamilyHandle* cf;
- rocksdb::Status s = _db->CreateColumnFamily( _collectionOptions(), ns.toString(), &cf );
- ROCKS_STATUS_OK( s );
-
- const BSONObj optionsObj = options.toBSON();
- const std::string key = ns.toString() + "-options";
- const rocksdb::Slice value( optionsObj.objdata(), optionsObj.objsize() );
-
- dynamic_cast<RocksRecoveryUnit*>( txn->recoveryUnit() )->writeBatch()->Put(_defaultHandle,
- key,
- value);
- entry->cfHandle.reset( cf );
-
- invariant(_defaultHandle != NULL);
-
- if ( options.capped )
- entry->recordStore.reset( new RocksRecordStore(
- ns,
- _db.get(),
- entry->cfHandle.get(),
- _defaultHandle,
- true,
- options.cappedSize ? options.cappedSize : 4096,
- options.cappedMaxDocs ? options.cappedMaxDocs : -1 ) );
- else
- entry->recordStore.reset( new RocksRecordStore(
- ns, _db.get(), entry->cfHandle.get(), _defaultHandle ) );
-
- entry->collectionEntry.reset( new RocksCollectionCatalogEntry( this, ns ) );
- entry->collectionEntry->createMetaData();
-
- _entryMap[ns] = entry;
- return Status::OK();
+ ROCKS_STATUS_OK(itr->status());
+ return collections;
}
- Status RocksEngine::dropCollection( OperationContext* opCtx, const StringData& ns ) {
- boost::mutex::scoped_lock lk( _entryMapMutex );
- return _dropCollection_inlock( opCtx, ns );
- }
+ std::vector<std::string> RocksEngine::_loadColumnFamilies() {
+ std::vector<std::string> names;
+ if (boost::filesystem::exists(_path)) {
+ rocksdb::Status s = rocksdb::DB::ListColumnFamilies(dbOptions(), _path, &names);
- Status RocksEngine::_dropCollection_inlock( OperationContext* opCtx, const StringData& ns ) {
- // XXX not using a snapshot here (anywhere in the method, really)
- if ( _entryMap.find( ns ) == _entryMap.end() )
- return Status( ErrorCodes::NamespaceNotFound, "can't find collection to drop" );
- Entry* entry = _entryMap[ns].get();
-
- for ( auto it = entry->indexNameToCF.begin(); it != entry->indexNameToCF.end(); ++it ) {
- entry->collectionEntry->removeIndexWithoutDroppingCF( opCtx, it->first );
-
- // drop the actual index in rocksdb
- rocksdb::ColumnFamilyHandle* cfh = _getIndexColumnFamilyNoCreate_inlock( ns,
- it->first );
-
- // Note: this invalidates cfh. Do not use after this call
- _removeColumnFamily_inlock( &cfh, it->first, ns );
- invariant( cfh == nullptr );
+ if (s.IsIOError()) {
+ // DNE, this means the directory exists but is empty, which is fine
+ // because it means no rocks database exists yet
+ } else {
+ ROCKS_STATUS_OK(s);
+ }
}
- entry->recordStore->dropRsMetaData( opCtx );
- entry->recordStore.reset( NULL );
- entry->collectionEntry->dropMetaData();
- entry->collectionEntry.reset( NULL );
-
- rocksdb::Status status = _db->DropColumnFamily( entry->cfHandle.get() );
- ROCKS_STATUS_OK( status );
-
- entry->cfHandle.reset( NULL );
-
- _entryMap.erase( ns );
-
- return Status::OK();
+ return names;
}
rocksdb::Options RocksEngine::dbOptions() {
@@ -415,6 +290,7 @@ namespace mongo {
// create the DB if it's not already present
options.create_if_missing = true;
+ options.create_missing_column_families = true;
return options;
}
@@ -426,202 +302,11 @@ namespace mongo {
return options;
}
- rocksdb::ColumnFamilyOptions RocksEngine::_indexOptions() const {
- return rocksdb::ColumnFamilyOptions();
- }
-
- vector<std::string> RocksEngine::_listFamilyNames( string filepath ) {
- std::vector<std::string> familyNames;
- if ( boost::filesystem::exists( filepath ) ) {
- rocksdb::Status s = rocksdb::DB::ListColumnFamilies( dbOptions(),
- filepath,
- &familyNames );
-
- if ( s.IsIOError() ) {
- // DNE, this means the directory exists but is empty, which is fine
- // because it means no rocks database exists yet
- } else {
- ROCKS_STATUS_OK( s );
- }
- }
-
- return familyNames;
- }
-
- bool RocksEngine::_isDefaultFamily( const string& name ) {
- return name == rocksdb::kDefaultColumnFamilyName;
- }
-
- /**
- * Create Entry's for all non-index column families in the database. This method is called by
- * the constructor. It is necessary because information about indexes is needed before a
- * column family representing an index can be opened (specifically, the orderings used in the
- * comparators for these column families needs to be known). This information is accessed
- * through the RocksCollectionCatalogEntry class for each non-index column family in the
- * database. Hence, this method.
- */
- void RocksEngine::_createNonIndexCatalogEntries( const vector<string>& namespaces ) {
- for ( unsigned i = 0; i < namespaces.size(); ++i ) {
- const string ns = namespaces[i];
-
- // ignore the default column family, which RocksDB makes us keep around.
- if ( _isDefaultFamily( ns ) ) {
- continue;
- }
-
- string collection = ns;
- if ( ns.find( '&' ) != string::npos || ns.find( '$' ) != string::npos ) {
- continue;
- }
-
- boost::shared_ptr<Entry>& entry = _entryMap[collection];
- invariant( !entry );
- entry = boost::make_shared<Entry>();
-
- // We'll use this RocksCollectionCatalogEntry to open the column families representing
- // indexes
- invariant( !entry->collectionEntry );
- entry->collectionEntry.reset( new RocksCollectionCatalogEntry( this, ns ) );
- }
- }
-
- map<string, Ordering> RocksEngine::_createIndexOrderings( const vector<string>& namespaces,
- const string& filepath ) {
- // open the default column family so that we can retrieve information about
- // each index, which is needed in order to open the index column families
- rocksdb::DB* db;
- rocksdb::Status status = rocksdb::DB::OpenForReadOnly( dbOptions(), filepath, &db );
- boost::scoped_ptr<rocksdb::DB> dbPtr( db );
-
- ROCKS_STATUS_OK( status );
-
- map<string, Ordering> indexOrderings;
-
- // populate indexOrderings
- for ( unsigned i = 0; i < namespaces.size(); i++ ) {
- const string ns = namespaces[i];
- const size_t sepPos = ns.find( '$' );
- if ( sepPos == string::npos ) {
- continue;
- }
-
- // the default family, which we want to ignore, should be caught in the above if
- // statement.
- invariant( !_isDefaultFamily( ns ) );
-
- string collection = ns.substr( 0, sepPos );
-
- boost::shared_ptr<Entry>& entry = _entryMap[collection];
- invariant( entry );
-
- // Generate the Ordering object for each index, allowing the column families
- // representing these indexes to eventually be opened
- const string indexName = ns.substr( sepPos + 1 );
- const BSONObj spec = entry->collectionEntry->getOtherIndexSpec( indexName, db );
- const Ordering order = Ordering::make( spec["key"].Obj() );
-
- indexOrderings.insert( std::make_pair( indexName, order ) );
- }
-
- return indexOrderings;
- }
-
- RocksEngine::CfdVector RocksEngine::_createCfds( const std::vector<std::string>& namespaces,
- const map<string, Ordering>& indexOrderings ) {
- CfdVector families;
-
- for ( size_t i = 0; i < namespaces.size(); i++ ) {
- const std::string ns = namespaces[i];
- const size_t sepPos = ns.find( '$' );
- const bool isIndex = sepPos != string::npos;
-
- if ( isIndex ) {
- rocksdb::ColumnFamilyOptions options = _indexOptions();
-
- const string indexName = ns.substr( sepPos + 1 );
-
- const map<string, Ordering>::const_iterator it = indexOrderings.find( indexName );
- invariant( it != indexOrderings.end() );
-
- const Ordering order = it->second;
- options.comparator = RocksSortedDataImpl::newRocksComparator( order );
-
- families.push_back( rocksdb::ColumnFamilyDescriptor( ns, options ) );
-
- } else if ( _isDefaultFamily( ns ) ) {
- families.push_back( rocksdb::ColumnFamilyDescriptor( ns,
- rocksdb::ColumnFamilyOptions() ) );
- } else {
- families.push_back( rocksdb::ColumnFamilyDescriptor( ns, _collectionOptions() ) );
- }
- }
-
- return families;
- }
-
- void RocksEngine::_createEntries( const CfdVector& families,
- const vector<rocksdb::ColumnFamilyHandle*> handles ) {
- invariant(_defaultHandle != NULL);
-
- for ( unsigned i = 0; i < families.size(); i++ ) {
- const string ns = families[i].name;
-
- ROCKS_TRACE << "RocksEngine found ns: " << ns;
-
- // RocksDB specifies that the default column family must be included in the database.
- // We want to ignore this column family.
- if ( _isDefaultFamily( ns ) ) {
- continue;
- }
-
- string collection = ns;
-
- const size_t sepPos = ns.find( '$' );
-
- const bool isIndex = sepPos != string::npos;
- if ( isIndex ) {
- collection = ns.substr( 0, sepPos );
- }
-
- boost::shared_ptr<Entry> entry = _entryMap[collection];
- invariant( entry );
-
- if ( isIndex ) {
- string indexName = ns.substr( sepPos + 1 );
- ROCKS_TRACE << " got index " << indexName << " for " << collection;
- entry->indexNameToCF[indexName].reset( handles[i] );
-
- invariant( families[i].options.comparator );
- entry->indexNameToComparator[indexName].reset( families[i].options.comparator );
- } else {
- std::string optionsKey = ns + "-options";
- std::string value;
- rocksdb::Status status = _db->Get(rocksdb::ReadOptions(), optionsKey, &value);
-
- ROCKS_STATUS_OK( status );
- BSONObj optionsObj( value.data() );
-
- CollectionOptions options;
- options.parse( optionsObj );
-
- entry->cfHandle.reset( handles[i] );
- if ( options.capped ) {
- entry->recordStore.reset(new RocksRecordStore(
- ns,
- _db.get(),
- handles[i],
- _defaultHandle,
- options.capped,
- options.cappedSize ? options.cappedSize : 4096, // default size
- options.cappedMaxDocs ? options.cappedMaxDocs : -1) );
- } else {
- entry->recordStore.reset( new RocksRecordStore( ns, _db.get(), handles[i],
- _defaultHandle ) );
- }
- // entry->collectionEntry is set in _createNonIndexCatalogEntries()
- invariant( entry->collectionEntry );
- }
- }
+ rocksdb::ColumnFamilyOptions RocksEngine::_indexOptions(const Ordering& order) const {
+ rocksdb::ColumnFamilyOptions options;
+ invariant( _collectionComparator.get() );
+ options.comparator = RocksSortedDataImpl::newRocksComparator(order);
+ return options;
}
Status toMongoStatus( rocksdb::Status s ) {
diff --git a/src/mongo/db/storage/rocks/rocks_engine.h b/src/mongo/db/storage/rocks/rocks_engine.h
index 62e8a62e480..1094a74db8d 100644
--- a/src/mongo/db/storage/rocks/rocks_engine.h
+++ b/src/mongo/db/storage/rocks/rocks_engine.h
@@ -34,6 +34,7 @@
#include <list>
#include <map>
#include <string>
+#include <memory>
#include <boost/optional.hpp>
#include <boost/scoped_ptr.hpp>
@@ -44,7 +45,7 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/bson/ordering.h"
-#include "mongo/db/storage/storage_engine.h"
+#include "mongo/db/storage/kv/kv_engine.h"
#include "mongo/util/string_map.h"
namespace rocksdb {
@@ -53,126 +54,67 @@ namespace rocksdb {
struct ColumnFamilyOptions;
class DB;
class Comparator;
+ class Iterator;
struct Options;
struct ReadOptions;
}
namespace mongo {
- class RocksCollectionCatalogEntry;
- class RocksDatabaseCatalogEntry;
- class RocksRecordStore;
-
struct CollectionOptions;
- /**
- * Since we have one DB for the entire server, the RocksEngine is going to hold all state.
- * DatabaseCatalogEntry will just be a thin slice over the engine.
- */
- class RocksEngine : public StorageEngine {
+ class RocksEngine : public KVEngine {
MONGO_DISALLOW_COPYING( RocksEngine );
public:
RocksEngine( const std::string& path );
virtual ~RocksEngine();
- virtual RecoveryUnit* newRecoveryUnit( OperationContext* opCtx );
-
- virtual void listDatabases( std::vector<std::string>* out ) const;
-
- virtual DatabaseCatalogEntry* getDatabaseCatalogEntry( OperationContext* opCtx,
- const StringData& db );
+ virtual RecoveryUnit* newRecoveryUnit() override;
- virtual bool supportsDocLocking() const { return false; }
+ virtual Status createRecordStore(OperationContext* opCtx, const StringData& ident,
+ const CollectionOptions& options) override;
- /**
- * @return number of files flushed
- */
- virtual int flushAllFiles( bool sync );
+ virtual RecordStore* getRecordStore(OperationContext* opCtx, const StringData& ns,
+ const StringData& ident,
+ const CollectionOptions& options) override;
- virtual Status repairDatabase( OperationContext* tnx,
- const std::string& dbName,
- bool preserveClonedFilesOnFailure = false,
- bool backupOriginalFiles = false );
+ virtual Status dropRecordStore(OperationContext* opCtx, const StringData& ident) override;
- virtual void cleanShutdown(OperationContext* txn);
+ virtual Status createSortedDataInterface(OperationContext* opCtx, const StringData& ident,
+ const IndexDescriptor* desc) override;
- virtual Status closeDatabase( OperationContext* txn, const StringData& db );
+ virtual SortedDataInterface* getSortedDataInterface(OperationContext* opCtx,
+ const StringData& ident,
+ const IndexDescriptor* desc) override;
- virtual Status dropDatabase( OperationContext* txn, const StringData& db );
+ virtual Status dropSortedDataInterface(OperationContext* opCtx,
+ const StringData& ident) override;
// rocks specific api
rocksdb::DB* getDB() { return _db.get(); }
const rocksdb::DB* getDB() const { return _db.get(); }
- void getCollectionNamespaces( const StringData& dbName, std::list<std::string>* out ) const;
-
- Status createCollection( OperationContext* txn,
- const StringData& ns,
- const CollectionOptions& options );
-
- Status dropCollection( OperationContext* opCtx, const StringData& ns );
-
- /**
- * Will create if doesn't exist. The collection has to exist first, though.
- * The ordering argument is only used if the column family does not exist. If the
- * column family does not exist, then ordering must be a non-empty optional, containing
- * the Ordering for the index.
- */
- rocksdb::ColumnFamilyHandle* getIndexColumnFamily( const StringData& ns,
- const StringData& indexName,
- const Ordering& order );
-
- /**
- * Will return NULL if doesn't exist.
- */
- rocksdb::ColumnFamilyHandle* getIndexColumnFamilyNoCreate(const StringData& ns,
- const StringData& indexName );
- /**
- * Completely removes a column family. Input pointer is invalid after calling
- */
- void removeColumnFamily( rocksdb::ColumnFamilyHandle** cfh,
- const StringData& indexName,
- const StringData& ns );
-
/**
* Returns a ReadOptions object that uses the snapshot contained in opCtx
*/
static rocksdb::ReadOptions readOptionsWithSnapshot( OperationContext* opCtx );
- struct Entry {
- boost::scoped_ptr<rocksdb::ColumnFamilyHandle> cfHandle;
- boost::scoped_ptr<RocksCollectionCatalogEntry> collectionEntry;
- boost::scoped_ptr<RocksRecordStore> recordStore;
- // These ColumnFamilyHandles must be deleted by removeIndex
- StringMap<boost::shared_ptr<rocksdb::ColumnFamilyHandle>> indexNameToCF;
- StringMap<boost::shared_ptr<const rocksdb::Comparator>> indexNameToComparator;
- };
-
- Entry* getEntry( const StringData& ns );
- const Entry* getEntry( const StringData& ns ) const;
-
- typedef std::vector<rocksdb::ColumnFamilyDescriptor> CfdVector;
-
static rocksdb::Options dbOptions();
private:
- rocksdb::ColumnFamilyHandle* _getIndexColumnFamilyNoCreate_inlock(
- const StringData& ns,
- const StringData& indexName);
+ bool _existsColumnFamily(const StringData& ident);
+ Status _createColumnFamily(const rocksdb::ColumnFamilyOptions& options,
+ const StringData& ident);
+ Status _dropColumnFamily(const StringData& ident);
+ boost::shared_ptr<rocksdb::ColumnFamilyHandle> _getColumnFamily(const StringData& ident);
- rocksdb::ColumnFamilyHandle* _createIndexColumnFamily_inlock( const StringData& ns,
- const StringData& indexName,
- const Ordering& order );
-
- void _removeColumnFamily_inlock( rocksdb::ColumnFamilyHandle** cfh,
- const StringData& indexName,
- const StringData& ns );
-
- Status _dropCollection_inlock( OperationContext* opCtx, const StringData& ns );
+ std::unordered_map<std::string, Ordering> _loadOrderingMetaData(rocksdb::Iterator* itr);
+ std::set<std::string> _loadCollections(rocksdb::Iterator* itr);
+ std::vector<std::string> _loadColumnFamilies();
rocksdb::ColumnFamilyOptions _collectionOptions() const;
- rocksdb::ColumnFamilyOptions _indexOptions() const;
+ rocksdb::ColumnFamilyOptions _indexOptions(const Ordering& order) const;
std::string _path;
boost::scoped_ptr<rocksdb::DB> _db;
@@ -181,56 +123,9 @@ namespace mongo {
// Default column family is owned by the rocksdb::DB instance.
rocksdb::ColumnFamilyHandle* _defaultHandle;
- typedef StringMap< boost::shared_ptr<Entry> > EntryMap;
- mutable boost::mutex _entryMapMutex;
- EntryMap _entryMap;
-
- typedef StringMap<boost::shared_ptr<RocksDatabaseCatalogEntry> > DbCatalogMap;
- // illegal to hold at the same time as _entryMapMutex
- boost::mutex _dbCatalogMapMutex;
- DbCatalogMap _dbCatalogMap;
-
- // private methods that should usually only be called from the RocksEngine constructor
-
- // RocksDB necessitates opening a default column family. This method exists to identify
- // that column family so that it can be ignored.
- bool _isDefaultFamily( const string& name );
-
- // See larger comment in .cpp for why this is necessary
- void _createNonIndexCatalogEntries( const std::vector<std::string>& families );
-
- /**
- * Return a vector containing the name of every column family in the database
- */
- std::vector<std::string> _listFamilyNames( std::string filepath );
-
- /**
- * @param namespaces a vector containing all the namespaces in this database.
- * @param metaDataCfds a vector of the column family descriptors for every column family
- * in the database representing metadata.
- */
- std::map<std::string, Ordering> _createIndexOrderings(
- const std::vector<string>& namespaces,
- const std::string& filepath );
-
- /**
- * @param namespaces a vector containing all the namespaces in this database
- */
- CfdVector _createCfds ( const std::vector<std::string>& namespaces,
- const std::map<std::string, Ordering>& indexOrderings );
-
- /**
- * Create a complete Entry object in _entryMap for every ColumnFamilyDescriptor. Assumes
- * that if the collectionEntry field should be initialized, that is already has been prior
- * to this function call.
- *
- * @param families A vector of column family descriptors for every column family in the
- * database
- * @param handles A vector of column family handles for every column family in the
- * database
- */
- void _createEntries( const CfdVector& families,
- const std::vector<rocksdb::ColumnFamilyHandle*> handles );
+ mutable boost::mutex _identColumnFamilyMapMutex;
+ typedef StringMap<boost::shared_ptr<rocksdb::ColumnFamilyHandle> > IdentColumnFamilyMap;
+ IdentColumnFamilyMap _identColumnFamilyMap;
};
Status toMongoStatus( rocksdb::Status s );
diff --git a/src/mongo/db/storage/rocks/rocks_engine_test.cpp b/src/mongo/db/storage/rocks/rocks_engine_test.cpp
index 8cfa8e550a7..a07d2078d17 100644
--- a/src/mongo/db/storage/rocks/rocks_engine_test.cpp
+++ b/src/mongo/db/storage/rocks/rocks_engine_test.cpp
@@ -1,311 +1,69 @@
-// rocks_engine_test.cpp
+// kv_engine_test_harness.h
/**
-* Copyright (C) 2014 MongoDB Inc.
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License, version 3,
-* as published by the Free Software Foundation.
-*
-*
-* This program is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see <http://www.gnu.org/licenses/>.
-*
-* As a special exception, the copyright holders give permission to link the
-* code of portions of this program with the OpenSSL library under certain
-* conditions as described in each individual source file and distribute
-* linked combinations including the program with the OpenSSL library. You
-* must comply with the GNU Affero General Public License in all respects for
-* all of the code used other than as permitted herein. If you modify file(s)
-* with this exception, you may extend this exception to your version of the
-* file(s), but you are not obligated to do so. If you do not wish to do so,
-* delete this exception statement from your version. If you delete this
-* exception statement from all source files in the program, then also delete
-* it in the license file.
-*/
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
#include <boost/filesystem/operations.hpp>
+#include <boost/scoped_ptr.hpp>
+#include <rocksdb/comparator.h>
#include <rocksdb/db.h>
-#include <rocksdb/slice.h>
#include <rocksdb/options.h>
+#include <rocksdb/slice.h>
-#include "mongo/db/catalog/collection_options.h"
-#include "mongo/db/index/index_descriptor.h"
-#include "mongo/db/operation_context_noop.h"
-#include "mongo/db/storage/rocks/rocks_collection_catalog_entry.h"
+#include "mongo/db/storage/kv/kv_engine.h"
+#include "mongo/db/storage/kv/kv_engine_test_harness.h"
#include "mongo/db/storage/rocks/rocks_engine.h"
-#include "mongo/db/storage/rocks/rocks_record_store.h"
-#include "mongo/db/storage/rocks/rocks_recovery_unit.h"
-#include "mongo/unittest/unittest.h"
-
-using namespace mongo;
+#include "mongo/unittest/temp_dir.h"
namespace mongo {
-
- class MyOperationContext : public OperationContextNoop {
+ class RocksEngineHarnessHelper : public KVHarnessHelper {
public:
- MyOperationContext(RocksEngine* engine)
- : OperationContextNoop(new RocksRecoveryUnit(engine->getDB(), false)) {}
- };
-
- TEST( RocksEngineTest, Start1 ) {
- std::string path = "/tmp/mongo-rocks-engine-test";
- boost::filesystem::remove_all( path );
- {
- RocksEngine engine( path );
- }
-
- {
- RocksEngine engine( path );
- }
-
- }
-
- TEST( RocksEngineTest, CreateDirect1 ) {
- std::string path = "/tmp/mongo-rocks-engine-test";
- boost::filesystem::remove_all( path );
- RocksEngine engine( path );
-
- {
- MyOperationContext opCtx( &engine );
- Status status = engine.createCollection( &opCtx,
- "test.foo",
- CollectionOptions() );
- ASSERT_OK( status );
- }
-
- RocksRecordStore* rs = engine.getEntry( "test.foo" )->recordStore.get();
- string s = "eliot was here";
-
- {
- MyOperationContext opCtx( &engine );
- DiskLoc loc;
- {
- WriteUnitOfWork uow( &opCtx );
- StatusWith<DiskLoc> res = rs->insertRecord( &opCtx, s.c_str(), s.size() + 1, -1 );
- ASSERT_OK( res.getStatus() );
- loc = res.getValue();
- }
-
- ASSERT_EQUALS( s, rs->dataFor( &opCtx, loc ).data() );
- }
- }
-
- TEST( RocksEngineTest, DropDirect1 ) {
- std::string path = "/tmp/mongo-rocks-engine-test";
- boost::filesystem::remove_all( path );
- RocksEngine engine( path );
-
- {
- MyOperationContext opCtx( &engine );
- Status status = engine.createCollection( &opCtx,
- "test.foo",
- CollectionOptions() );
- ASSERT_OK( status );
- }
-
- {
- MyOperationContext opCtx( &engine );
- Status status = engine.createCollection( &opCtx,
- "test.bar",
- CollectionOptions() );
- ASSERT_OK( status );
- }
-
- {
- MyOperationContext opCtx( &engine );
- Status status = engine.createCollection( &opCtx,
- "silly.bar",
- CollectionOptions() );
- ASSERT_OK( status );
- }
-
- {
- std::list<std::string> names;
- engine.getCollectionNamespaces( "test", &names );
- ASSERT_EQUALS( 2U, names.size() );
- }
-
- {
- std::list<std::string> names;
- engine.getCollectionNamespaces( "silly", &names );
- ASSERT_EQUALS( 1U, names.size() );
- }
-
- {
- MyOperationContext opCtx( &engine );
- Status status = engine.dropCollection( &opCtx,
- "test.foo" );
- ASSERT_OK( status );
+ RocksEngineHarnessHelper() : _dbpath("mongo-rocks-engine-test") {
+ boost::filesystem::remove_all(_dbpath.path());
+ _engine.reset(new RocksEngine(_dbpath.path()));
}
- {
- std::list<std::string> names;
- engine.getCollectionNamespaces( "test", &names );
- ASSERT_EQUALS( 1U, names.size() );
- ASSERT_EQUALS( names.front(), "test.bar" );
- }
-
- {
- MyOperationContext opCtx( &engine );
- Status status = engine.dropCollection( &opCtx,
- "test.foo" );
- ASSERT_NOT_OK( status );
- }
- }
-
- TEST( RocksCollectionEntryTest, MetaDataRoundTrip ) {
- RocksCollectionCatalogEntry::MetaData md;
- md.ns = "test.foo";
- md.indexes.push_back( RocksCollectionCatalogEntry::IndexMetaData( BSON( "a" << 1 ),
- true,
- DiskLoc( 5, 17 ),
- false ) );
-
- BSONObj a = md.toBSON();
- ASSERT_EQUALS( 3, a.nFields() );
- ASSERT_EQUALS( string("test.foo"), a["ns"].String() );
- ASSERT_EQUALS( BSONObj(), a["options"].Obj() );
- BSONObj indexes = a["indexes"].Obj();
- ASSERT_EQUALS( 1, indexes.nFields() );
- BSONObj idx = indexes["0"].Obj();
- ASSERT_EQUALS( 5, idx.nFields() );
- ASSERT_EQUALS( BSON( "a" << 1 ), idx["spec"].Obj() );
- ASSERT( idx["ready"].trueValue() );
- ASSERT( !idx["multikey"].trueValue() );
- ASSERT_EQUALS( 5, idx["head_a"].Int() );
- ASSERT_EQUALS( 17, idx["head_b"].Int() );
-
- RocksCollectionCatalogEntry::MetaData md2;
- md2.parse( a );
- ASSERT_EQUALS( md.indexes[0].head, md2.indexes[0].head );
- ASSERT_EQUALS( a, md2.toBSON() );
- }
-
- TEST( RocksCollectionEntryTest, IndexCreateAndMod1 ) {
- std::string path = "/tmp/mongo-rocks-engine-test";
- boost::filesystem::remove_all( path );
- RocksEngine engine( path );
-
- {
- RocksCollectionCatalogEntry coll( &engine, "test.foo" );
- coll.createMetaData();
- {
- MyOperationContext opCtx( &engine );
- ASSERT_EQUALS( 0, coll.getTotalIndexCount(&opCtx) );
- }
-
- BSONObj spec = BSON( "key" << BSON( "a" << 1 ) <<
- "name" << "silly" <<
- "ns" << "test.foo" );
-
- IndexDescriptor desc( NULL, "", spec );
-
- {
- MyOperationContext opCtx( &engine );
- Status status = coll.prepareForIndexBuild( &opCtx, &desc );
- ASSERT_OK( status );
- }
-
- {
- MyOperationContext opCtx( &engine );
- ASSERT_EQUALS( 1, coll.getTotalIndexCount(&opCtx) );
- ASSERT_EQUALS( 0, coll.getCompletedIndexCount(&opCtx) );
- ASSERT( !coll.isIndexReady( &opCtx, "silly" ) );
- }
-
- {
- MyOperationContext opCtx( &engine );
- coll.indexBuildSuccess( &opCtx, "silly" );
- }
-
- {
- MyOperationContext opCtx( &engine );
- ASSERT_EQUALS( 1, coll.getTotalIndexCount(&opCtx) );
- ASSERT_EQUALS( 1, coll.getCompletedIndexCount(&opCtx) );
- ASSERT( coll.isIndexReady( &opCtx, "silly" ) );
- }
-
- {
- MyOperationContext opCtx( &engine );
- ASSERT_EQUALS( DiskLoc(), coll.getIndexHead( &opCtx, "silly" ) );
- }
+ virtual ~RocksEngineHarnessHelper() = default;
- {
- MyOperationContext opCtx( &engine );
- coll.setIndexHead( &opCtx, "silly", DiskLoc( 123,321 ) );
- }
+ virtual KVEngine* getEngine() { return _engine.get(); }
- {
- MyOperationContext opCtx( &engine );
- ASSERT_EQUALS( DiskLoc(123, 321), coll.getIndexHead( &opCtx, "silly" ) );
- ASSERT( !coll.isIndexMultikey( &opCtx, "silly" ) );
- }
-
- {
- MyOperationContext opCtx( &engine );
- coll.setIndexIsMultikey( &opCtx, "silly", true );
- }
-
- {
- MyOperationContext opCtx( &engine );
- ASSERT( coll.isIndexMultikey( &opCtx, "silly" ) );
- }
-
- }
- }
-
- TEST( RocksEngineTest, Restart1 ) {
- std::string path = "/tmp/mongo-rocks-engine-test";
- boost::filesystem::remove_all( path );
-
- string s = "eliot was here";
- DiskLoc loc;
-
- {
- RocksEngine engine( path );
-
- {
- MyOperationContext opCtx( &engine );
- WriteUnitOfWork uow( &opCtx );
- Status status = engine.createCollection( &opCtx,
- "test.foo",
- CollectionOptions() );
- ASSERT_OK( status );
- uow.commit();
- }
-
- RocksRecordStore* rs = engine.getEntry( "test.foo" )->recordStore.get();
-
- {
- MyOperationContext opCtx( &engine );
-
- {
- WriteUnitOfWork uow( &opCtx );
- StatusWith<DiskLoc> res = rs->insertRecord( &opCtx, s.c_str(), s.size() + 1, -1 );
- ASSERT_OK( res.getStatus() );
- loc = res.getValue();
- uow.commit();
- }
-
- ASSERT_EQUALS( s, rs->dataFor( &opCtx, loc ).data() );
- engine.cleanShutdown( &opCtx );
- }
+ virtual KVEngine* restartEngine() {
+ _engine.reset(nullptr);
+ _engine.reset(new RocksEngine(_dbpath.path()));
+ return _engine.get();
}
- {
- RocksEngine engine(path);
- RocksRecordStore* rs = engine.getEntry("test.foo")->recordStore.get();
- MyOperationContext opCtx(&engine);
- ASSERT_EQUALS(s, rs->dataFor(&opCtx, loc).data());
- }
+ private:
+ unittest::TempDir _dbpath;
- }
+ boost::scoped_ptr<RocksEngine> _engine;
+ };
+ KVHarnessHelper* KVHarnessHelper::create() { return new RocksEngineHarnessHelper(); }
}
diff --git a/src/mongo/db/storage/rocks/rocks_init.cpp b/src/mongo/db/storage/rocks/rocks_init.cpp
index 14da69ade87..922ab30ed04 100644
--- a/src/mongo/db/storage/rocks/rocks_init.cpp
+++ b/src/mongo/db/storage/rocks/rocks_init.cpp
@@ -34,6 +34,7 @@
#include "mongo/base/init.h"
#include "mongo/db/global_environment_experiment.h"
#include "mongo/db/storage_options.h"
+#include "mongo/db/storage/kv/kv_storage_engine.h"
namespace mongo {
@@ -42,7 +43,7 @@ namespace mongo {
public:
virtual ~RocksFactory(){}
virtual StorageEngine* create( const StorageGlobalParams& params ) const {
- return new RocksEngine( params.dbpath );
+ return new KVStorageEngine(new RocksEngine(params.dbpath));
}
};
} // namespace
diff --git a/src/mongo/db/storage/rocks/rocks_record_store.cpp b/src/mongo/db/storage/rocks/rocks_record_store.cpp
index 0c527ebb0b2..168cee282aa 100644
--- a/src/mongo/db/storage/rocks/rocks_record_store.cpp
+++ b/src/mongo/db/storage/rocks/rocks_record_store.cpp
@@ -29,9 +29,9 @@
* it in the license file.
*/
-#include "mongo/db/operation_context.h"
#include "mongo/db/storage/rocks/rocks_record_store.h"
-#include "mongo/db/storage/rocks/rocks_recovery_unit.h"
+
+#include <memory>
#include <rocksdb/comparator.h>
#include <rocksdb/db.h>
@@ -39,32 +39,29 @@
#include <rocksdb/slice.h>
#include <rocksdb/utilities/write_batch_with_index.h>
+#include "mongo/db/operation_context.h"
+#include "mongo/db/storage/rocks/rocks_recovery_unit.h"
#include "mongo/util/log.h"
+#include "mongo/util/timer.h"
namespace mongo {
- RocksRecordStore::RocksRecordStore( const StringData& ns,
- rocksdb::DB* db, // not owned here
- rocksdb::ColumnFamilyHandle* columnFamily,
- rocksdb::ColumnFamilyHandle* metadataColumnFamily,
- bool isCapped,
- int64_t cappedMaxSize,
- int64_t cappedMaxDocs,
- CappedDocumentDeleteCallback* cappedDeleteCallback )
- : RecordStore( ns ),
- _db( db ),
- _columnFamily( columnFamily ),
- _metadataColumnFamily( metadataColumnFamily ),
- _isCapped( isCapped ),
- _cappedMaxSize( cappedMaxSize ),
- _cappedMaxDocs( cappedMaxDocs ),
- _cappedDeleteCallback( cappedDeleteCallback ),
- _dataSizeKey( ns.toString() + "-dataSize" ),
- _numRecordsKey( ns.toString() + "-numRecords" ) {
+ RocksRecordStore::RocksRecordStore(const StringData& ns, const StringData& id,
+ rocksdb::DB* db, // not owned here
+ boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily,
+ bool isCapped, int64_t cappedMaxSize, int64_t cappedMaxDocs,
+ CappedDocumentDeleteCallback* cappedDeleteCallback)
+ : RecordStore(ns),
+ _db(db),
+ _columnFamily(columnFamily),
+ _isCapped(isCapped),
+ _cappedMaxSize(cappedMaxSize),
+ _cappedMaxDocs(cappedMaxDocs),
+ _cappedDeleteCallback(cappedDeleteCallback),
+ _dataSizeKey("datasize-" + id.toString()),
+ _numRecordsKey("numrecords-" + id.toString()) {
invariant( _db );
invariant( _columnFamily );
- invariant( _metadataColumnFamily );
- invariant( _columnFamily != _metadataColumnFamily );
if (_isCapped) {
invariant(_cappedMaxSize > 0);
@@ -76,9 +73,8 @@ namespace mongo {
}
// Get next id
- // XXX not using a Snapshot here
- boost::scoped_ptr<rocksdb::Iterator> iter( db->NewIterator( _readOptions(),
- columnFamily ) );
+ boost::scoped_ptr<rocksdb::Iterator> iter(
+ db->NewIterator(_readOptions(), columnFamily.get()));
iter->SeekToLast();
if (iter->Valid()) {
rocksdb::Slice lastSlice = iter->key();
@@ -94,10 +90,7 @@ namespace mongo {
std::string value;
bool metadataPresent = true;
// XXX not using a Snapshot here
- if (!_db->Get( _readOptions(),
- _metadataColumnFamily,
- rocksdb::Slice( _numRecordsKey ),
- &value ).ok()) {
+ if (!_db->Get(_readOptions(), rocksdb::Slice(_numRecordsKey), &value).ok()) {
_numRecords = 0;
metadataPresent = false;
}
@@ -106,10 +99,7 @@ namespace mongo {
}
// XXX not using a Snapshot here
- if (!_db->Get( _readOptions(),
- _metadataColumnFamily,
- rocksdb::Slice( _dataSizeKey ),
- &value ).ok()) {
+ if (!_db->Get(_readOptions(), rocksdb::Slice(_dataSizeKey), &value).ok()) {
_dataSize = 0;
invariant(!metadataPresent);
}
@@ -124,56 +114,22 @@ namespace mongo {
int infoLevel ) const {
uint64_t storageSize;
rocksdb::Range wholeRange( _makeKey( minDiskLoc ), _makeKey( maxDiskLoc ) );
- _db->GetApproximateSizes( _columnFamily, &wholeRange, 1, &storageSize);
+ _db->GetApproximateSizes(_columnFamily.get(), &wholeRange, 1, &storageSize);
return static_cast<int64_t>( storageSize );
}
RecordData RocksRecordStore::dataFor( OperationContext* txn, const DiskLoc& loc) const {
- rocksdb::Slice value;
-
- RocksRecoveryUnit* ru = _getRecoveryUnit( txn );
- auto key = _makeKey(loc);
- boost::shared_array<char> data;
-
- boost::scoped_ptr<rocksdb::WBWIIterator> wb_iterator(
- ru->writeBatch()->NewIterator(_columnFamily));
- wb_iterator->Seek(key);
- if (wb_iterator->Valid() && wb_iterator->Entry().key == key) {
- auto& entry = wb_iterator->Entry();
- if (entry.type == rocksdb::WriteType::kDeleteRecord) {
- return RecordData(nullptr, 0);
- }
- data.reset(new char[entry.value.size()]);
- memcpy(data.get(), entry.value.data(), entry.value.size());
- value = rocksdb::Slice(data.get(), entry.value.size());
- } else {
- // TODO investigate using cursor API to get a Slice and avoid double copying.
- std::string value_storage;
- auto status = _db->Get(_readOptions(txn), _columnFamily, _makeKey(loc), &value_storage);
- if (!status.ok()) {
- if (status.IsNotFound()) {
- return RecordData(nullptr, 0);
- } else {
- log() << "rocks Get failed, blowing up: " << status.ToString();
- invariant(false);
- }
- }
- data.reset(new char[value_storage.size()]);
- memcpy(data.get(), value_storage.data(), value_storage.size());
- value = rocksdb::Slice(data.get(), value_storage.size());
- }
-
- return RecordData(value.data(), value.size(), data);
+ return _getDataFor(_db, _columnFamily.get(), txn, loc);
}
void RocksRecordStore::deleteRecord( OperationContext* txn, const DiskLoc& dl ) {
- RocksRecoveryUnit* ru = _getRecoveryUnit( txn );
+ RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn );
std::string oldValue;
- _db->Get( _readOptions( txn ), _columnFamily, _makeKey( dl ), &oldValue );
+ _db->Get(_readOptions(txn), _columnFamily.get(), _makeKey(dl), &oldValue);
int oldLength = oldValue.size();
- ru->writeBatch()->Delete( _columnFamily, _makeKey( dl ) );
+ ru->writeBatch()->Delete(_columnFamily.get(), _makeKey(dl));
_changeNumRecords(txn, false);
_increaseDataSize(txn, -oldLength);
@@ -195,9 +151,8 @@ namespace mongo {
void RocksRecordStore::cappedDeleteAsNeeded(OperationContext* txn) {
if (!cappedAndNeedDelete())
return;
- // This persistent iterator is necessary since you can't read your own writes
- boost::scoped_ptr<rocksdb::Iterator> iter( _db->NewIterator( _readOptions( txn ),
- _columnFamily ) );
+ auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
+ boost::scoped_ptr<rocksdb::Iterator> iter(ru->NewIterator(_columnFamily.get()));
iter->SeekToFirst();
// XXX TODO there is a bug here where if the size of the write batch exceeds the cap size
@@ -231,11 +186,11 @@ namespace mongo {
"object to insert exceeds cappedMaxSize" );
}
- RocksRecoveryUnit* ru = _getRecoveryUnit( txn );
+ RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn );
DiskLoc loc = _nextId();
- ru->writeBatch()->Put( _columnFamily, _makeKey( loc ), rocksdb::Slice( data, len ) );
+ ru->writeBatch()->Put(_columnFamily.get(), _makeKey(loc), rocksdb::Slice(data, len));
_changeNumRecords( txn, true );
_increaseDataSize( txn, len );
@@ -261,14 +216,10 @@ namespace mongo {
int len,
bool enforceQuota,
UpdateMoveNotifier* notifier ) {
- RocksRecoveryUnit* ru = _getRecoveryUnit( txn );
+ RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn );
std::string old_value;
- // XXX Be sure to also first query the write batch once Facebook implements that
- rocksdb::Status status = _db->Get( _readOptions( txn ),
- _columnFamily,
- _makeKey( loc ),
- &old_value );
+ auto status = ru->Get(_columnFamily.get(), _makeKey(loc), &old_value);
if ( !status.ok() ) {
return StatusWith<DiskLoc>( ErrorCodes::InternalError, status.ToString() );
@@ -276,7 +227,7 @@ namespace mongo {
int old_length = old_value.size();
- ru->writeBatch()->Put( _columnFamily, _makeKey( loc ), rocksdb::Slice( data, len ) );
+ ru->writeBatch()->Put(_columnFamily.get(), _makeKey(loc), rocksdb::Slice(data, len));
_increaseDataSize(txn, len - old_length);
@@ -289,14 +240,14 @@ namespace mongo {
const DiskLoc& loc,
const char* damangeSource,
const mutablebson::DamageVector& damages ) {
- RocksRecoveryUnit* ru = _getRecoveryUnit( txn );
+ RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn );
rocksdb::Slice key = _makeKey( loc );
// get original value
std::string value;
rocksdb::Status status;
- status = _db->Get( _readOptions( txn ), _columnFamily, key, &value );
+ status = _db->Get(_readOptions(txn), _columnFamily.get(), key, &value);
if ( !status.ok() ) {
if ( status.IsNotFound() )
@@ -316,7 +267,7 @@ namespace mongo {
}
// write back
- ru->writeBatch()->Put( _columnFamily, key, value );
+ ru->writeBatch()->Put(_columnFamily.get(), key, value);
return Status::OK();
}
@@ -326,9 +277,7 @@ namespace mongo {
bool tailable,
const CollectionScanParams::Direction& dir
) const {
- invariant( !tailable );
-
- return new Iterator( txn, this, dir, start );
+ return new Iterator(txn, _db, _columnFamily, tailable, dir, start);
}
@@ -360,7 +309,7 @@ namespace mongo {
RecordStoreCompactAdaptor* adaptor,
const CompactOptions* options,
CompactStats* stats ) {
- rocksdb::Status status = _db->CompactRange( _columnFamily, NULL, NULL );
+ rocksdb::Status status = _db->CompactRange(_columnFamily.get(), NULL, NULL);
if ( status.ok() )
return Status::OK();
else
@@ -380,16 +329,18 @@ namespace mongo {
boost::scoped_ptr<RecordIterator> iter( getIterator( txn ) );
while( !iter->isEOF() ) {
numRecords++;
- RecordData data = dataFor( txn, iter->curr() );
- size_t dataSize;
- const Status status = adaptor->validate( data, &dataSize );
- if (!status.isOK()) {
- results->valid = false;
- if ( invalidObject ) {
- results->errors.push_back("invalid object detected (see logs)");
+ if (full) {
+ RecordData data = dataFor(txn, iter->curr());
+ size_t dataSize;
+ const Status status = adaptor->validate(data, &dataSize);
+ if (!status.isOK()) {
+ results->valid = false;
+ if (invalidObject) {
+ results->errors.push_back("invalid object detected (see logs)");
+ }
+ invalidObject = true;
+ log() << "Invalid object detected in " << _ns << ": " << status.reason();
}
- invalidObject = true;
- log() << "Invalid object detected in " << _ns << ": " << status.reason();
}
iter->getNext();
}
@@ -410,14 +361,25 @@ namespace mongo {
result->appendIntOrLL("max", _cappedMaxDocs);
result->appendIntOrLL("maxSize", _cappedMaxSize);
}
- bool valid = _db->GetProperty( _columnFamily, "rocksdb.stats", &statsString );
+ bool valid = _db->GetProperty(_columnFamily.get(), "rocksdb.stats", &statsString);
invariant( valid );
result->append( "stats", statsString );
}
+ Status RocksRecordStore::touch(OperationContext* txn, BSONObjBuilder* output) const {
+ Timer t;
+ boost::scoped_ptr<rocksdb::Iterator> itr(
+ _db->NewIterator(_readOptions(), _columnFamily.get()));
+ itr->SeekToFirst();
+ for (; itr->Valid(); itr->Next()) {
+ invariant(itr->status().ok());
+ }
+ invariant(itr->status().ok());
- // AFB: is there a way to force column families to be cached in rocks?
- Status RocksRecordStore::touch( OperationContext* txn, BSONObjBuilder* output ) const {
+ if (output) {
+ output->append("numRanges", 1);
+ output->append("millis", t.millis());
+ }
return Status::OK();
}
@@ -429,7 +391,7 @@ namespace mongo {
return Status::OK();
}
- return Status( ErrorCodes::BadValue, "Invalid option: " + optionName );
+ return Status(ErrorCodes::InvalidOptions, "Invalid option: " + optionName);
}
namespace {
@@ -485,19 +447,19 @@ namespace mongo {
}
void RocksRecordStore::dropRsMetaData( OperationContext* opCtx ) {
- RocksRecoveryUnit* ru = _getRecoveryUnit( opCtx );
+ RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( opCtx );
boost::mutex::scoped_lock dataSizeLk( _dataSizeLock );
- ru->writeBatch()->Delete( _metadataColumnFamily, _dataSizeKey );
+ ru->writeBatch()->Delete(_dataSizeKey);
boost::mutex::scoped_lock numRecordsLk( _numRecordsLock );
- ru->writeBatch()->Delete( _metadataColumnFamily, _numRecordsKey );
+ ru->writeBatch()->Delete(_numRecordsKey);
}
- rocksdb::ReadOptions RocksRecordStore::_readOptions( OperationContext* opCtx ) const {
+ rocksdb::ReadOptions RocksRecordStore::_readOptions(OperationContext* opCtx) {
rocksdb::ReadOptions options;
if ( opCtx ) {
- options.snapshot = _getRecoveryUnit( opCtx )->snapshot();
+ options.snapshot = RocksRecoveryUnit::getRocksRecoveryUnit( opCtx )->snapshot();
}
return options;
}
@@ -511,18 +473,38 @@ namespace mongo {
return loc;
}
- rocksdb::Slice RocksRecordStore::_makeKey( const DiskLoc& loc ) {
- return rocksdb::Slice( reinterpret_cast<const char*>( &loc ), sizeof( loc ) );
- }
-
- RocksRecoveryUnit* RocksRecordStore::_getRecoveryUnit( OperationContext* opCtx ) {
- return dynamic_cast<RocksRecoveryUnit*>( opCtx->recoveryUnit() );
+ rocksdb::Slice RocksRecordStore::_makeKey(const DiskLoc& loc) {
+ return rocksdb::Slice(reinterpret_cast<const char*>(&loc), sizeof(loc));
}
DiskLoc RocksRecordStore::_makeDiskLoc( const rocksdb::Slice& slice ) {
return reinterpret_cast<const DiskLoc*>( slice.data() )[0];
}
+ RecordData RocksRecordStore::_getDataFor(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf,
+ OperationContext* txn, const DiskLoc& loc) {
+ rocksdb::Slice value;
+
+ RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
+ auto key = _makeKey(loc);
+ boost::shared_array<char> data;
+
+ std::string value_storage;
+ auto status = ru->Get(cf, _makeKey(loc), &value_storage);
+ if (!status.ok()) {
+ if (status.IsNotFound()) {
+ return RecordData(nullptr, 0);
+ } else {
+ log() << "rocks Get failed, blowing up: " << status.ToString();
+ invariant(false);
+ }
+ }
+ data.reset(new char[value_storage.size()]);
+ memcpy(data.get(), value_storage.data(), value_storage.size());
+ value = rocksdb::Slice(data.get(), value_storage.size());
+ return RecordData(value.data(), value.size(), data);
+ }
+
// XXX make sure these work with rollbacks (I don't think they will)
void RocksRecordStore::_changeNumRecords( OperationContext* txn, bool insert ) {
boost::mutex::scoped_lock lk( _numRecordsLock );
@@ -533,12 +515,11 @@ namespace mongo {
else {
_numRecords--;
}
- RocksRecoveryUnit* ru = _getRecoveryUnit( txn );
+ RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn );
const char* nr_ptr = reinterpret_cast<char*>( &_numRecords );
- ru->writeBatch()->Put( _metadataColumnFamily,
- rocksdb::Slice( _numRecordsKey ),
- rocksdb::Slice( nr_ptr, sizeof(long long) ) );
+ ru->writeBatch()->Put(rocksdb::Slice(_numRecordsKey),
+ rocksdb::Slice(nr_ptr, sizeof(long long)));
}
@@ -546,43 +527,28 @@ namespace mongo {
boost::mutex::scoped_lock lk( _dataSizeLock );
_dataSize += amount;
- RocksRecoveryUnit* ru = _getRecoveryUnit( txn );
+ RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn );
const char* ds_ptr = reinterpret_cast<char*>( &_dataSize );
- ru->writeBatch()->Put( _metadataColumnFamily,
- rocksdb::Slice( _dataSizeKey ),
- rocksdb::Slice( ds_ptr, sizeof(long long) ) );
+ ru->writeBatch()->Put(rocksdb::Slice(_dataSizeKey),
+ rocksdb::Slice(ds_ptr, sizeof(long long)));
}
// --------
- RocksRecordStore::Iterator::Iterator( OperationContext* txn,
- const RocksRecordStore* rs,
- const CollectionScanParams::Direction& dir,
- const DiskLoc& start )
- : _txn( txn ),
- _rs( rs ),
- _dir( dir ),
- _reseekKeyValid( false ),
- // XXX not using a snapshot here
- _iterator( _rs->_db->NewIterator( rs->_readOptions(), rs->_columnFamily ) ) {
- if (start.isNull()) {
- if ( _forward() )
- _iterator->SeekToFirst();
- else
- _iterator->SeekToLast();
- }
- else {
- _iterator->Seek( rs->_makeKey( start ) );
+ RocksRecordStore::Iterator::Iterator(
+ OperationContext* txn, rocksdb::DB* db,
+ boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily, bool tailable,
+ const CollectionScanParams::Direction& dir, const DiskLoc& start)
+ : _txn(!tailable ? txn : nullptr),
+ _db(db),
+ _cf(columnFamily),
+ _tailable(tailable),
+ _dir(dir),
+ _eof(true),
+ _iterator(RocksRecoveryUnit::getRocksRecoveryUnit(txn)->NewIterator(_cf.get())) {
- if ( !_forward() && !_iterator->Valid() )
- _iterator->SeekToLast();
- else if ( !_forward() && _iterator->Valid() &&
- _makeDiskLoc( _iterator->key() ) != start )
- _iterator->Prev();
- }
-
- _checkStatus();
+ _locate(start);
}
void RocksRecordStore::Iterator::_checkStatus() {
@@ -592,29 +558,35 @@ namespace mongo {
}
bool RocksRecordStore::Iterator::isEOF() {
- return !_iterator || !_iterator->Valid();
+ return _eof;
}
DiskLoc RocksRecordStore::Iterator::curr() {
- if ( !_iterator->Valid() )
+ if (_eof) {
return DiskLoc();
+ }
- rocksdb::Slice slice = _iterator->key();
- return _makeDiskLoc( slice );
+ return _curr;
}
DiskLoc RocksRecordStore::Iterator::getNext() {
- if ( !_iterator->Valid() ) {
+ if (_eof) {
return DiskLoc();
}
- DiskLoc toReturn = curr();
+ DiskLoc toReturn = _curr;
if ( _forward() )
_iterator->Next();
else
_iterator->Prev();
+ if (_iterator->Valid()) {
+ _curr = _decodeCurr();
+ } else {
+ _eof = true;
+ // we leave _curr as it is on purpose
+ }
return toReturn;
}
@@ -623,36 +595,74 @@ namespace mongo {
}
void RocksRecordStore::Iterator::saveState() {
- if ( !_iterator ) {
- return;
- }
- if ( _iterator->Valid() ) {
- _reseekKey = _iterator->key().ToString();
- _reseekKeyValid = true;
- }
+ _iterator.reset();
}
+ // XXX restoring state with tailable cursor will invalidate the snapshot stored inside of
+ // OperationContext. It is important that while restoring state, nobody else is using the
+ // OperationContext (i.e. only a single restoreState is called on a tailable cursor with a
+ // single OperationContext)
bool RocksRecordStore::Iterator::restoreState(OperationContext* txn) {
+ if (_tailable) {
+ // we want to read new data if the iterator is tailable
+ RocksRecoveryUnit::getRocksRecoveryUnit(txn)->releaseSnapshot();
+ }
_txn = txn;
- if ( !_reseekKeyValid ) {
- _iterator.reset( NULL );
- return true;
+ auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
+ _iterator.reset(ru->NewIterator(_cf.get()));
+ bool previousEOF = _eof;
+ DiskLoc previousCurr = _curr;
+ _locate(_curr);
+ // if _curr != previousCurr that means that _curr has been deleted, so we don't need to
+ // advance it, because we're already on the next document by Seek-ing
+ if (previousEOF && _curr == previousCurr) {
+ getNext();
}
- _iterator.reset( _rs->_db->NewIterator( _rs->_readOptions(),
- _rs->_columnFamily ) );
- _checkStatus();
- _iterator->Seek( _reseekKey );
- _checkStatus();
- _reseekKeyValid = false;
-
return true;
}
RecordData RocksRecordStore::Iterator::dataFor( const DiskLoc& loc ) const {
- return _rs->dataFor( _txn, loc );
+ return RocksRecordStore::_getDataFor(_db, _cf.get(), _txn, loc);
+ }
+
+ void RocksRecordStore::Iterator::_locate(const DiskLoc& loc) {
+ if (_forward()) {
+ if (loc.isNull()) {
+ _iterator->SeekToFirst();
+ } else {
+ _iterator->Seek(RocksRecordStore::_makeKey(loc));
+ }
+ _checkStatus();
+ } else { // backward iterator
+ if (loc.isNull()) {
+ _iterator->SeekToLast();
+ } else {
+ // lower bound on reverse iterator
+ _iterator->Seek(RocksRecordStore::_makeKey(loc));
+ _checkStatus();
+ if (!_iterator->Valid()) {
+ _iterator->SeekToLast();
+ } else if (_decodeCurr() != loc) {
+ _iterator->Prev();
+ }
+ }
+ _checkStatus();
+ }
+ _eof = !_iterator->Valid();
+ if (_eof) {
+ _curr = loc;
+ } else {
+ _curr = _decodeCurr();
+ }
+ }
+
+ DiskLoc RocksRecordStore::Iterator::_decodeCurr() const {
+ invariant(_iterator && _iterator->Valid());
+ return _makeDiskLoc(_iterator->key());
}
bool RocksRecordStore::Iterator::_forward() const {
return _dir == CollectionScanParams::FORWARD;
}
+
}
diff --git a/src/mongo/db/storage/rocks/rocks_record_store.h b/src/mongo/db/storage/rocks/rocks_record_store.h
index 8c191006a34..45d4f100855 100644
--- a/src/mongo/db/storage/rocks/rocks_record_store.h
+++ b/src/mongo/db/storage/rocks/rocks_record_store.h
@@ -29,7 +29,10 @@
* it in the license file.
*/
+#pragma once
+
#include <string>
+#include <memory>
#include <rocksdb/options.h>
@@ -50,14 +53,11 @@ namespace mongo {
class RocksRecordStore : public RecordStore {
public:
- RocksRecordStore( const StringData& ns,
- rocksdb::DB* db,
- rocksdb::ColumnFamilyHandle* columnFamily,
- rocksdb::ColumnFamilyHandle* metadataColumnFamily,
- bool isCapped = false,
- int64_t cappedMaxSize = -1,
- int64_t cappedMaxDocs = -1,
- CappedDocumentDeleteCallback* cappedDeleteCallback = NULL );
+ RocksRecordStore(const StringData& ns, const StringData& id, rocksdb::DB* db,
+ boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily,
+ bool isCapped = false, int64_t cappedMaxSize = -1,
+ int64_t cappedMaxDocs = -1,
+ CappedDocumentDeleteCallback* cappedDeleteCallback = NULL);
virtual ~RocksRecordStore() { }
@@ -153,12 +153,12 @@ namespace mongo {
static rocksdb::Comparator* newRocksCollectionComparator();
private:
+ // NOTE: RecordIterator might outlive the RecordStore
class Iterator : public RecordIterator {
public:
- Iterator( OperationContext* txn,
- const RocksRecordStore* rs,
- const CollectionScanParams::Direction& dir,
- const DiskLoc& start );
+ Iterator(OperationContext* txn, rocksdb::DB* db,
+ boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily, bool tailable,
+ const CollectionScanParams::Direction& dir, const DiskLoc& start);
virtual bool isEOF();
virtual DiskLoc curr();
@@ -169,14 +169,18 @@ namespace mongo {
virtual RecordData dataFor( const DiskLoc& loc ) const;
private:
+ void _locate(const DiskLoc& loc);
+ DiskLoc _decodeCurr() const;
bool _forward() const;
void _checkStatus();
OperationContext* _txn;
- const RocksRecordStore* _rs;
+ rocksdb::DB* _db; // not owned
+ boost::shared_ptr<rocksdb::ColumnFamilyHandle> _cf;
+ bool _tailable;
CollectionScanParams::Direction _dir;
- bool _reseekKeyValid;
- std::string _reseekKey;
+ bool _eof;
+ DiskLoc _curr;
boost::scoped_ptr<rocksdb::Iterator> _iterator;
};
@@ -184,12 +188,13 @@ namespace mongo {
* Returns a new ReadOptions struct, containing the snapshot held in opCtx, if opCtx is not
* null
*/
- rocksdb::ReadOptions _readOptions( OperationContext* opCtx = NULL ) const;
-
- static RocksRecoveryUnit* _getRecoveryUnit( OperationContext* opCtx );
+ static rocksdb::ReadOptions _readOptions(OperationContext* opCtx = NULL);
static DiskLoc _makeDiskLoc( const rocksdb::Slice& slice );
+ static RecordData _getDataFor(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf,
+ OperationContext* txn, const DiskLoc& loc);
+
DiskLoc _nextId();
bool cappedAndNeedDelete() const;
void cappedDeleteAsNeeded(OperationContext* txn);
@@ -201,8 +206,7 @@ namespace mongo {
void _increaseDataSize(OperationContext* txn, int amount);
rocksdb::DB* _db; // not owned
- rocksdb::ColumnFamilyHandle* _columnFamily; // not owned
- rocksdb::ColumnFamilyHandle* _metadataColumnFamily; // not owned
+ boost::shared_ptr<rocksdb::ColumnFamilyHandle> _columnFamily;
const bool _isCapped;
const int64_t _cappedMaxSize;
diff --git a/src/mongo/db/storage/rocks/rocks_record_store_test.cpp b/src/mongo/db/storage/rocks/rocks_record_store_test.cpp
index 917a70214b6..95206f3f31f 100644
--- a/src/mongo/db/storage/rocks/rocks_record_store_test.cpp
+++ b/src/mongo/db/storage/rocks/rocks_record_store_test.cpp
@@ -1,35 +1,35 @@
-// rocks_record_store_test.cpp
+// rocks_record_store_harness_test.cpp
/**
-* Copyright (C) 2014 MongoDB Inc.
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License, version 3,
-* as published by the Free Software Foundation.
-*
-*
-* This program is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see <http://www.gnu.org/licenses/>.
-*
-* As a special exception, the copyright holders give permission to link the
-* code of portions of this program with the OpenSSL library under certain
-* conditions as described in each individual source file and distribute
-* linked combinations including the program with the OpenSSL library. You
-* must comply with the GNU Affero General Public License in all respects for
-* all of the code used other than as permitted herein. If you modify file(s)
-* with this exception, you may extend this exception to your version of the
-* file(s), but you are not obligated to do so. If you do not wish to do so,
-* delete this exception statement from your version. If you delete this
-* exception statement from all source files in the program, then also delete
-* it in the license file.
-*/
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
#include <memory>
+#include <vector>
#include <boost/filesystem/operations.hpp>
@@ -37,684 +37,47 @@
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/slice.h>
-#include <rocksdb/status.h>
-#include <rocksdb/comparator.h>
-#include <rocksdb/utilities/write_batch_with_index.h>
-#include "mongo/db/operation_context.h"
-#include "mongo/db/operation_context_noop.h"
-#include "mongo/db/storage/rocks/rocks_engine.h"
+#include "mongo/db/storage/record_store_test_harness.h"
#include "mongo/db/storage/rocks/rocks_record_store.h"
#include "mongo/db/storage/rocks/rocks_recovery_unit.h"
-#include "mongo/unittest/temp_dir.h"
#include "mongo/unittest/unittest.h"
-
-using namespace mongo;
+#include "mongo/unittest/temp_dir.h"
namespace mongo {
- class MyOperationContext : public OperationContextNoop {
+ class RocksRecordStoreHarnessHelper : public HarnessHelper {
public:
- MyOperationContext(rocksdb::DB* db)
- : OperationContextNoop(new RocksRecoveryUnit(db, true)) {}
+ RocksRecordStoreHarnessHelper() : _tempDir(_testNamespace) {
+ boost::filesystem::remove_all(_tempDir.path());
+ rocksdb::DB* db;
+ std::vector<rocksdb::ColumnFamilyDescriptor> cfs;
+ cfs.emplace_back();
+ cfs.emplace_back("record_store", rocksdb::ColumnFamilyOptions());
+ cfs[1].options.comparator = RocksRecordStore::newRocksCollectionComparator();
+ rocksdb::DBOptions db_options;
+ db_options.create_if_missing = true;
+ db_options.create_missing_column_families = true;
+ std::vector<rocksdb::ColumnFamilyHandle*> handles;
+ auto s = rocksdb::DB::Open(db_options, _tempDir.path(), cfs, &handles, &db);
+ ASSERT(s.ok());
+ _db.reset(db);
+ delete handles[0];
+ _cf.reset(handles[1]);
+ }
+
+ virtual RecordStore* newNonCappedRecordStore() {
+ return new RocksRecordStore("foo.bar", "1", _db.get(), _cf);
+ }
+
+ virtual RecoveryUnit* newRecoveryUnit() { return new RocksRecoveryUnit(_db.get(), true); }
+
+ private:
+ string _testNamespace = "mongo-rocks-record-store-test";
+ unittest::TempDir _tempDir;
+ boost::scoped_ptr<rocksdb::DB> _db;
+ boost::shared_ptr<rocksdb::ColumnFamilyHandle> _cf;
};
- // to be used in testing
- static boost::scoped_ptr<rocksdb::Comparator> _rocksComparator(
- RocksRecordStore::newRocksCollectionComparator() );
-
- rocksdb::ColumnFamilyOptions getColumnFamilyOptions() {
- rocksdb::ColumnFamilyOptions options;
- options.comparator = _rocksComparator.get();
- return options;
- }
-
- // the name of the column family that will be used to back the data in all the record stores
- // created during tests.
- const string columnFamilyName = "myColumnFamily";
-
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> _createCfh(rocksdb::DB* db ) {
-
- rocksdb::ColumnFamilyHandle* cfh;
-
- rocksdb::Status s = db->CreateColumnFamily( rocksdb::ColumnFamilyOptions(),
- columnFamilyName,
- &cfh );
-
- invariant( s.ok() );
-
- return boost::shared_ptr<rocksdb::ColumnFamilyHandle>( cfh );
- }
-
- string _rocksRecordStoreTestDir = "mongo-rocks-test";
-
- rocksdb::DB* getDB( string path) {
- boost::filesystem::remove_all( path );
-
- rocksdb::Options options = RocksEngine::dbOptions();
-
- // open DB
- rocksdb::DB* db;
- rocksdb::Status s = rocksdb::DB::Open(options, path, &db);
- ASSERT_OK( toMongoStatus( s ) );
-
- return db;
- }
-
- typedef std::pair<shared_ptr<rocksdb::DB>, shared_ptr<rocksdb::ColumnFamilyHandle> > DbAndCfh;
- DbAndCfh getDBPersist( string path ) {
- // Need to pass a vector with cfd's for every column family, which should just be
- // columnFamilyName (for data) and the rocks default column family (for metadata).
- vector<rocksdb::ColumnFamilyDescriptor> descriptors;
- descriptors.push_back( rocksdb::ColumnFamilyDescriptor() );
- descriptors.push_back( rocksdb::ColumnFamilyDescriptor( columnFamilyName,
- rocksdb::ColumnFamilyOptions() ) );
-
- // open DB
- rocksdb::DB* db;
- rocksdb::Options options = RocksEngine::dbOptions();
- vector<rocksdb::ColumnFamilyHandle*> handles;
- rocksdb::Status s = rocksdb::DB::Open(options, path, descriptors, &handles, &db);
- ASSERT_OK( toMongoStatus( s ) );
-
- // so that the caller of this function has access to the column family handle backing the
- // record store data.
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> cfhPtr( handles[1] );
-
- return std::make_pair( boost::shared_ptr<rocksdb::DB>( db ), cfhPtr );
- }
-
- TEST( RocksRecoveryUnitTest, Simple1 ) {
- unittest::TempDir td( _rocksRecordStoreTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- db->Put( rocksdb::WriteOptions(), "a", "b" );
-
- string value;
- db->Get( rocksdb::ReadOptions(), "a", &value );
- ASSERT_EQUALS( value, "b" );
-
- {
- RocksRecoveryUnit ru( db.get(), false );
- ru.beginUnitOfWork();
- ru.writeBatch()->Put( "a", "c" );
-
- value = "x";
- db->Get( rocksdb::ReadOptions(), "a", &value );
- ASSERT_EQUALS( value, "b" );
-
- ru.endUnitOfWork();
- ru.commitUnitOfWork();
- value = "x";
- db->Get( rocksdb::ReadOptions(), "a", &value );
- ASSERT_EQUALS( value, "c" );
- }
-
- }
-
- TEST( RocksRecoveryUnitTest, SimpleAbort1 ) {
- unittest::TempDir td( _rocksRecordStoreTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- db->Put( rocksdb::WriteOptions(), "a", "b" );
-
- {
- string value;
- db->Get( rocksdb::ReadOptions(), "a", &value );
- ASSERT_EQUALS( value, "b" );
- }
-
- {
- RocksRecoveryUnit ru( db.get(), false );
- ru.beginUnitOfWork();
- ru.writeBatch()->Put( "a", "c" );
-
- // note: no endUnitOfWork or commitUnitOfWork
- }
-
- {
- string value;
- db->Get( rocksdb::ReadOptions(), "a", &value );
- ASSERT_EQUALS( value, "b" );
- }
- }
-
-
- TEST( RocksRecordStoreTest, Insert1 ) {
- unittest::TempDir td( _rocksRecordStoreTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> cfh = _createCfh( db.get() );
- int size;
-
- {
- RocksRecordStore rs( "foo.bar", db.get(), cfh.get(), db->DefaultColumnFamily() );
- string s = "eliot was here";
- size = s.length() + 1;
-
- MyOperationContext opCtx( db.get() );
- DiskLoc loc;
- {
- WriteUnitOfWork uow( &opCtx );
- StatusWith<DiskLoc> res = rs.insertRecord( &opCtx, s.c_str(), s.size() + 1, -1 );
- ASSERT_OK( res.getStatus() );
- loc = res.getValue();
- }
-
- ASSERT_EQUALS(s, rs.dataFor(&opCtx, loc).data());
- }
-
- {
- MyOperationContext opCtx(db.get());
- RocksRecordStore rs( "foo.bar", db.get(), cfh.get(), db->DefaultColumnFamily() );
- ASSERT_EQUALS(1, rs.numRecords(&opCtx));
- ASSERT_EQUALS(size, rs.dataSize(&opCtx));
- }
- }
-
- TEST( RocksRecordStoreTest, Delete1 ) {
- unittest::TempDir td( _rocksRecordStoreTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> cfh = _createCfh( db.get() );
-
- {
- RocksRecordStore rs( "foo.bar", db.get(), cfh.get(), db->DefaultColumnFamily() );
- string s = "eliot was here";
-
- DiskLoc loc;
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- StatusWith<DiskLoc> res = rs.insertRecord(&opCtx,
- s.c_str(),
- s.size() + 1,
- -1 );
- ASSERT_OK( res.getStatus() );
- loc = res.getValue();
- }
-
- ASSERT_EQUALS(s, rs.dataFor(&opCtx, loc).data());
- ASSERT_EQUALS(1, rs.numRecords(&opCtx));
- ASSERT_EQUALS(static_cast<long long>(s.length() + 1), rs.dataSize(&opCtx));
- }
-
- {
- MyOperationContext opCtx(db.get());
- ASSERT(rs.dataFor(&opCtx, loc).data() != NULL);
- }
-
- {
- MyOperationContext opCtx( db.get() );
- WriteUnitOfWork uow( &opCtx );
- rs.deleteRecord( &opCtx, loc );
-
- ASSERT_EQUALS(0, rs.numRecords(&opCtx));
- ASSERT_EQUALS(0, rs.dataSize(&opCtx));
- }
- }
- }
-
- TEST( RocksRecordStoreTest, Update1 ) {
- unittest::TempDir td( _rocksRecordStoreTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> cfh = _createCfh( db.get() );
-
- {
- RocksRecordStore rs( "foo.bar", db.get(), cfh.get(), db->DefaultColumnFamily() );
- string s1 = "eliot1";
- string s2 = "eliot2 and more";
-
- DiskLoc loc;
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- StatusWith<DiskLoc> res = rs.insertRecord( &opCtx,
- s1.c_str(),
- s1.size() + 1,
- -1 );
- ASSERT_OK( res.getStatus() );
- loc = res.getValue();
- }
-
- ASSERT_EQUALS(s1, rs.dataFor(&opCtx, loc).data());
- }
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- StatusWith<DiskLoc> res =
- rs.updateRecord(&opCtx, loc, s2.c_str(), s2.size() + 1, -1, NULL);
- ASSERT_OK( res.getStatus() );
- ASSERT( loc == res.getValue() );
- }
-
- ASSERT_EQUALS(s2, rs.dataFor(&opCtx, loc).data());
- }
-
- }
- }
-
- TEST( RocksRecordStoreTest, UpdateInPlace1 ) {
- unittest::TempDir td( _rocksRecordStoreTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> cfh = _createCfh( db.get() );
-
- {
- RocksRecordStore rs( "foo.bar", db.get(), cfh.get(), db->DefaultColumnFamily() );
- string s1 = "aaa111bbb";
- string s2 = "aaa222bbb";
-
- DiskLoc loc;
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- StatusWith<DiskLoc> res = rs.insertRecord( &opCtx,
- s1.c_str(),
- s1.size() + 1,
- -1 );
- ASSERT_OK( res.getStatus() );
- loc = res.getValue();
- }
-
- ASSERT_EQUALS(s1, rs.dataFor(&opCtx, loc).data());
- }
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- const char* damageSource = "222";
- mutablebson::DamageVector dv;
- dv.push_back( mutablebson::DamageEvent() );
- dv[0].sourceOffset = 0;
- dv[0].targetOffset = 3;
- dv[0].size = 3;
- Status res = rs.updateWithDamages( &opCtx,
- loc,
- damageSource,
- dv );
- ASSERT_OK( res );
- }
- ASSERT_EQUALS(s2, rs.dataFor(&opCtx, loc).data());
- }
-
- }
- }
-
- TEST( RocksRecordStoreTest, TwoCollections ) {
- unittest::TempDir td( _rocksRecordStoreTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- rocksdb::ColumnFamilyHandle* cf1;
- rocksdb::ColumnFamilyHandle* cf2;
- rocksdb::ColumnFamilyHandle* cf1_m;
- rocksdb::ColumnFamilyHandle* cf2_m;
-
- rocksdb::Status status;
-
- status = db->CreateColumnFamily( rocksdb::ColumnFamilyOptions(), "foo.bar1", &cf1 );
- ASSERT_OK( toMongoStatus( status ) );
- status = db->CreateColumnFamily( rocksdb::ColumnFamilyOptions(), "foo.bar2", &cf2 );
- ASSERT_OK( toMongoStatus( status ) );
-
- status = db->CreateColumnFamily( rocksdb::ColumnFamilyOptions(), "foo.bar1&", &cf1_m );
- ASSERT_OK( toMongoStatus( status ) );
- status = db->CreateColumnFamily( rocksdb::ColumnFamilyOptions(), "foo.bar2&", &cf2_m );
- ASSERT_OK( toMongoStatus( status ) );
-
- RocksRecordStore rs1( "foo.bar1", db.get(), cf1, cf1_m );
- RocksRecordStore rs2( "foo.bar2", db.get(), cf2, cf2_m );
-
- DiskLoc a;
- DiskLoc b;
-
- {
- MyOperationContext opCtx( db.get() );
- WriteUnitOfWork uow( &opCtx );
-
- StatusWith<DiskLoc> result = rs1.insertRecord( &opCtx, "a", 2, -1 );
- ASSERT_OK( result.getStatus() );
- a = result.getValue();
-
- result = rs2.insertRecord( &opCtx, "b", 2, -1 );
- ASSERT_OK( result.getStatus() );
- b = result.getValue();
- }
-
- ASSERT_EQUALS( a, b );
-
- {
- MyOperationContext opCtx(db.get());
- ASSERT_EQUALS(string("a"), rs1.dataFor(&opCtx, a).data());
- ASSERT_EQUALS(string("b"), rs2.dataFor(&opCtx, b).data());
- }
-
- delete cf2;
- delete cf1;
- }
-
- TEST( RocksRecordStoreTest, Stats1 ) {
- unittest::TempDir td( _rocksRecordStoreTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> cfh = _createCfh( db.get() );
-
- RocksRecordStore rs( "foo.bar", db.get(), cfh.get(), db->DefaultColumnFamily() );
- string s = "eliot was here";
-
- {
- MyOperationContext opCtx( db.get() );
- DiskLoc loc;
- {
- WriteUnitOfWork uow( &opCtx );
- StatusWith<DiskLoc> res = rs.insertRecord( &opCtx, s.c_str(), s.size() + 1, -1 );
- ASSERT_OK( res.getStatus() );
- loc = res.getValue();
- }
-
- ASSERT_EQUALS(s, rs.dataFor(&opCtx, loc).data());
- }
-
- {
- MyOperationContext opCtx( db.get() );
- BSONObjBuilder b;
- rs.appendCustomStats( &opCtx, &b, 1 );
- BSONObj obj = b.obj();
- ASSERT( obj["stats"].String().find( "WAL" ) != string::npos );
- }
- }
-
- TEST( RocksRecordStoreTest, Persistence1 ) {
- DiskLoc loc;
- string origStr = "eliot was here";
- string newStr = "antonio was here";
- unittest::TempDir td( _rocksRecordStoreTestDir );
-
- {
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> cfh = _createCfh( db.get() );
-
- RocksRecordStore rs( "foo.bar", db.get(), cfh.get(), db->DefaultColumnFamily() );
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- StatusWith<DiskLoc> res = rs.insertRecord( &opCtx, origStr.c_str(),
- origStr.size() + 1, -1 );
- ASSERT_OK( res.getStatus() );
- loc = res.getValue();
- }
-
- ASSERT_EQUALS(origStr, rs.dataFor(&opCtx, loc).data());
- }
- }
-
- {
- DbAndCfh dbAndCfh = getDBPersist( td.path() );
- boost::shared_ptr<rocksdb::DB> db = dbAndCfh.first;
-
- RocksRecordStore rs( "foo.bar",
- db.get(),
- dbAndCfh.second.get(),
- db->DefaultColumnFamily() );
-
- {
- MyOperationContext opCtx(db.get());
- ASSERT_EQUALS(static_cast<long long>(origStr.size() + 1), rs.dataSize(&opCtx));
- ASSERT_EQUALS(1, rs.numRecords(&opCtx));
- }
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- StatusWith<DiskLoc> res =
- rs.updateRecord(&opCtx, loc, newStr.c_str(), newStr.size() + 1, -1, NULL);
- ASSERT_OK( res.getStatus() );
- }
-
- ASSERT_EQUALS(newStr, rs.dataFor(&opCtx, loc).data());
- }
- }
-
- {
- DbAndCfh dbAndCfh = getDBPersist( td.path() );
- boost::shared_ptr<rocksdb::DB> db = dbAndCfh.first;
-
- RocksRecordStore rs( "foo.bar",
- db.get(),
- dbAndCfh.second.get(),
- db->DefaultColumnFamily() );
-
- {
- MyOperationContext opCtx(db.get());
- ASSERT_EQUALS(static_cast<long long>(newStr.size() + 1), rs.dataSize(&opCtx));
- ASSERT_EQUALS(1, rs.numRecords(&opCtx));
- }
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- rs.deleteRecord( &opCtx, loc );
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- ASSERT_EQUALS(0, rs.dataSize(&opCtx));
- ASSERT_EQUALS(0, rs.numRecords(&opCtx));
- }
- }
- }
-
- TEST( RocksRecordStoreTest, ForwardIterator ) {
- {
- unittest::TempDir td( _rocksRecordStoreTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- rocksdb::ColumnFamilyHandle* cf1;
- rocksdb::ColumnFamilyHandle* cf1_m;
-
- rocksdb::Status status;
-
- status = db->CreateColumnFamily( getColumnFamilyOptions(), "foo.bar1", &cf1 );
- ASSERT_OK( toMongoStatus( status ) );
- status = db->CreateColumnFamily( rocksdb::ColumnFamilyOptions(), "foo.bar1&", &cf1_m );
- ASSERT_OK( toMongoStatus( status ) );
-
- RocksRecordStore rs( "foo.bar", db.get(), cf1, cf1_m );
- string s1 = "eliot was here";
- string s2 = "antonio was here";
- string s3 = "eliot and antonio were here";
- DiskLoc loc1;
- DiskLoc loc2;
- DiskLoc loc3;
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- StatusWith<DiskLoc> res = rs.insertRecord( &opCtx, s1.c_str(), s1.size() + 1, -1 );
- ASSERT_OK( res.getStatus() );
- loc1 = res.getValue();
- res = rs.insertRecord( &opCtx, s2.c_str(), s2.size() + 1, -1 );
- ASSERT_OK( res.getStatus() );
- loc2 = res.getValue();
- res = rs.insertRecord( &opCtx, s3.c_str(), s3.size() + 1, -1 );
- ASSERT_OK( res.getStatus() );
- loc3 = res.getValue();
- }
- }
-
- MyOperationContext txn(db.get());
-
- scoped_ptr<RecordIterator> iter( rs.getIterator( &txn ) );
-
- ASSERT_EQUALS( false, iter->isEOF() );
- ASSERT_EQUALS( loc1, iter->getNext() );
- ASSERT_EQUALS( s1, iter->dataFor( loc1 ).data() );
-
- ASSERT_EQUALS( false, iter->isEOF() );
- ASSERT_EQUALS( loc2, iter->getNext() );
- ASSERT_EQUALS( s2, iter->dataFor( loc2 ).data() );
-
- ASSERT_EQUALS( false, iter->isEOF() );
- ASSERT_EQUALS( loc3, iter->getNext() );
- ASSERT_EQUALS( s3, iter->dataFor( loc3 ).data() );
-
- ASSERT_EQUALS( true, iter->isEOF() );
- ASSERT_EQUALS( DiskLoc(), iter->getNext() );
- }
- }
-
- TEST( RocksRecordStoreTest, BackwardIterator ) {
- {
- unittest::TempDir td( _rocksRecordStoreTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- rocksdb::ColumnFamilyHandle* cf1;
- rocksdb::ColumnFamilyHandle* cf1_m;
-
- rocksdb::Status status;
-
- status = db->CreateColumnFamily( getColumnFamilyOptions(), "foo.bar1", &cf1 );
- ASSERT_OK( toMongoStatus( status ) );
- status = db->CreateColumnFamily( rocksdb::ColumnFamilyOptions(), "foo.bar1&", &cf1_m );
- ASSERT_OK( toMongoStatus( status ) );
-
- RocksRecordStore rs( "foo.bar", db.get(), cf1, cf1_m );
- string s1 = "eliot was here";
- string s2 = "antonio was here";
- string s3 = "eliot and antonio were here";
- DiskLoc loc1;
- DiskLoc loc2;
- DiskLoc loc3;
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- StatusWith<DiskLoc> res = rs.insertRecord( &opCtx, s1.c_str(), s1.size() +1, -1 );
- ASSERT_OK( res.getStatus() );
- loc1 = res.getValue();
- res = rs.insertRecord( &opCtx, s2.c_str(), s2.size() + 1, -1 );
- ASSERT_OK( res.getStatus() );
- loc2 = res.getValue();
- res = rs.insertRecord( &opCtx, s3.c_str(), s3.size() + 1, -1 );
- ASSERT_OK( res.getStatus() );
- loc3 = res.getValue();
- }
- }
-
- MyOperationContext txn(db.get());
- scoped_ptr<RecordIterator> iter( rs.getIterator( &txn, maxDiskLoc, false,
- CollectionScanParams::BACKWARD ) );
- ASSERT_EQUALS( false, iter->isEOF() );
- ASSERT_EQUALS( loc3, iter->getNext() );
- ASSERT_EQUALS( s3, iter->dataFor( loc3 ).data() );
-
- ASSERT_EQUALS( false, iter->isEOF() );
- ASSERT_EQUALS( loc2, iter->getNext() );
- ASSERT_EQUALS( s2, iter->dataFor( loc2 ).data() );
-
- ASSERT_EQUALS( false, iter->isEOF() );
- ASSERT_EQUALS( loc1, iter->getNext() );
- ASSERT_EQUALS( s1, iter->dataFor( loc1 ).data() );
-
- ASSERT_EQUALS( true, iter->isEOF() );
- ASSERT_EQUALS( DiskLoc(), iter->getNext() );
- }
- }
-
- TEST( RocksRecordStoreTest, Truncate1 ) {
- unittest::TempDir td( _rocksRecordStoreTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- {
- rocksdb::ColumnFamilyHandle* cf1;
- rocksdb::ColumnFamilyHandle* cf1_m;
-
- rocksdb::Status status;
-
- status = db->CreateColumnFamily( getColumnFamilyOptions(), "foo.bar1", &cf1 );
- ASSERT_OK( toMongoStatus( status ) );
- status = db->CreateColumnFamily( rocksdb::ColumnFamilyOptions(), "foo.bar1&", &cf1_m );
- ASSERT_OK( toMongoStatus( status ) );
-
- RocksRecordStore rs( "foo.bar", db.get(), cf1, cf1_m );
- string s = "antonio was here";
-
- {
- MyOperationContext opCtx( db.get() );
- WriteUnitOfWork uow( &opCtx );
- StatusWith<DiskLoc> res = rs.insertRecord( &opCtx, s.c_str(), s.size() + 1, -1 );
- ASSERT_OK( res.getStatus() );
- res = rs.insertRecord( &opCtx, s.c_str(), s.size() + 1, -1 );
- ASSERT_OK( res.getStatus() );
- }
-
- {
- MyOperationContext opCtx( db.get() );
- WriteUnitOfWork uow( &opCtx );
- Status stat = rs.truncate( &opCtx );
- ASSERT_OK( stat );
-
- ASSERT_EQUALS(0, rs.numRecords(&opCtx));
- ASSERT_EQUALS(0, rs.dataSize(&opCtx));
- }
-
- // Test that truncate does not fail on an empty collection
- {
- MyOperationContext opCtx( db.get() );
- WriteUnitOfWork uow( &opCtx );
- Status stat = rs.truncate( &opCtx );
- ASSERT_OK( stat );
-
- ASSERT_EQUALS(0, rs.numRecords(&opCtx));
- ASSERT_EQUALS(0, rs.dataSize(&opCtx));
- }
- }
- }
-
- TEST( RocksRecordStoreTest, Snapshots1 ) {
- unittest::TempDir td( _rocksRecordStoreTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> cfh = _createCfh( db.get() );
-
- DiskLoc loc;
- int size = -1;
-
- {
- RocksRecordStore rs( "foo.bar", db.get(), cfh.get(), db->DefaultColumnFamily() );
- string s = "test string";
- size = s.length() + 1;
-
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- StatusWith<DiskLoc> res = rs.insertRecord( &opCtx, s.c_str(), s.size() + 1, -1 );
- ASSERT_OK( res.getStatus() );
- loc = res.getValue();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- MyOperationContext opCtx2( db.get() );
-
- RocksRecordStore rs( "foo.bar", db.get(), cfh.get(), db->DefaultColumnFamily() );
-
- rs.deleteRecord( &opCtx, loc );
-
- RecordData recData = rs.dataFor(&opCtx, loc /*, &opCtx */);
- ASSERT( !recData.data() && recData.size() == 0 );
-
- RecordData recData2 = rs.dataFor(&opCtx2, loc);
- ASSERT(recData2.data() && recData2.size() == size);
- }
- }
+ HarnessHelper* newHarnessHelper() { return new RocksRecordStoreHarnessHelper(); }
}
diff --git a/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp b/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp
index 46bf25d7d89..26502935c93 100644
--- a/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp
+++ b/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp
@@ -32,11 +32,13 @@
#include <rocksdb/comparator.h>
#include <rocksdb/db.h>
+#include <rocksdb/iterator.h>
#include <rocksdb/slice.h>
#include <rocksdb/options.h>
#include <rocksdb/write_batch.h>
#include <rocksdb/utilities/write_batch_with_index.h>
+#include "mongo/db/operation_context.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -50,7 +52,7 @@ namespace mongo {
RocksRecoveryUnit::~RocksRecoveryUnit() {
if (!_destroyed) {
- _destroyInternal();
+ destroy();
}
}
@@ -105,7 +107,8 @@ namespace mongo {
if (!_writeBatch) {
// this assumes that default column family uses default comparator. change this if you
// change default column family's comparator
- _writeBatch.reset(new rocksdb::WriteBatchWithIndex(rocksdb::BytewiseComparator()));
+ _writeBatch.reset(
+ new rocksdb::WriteBatchWithIndex(rocksdb::BytewiseComparator(), 0, true));
}
return _writeBatch.get();
@@ -114,8 +117,19 @@ namespace mongo {
void RocksRecoveryUnit::registerChange(Change* change) { _changes.emplace_back(change); }
void RocksRecoveryUnit::destroy() {
+ if (_defaultCommit) {
+ commitUnitOfWork();
+ }
+
+ if (_writeBatch && _writeBatch->GetWriteBatch()->Count() > 0) {
+ for (auto& change : _changes) {
+ change->rollback();
+ delete change;
+ }
+ }
+
+ releaseSnapshot();
_destroyed = true;
- _destroyInternal();
}
// XXX lazily initialized for now
@@ -131,18 +145,46 @@ namespace mongo {
return _snapshot;
}
- void RocksRecoveryUnit::_destroyInternal() {
- if (_defaultCommit) {
- commitUnitOfWork();
+ void RocksRecoveryUnit::releaseSnapshot() {
+ if (_snapshot) {
+ _db->ReleaseSnapshot(_snapshot);
+ _snapshot = nullptr;
}
+ }
- for (auto& change : _changes) {
- change->rollback();
- delete change;
+ rocksdb::Status RocksRecoveryUnit::Get(rocksdb::ColumnFamilyHandle* columnFamily,
+ const rocksdb::Slice& key, std::string* value) {
+ if (_writeBatch && _writeBatch->GetWriteBatch()->Count() > 0) {
+ boost::scoped_ptr<rocksdb::WBWIIterator> wb_iterator(
+ _writeBatch->NewIterator(columnFamily));
+ wb_iterator->Seek(key);
+ if (wb_iterator->Valid() && wb_iterator->Entry().key == key) {
+ const auto& entry = wb_iterator->Entry();
+ if (entry.type == rocksdb::WriteType::kDeleteRecord) {
+ return rocksdb::Status::NotFound();
+ }
+ // TODO avoid double copy
+ *value = std::string(entry.value.data(), entry.value.size());
+ return rocksdb::Status::OK();
+ }
}
+ rocksdb::ReadOptions options;
+ options.snapshot = snapshot();
+ return _db->Get(options, columnFamily, key, value);
+ }
- if (_snapshot) {
- _db->ReleaseSnapshot(_snapshot);
+ rocksdb::Iterator* RocksRecoveryUnit::NewIterator(rocksdb::ColumnFamilyHandle* columnFamily) {
+ rocksdb::ReadOptions options;
+ options.snapshot = snapshot();
+ auto iterator = _db->NewIterator(options, columnFamily);
+ if (_writeBatch && _writeBatch->GetWriteBatch()->Count() > 0) {
+ iterator = _writeBatch->NewIteratorWithBase(columnFamily, iterator);
}
+ return iterator;
+ }
+
+ RocksRecoveryUnit* RocksRecoveryUnit::getRocksRecoveryUnit(OperationContext* opCtx) {
+ return dynamic_cast<RocksRecoveryUnit*>(opCtx->recoveryUnit());
}
+
}
diff --git a/src/mongo/db/storage/rocks/rocks_recovery_unit.h b/src/mongo/db/storage/rocks/rocks_recovery_unit.h
index fd6499a5a1a..6a87b3417b6 100644
--- a/src/mongo/db/storage/rocks/rocks_recovery_unit.h
+++ b/src/mongo/db/storage/rocks/rocks_recovery_unit.h
@@ -46,14 +46,20 @@ namespace rocksdb {
class Snapshot;
class WriteBatchWithIndex;
class Comparator;
+ struct Status;
+ class ColumnFamilyHandle;
+ class Slice;
+ class Iterator;
}
namespace mongo {
+ class OperationContext;
+
class RocksRecoveryUnit : public RecoveryUnit {
MONGO_DISALLOW_COPYING(RocksRecoveryUnit);
public:
- RocksRecoveryUnit(rocksdb::DB* db, bool defaultCommit);
+ RocksRecoveryUnit(rocksdb::DB* db, bool defaultCommit = false);
virtual ~RocksRecoveryUnit();
virtual void beginUnitOfWork();
@@ -79,6 +85,16 @@ namespace mongo {
const rocksdb::Snapshot* snapshot();
+ // to support tailable cursors
+ void releaseSnapshot();
+
+ rocksdb::Status Get(rocksdb::ColumnFamilyHandle* columnFamily, const rocksdb::Slice& key,
+ std::string* value);
+
+ rocksdb::Iterator* NewIterator(rocksdb::ColumnFamilyHandle* columnFamily);
+
+ static RocksRecoveryUnit* getRocksRecoveryUnit(OperationContext* opCtx);
+
private:
void _destroyInternal();
diff --git a/src/mongo/db/storage/rocks/rocks_sorted_data_impl.cpp b/src/mongo/db/storage/rocks/rocks_sorted_data_impl.cpp
index 133e4a0230a..6e4c6ee21b3 100644
--- a/src/mongo/db/storage/rocks/rocks_sorted_data_impl.cpp
+++ b/src/mongo/db/storage/rocks/rocks_sorted_data_impl.cpp
@@ -93,9 +93,6 @@ namespace mongo {
return IndexKeyEntry( key, loc );
}
- /**
- * Creates an error code message out of a key
- */
string dupKeyError(const BSONObj& key) {
stringstream ss;
ss << "E11000 duplicate key error ";
@@ -109,11 +106,17 @@ namespace mongo {
*/
class RocksCursor : public SortedDataInterface::Cursor {
public:
- RocksCursor( rocksdb::Iterator* iterator, bool forward, Ordering o )
- : _iterator( iterator ), _forward( forward ), _isCached( false ), _comparator( o ) {
+ RocksCursor(OperationContext* txn, rocksdb::DB* db,
+ boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily, bool forward, Ordering o)
+ : _db(db),
+ _columnFamily(columnFamily),
+ _forward(forward),
+ _isCached(false),
+ _comparator(o) {
+ _resetIterator(txn);
// TODO: maybe don't seek until we know we need to?
- if ( _forward )
+ if (_forward)
_iterator->SeekToFirst();
else
_iterator->SeekToLast();
@@ -147,7 +150,11 @@ namespace mongo {
}
bool locate(const BSONObj& key, const DiskLoc& loc) {
- return _locate( stripFieldNames( key ), loc );
+ if (_forward) {
+ return _locate(stripFieldNames(key), loc);
+ } else {
+ return _reverseLocate(stripFieldNames(key), loc);
+ }
}
// same first five args as IndexEntryComparison::makeQueryObject (which is commented).
@@ -165,7 +172,11 @@ namespace mongo {
keyEndInclusive,
getDirection() );
- _locate( key, DiskLoc() );
+ if (_forward) {
+ _locate(key, minDiskLoc);
+ } else {
+ _reverseLocate(key, maxDiskLoc);
+ }
}
/**
@@ -213,6 +224,7 @@ namespace mongo {
}
void restorePosition(OperationContext* txn) {
+ _resetIterator(txn);
_isCached = false;
if ( _savedAtEnd ) {
@@ -236,6 +248,12 @@ namespace mongo {
}
private:
+ void _resetIterator(OperationContext* txn) {
+ invariant(txn);
+ invariant(_db);
+ auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
+ _iterator.reset(ru->NewIterator(_columnFamily.get()));
+ }
// _locate() for reverse iterators
bool _reverseLocate( const BSONObj& key, const DiskLoc loc ) {
@@ -276,9 +294,7 @@ namespace mongo {
* performing the actual locate logic.
*/
bool _locate( const BSONObj& key, const DiskLoc loc ) {
- if ( !_forward ) {
- return _reverseLocate( key, loc );
- }
+ invariant(_forward);
_isCached = false;
// assumes fieldNames already stripped if necessary
@@ -317,6 +333,8 @@ namespace mongo {
_cachedKey.objsize() );
}
+ rocksdb::DB* _db; // not owned
+ boost::shared_ptr<rocksdb::ColumnFamilyHandle> _columnFamily;
scoped_ptr<rocksdb::Iterator> _iterator;
const bool _forward;
@@ -342,7 +360,10 @@ namespace mongo {
public:
RocksIndexEntryComparator( const Ordering& order ): _indexComparator( order ) { }
- virtual int Compare( const rocksdb::Slice& a, const rocksdb::Slice& b ) const {
+ virtual int Compare(const rocksdb::Slice& a, const rocksdb::Slice& b) const {
+ if (a.size() == 0 || b.size() == 0) {
+ return a.size() == b.size() ? 0 : ((a.size() == 0) ? -1 : 1);
+ }
const IndexKeyEntry lhs = makeIndexKeyEntry( a );
const IndexKeyEntry rhs = makeIndexKeyEntry( b );
return _indexComparator.compare( lhs, rhs );
@@ -366,7 +387,7 @@ namespace mongo {
class WriteBufferCopyIntoHandler : public rocksdb::WriteBatch::Handler {
public:
- WriteBufferCopyIntoHandler(rocksdb::WriteBatchWithIndex* outWriteBatch,
+ WriteBufferCopyIntoHandler(rocksdb::WriteBatch* outWriteBatch,
rocksdb::ColumnFamilyHandle* columnFamily) :
_OutWriteBatch(outWriteBatch),
_columnFamily(columnFamily) { }
@@ -379,82 +400,30 @@ namespace mongo {
}
private:
- rocksdb::WriteBatchWithIndex* _OutWriteBatch;
+ rocksdb::WriteBatch* _OutWriteBatch;
rocksdb::ColumnFamilyHandle* _columnFamily;
};
class RocksBulkSortedBuilderImpl : public SortedDataBuilderInterface {
public:
- RocksBulkSortedBuilderImpl(RocksSortedDataImpl* data,
- rocksdb::ColumnFamilyHandle* columnFamily,
- const Ordering& order,
- OperationContext* txn,
+ RocksBulkSortedBuilderImpl(RocksSortedDataImpl* index, OperationContext* txn,
bool dupsAllowed)
- : _comparator(order),
- _writeBatch(&_comparator),
- _data(data),
- _columnFamily(columnFamily),
- _txn(txn),
- _dupsAllowed(dupsAllowed) { }
+ : _index(index), _txn(txn), _dupsAllowed(dupsAllowed) {
+ invariant(index->isEmpty(txn));
+ }
Status addKey(const BSONObj& key, const DiskLoc& loc) {
- // inserts should be in ascending order.
-
- if (key.objsize() >= kTempKeyMaxSize) {
- return Status(ErrorCodes::KeyTooLong, "key too big");
- }
-
- invariant(!loc.isNull());
- invariant(loc.isValid());
-
- // TODO: How to check "invariant(!hasFieldNames(key));" ?
- if (!_dupsAllowed) {
- // TODO need key locking to support unique indexes.
- Status status = _data->dupKeyCheck(_txn, key, loc);
- if (!status.isOK()) {
- return status;
- }
-
- // Check uniqueness with previous insert to the same bulk
- boost::scoped_ptr<rocksdb::WBWIIterator> wbwi(
- _writeBatch.NewIterator(_columnFamily));
- wbwi->Seek(makeString(key, DiskLoc()));
- invariant(wbwi->status().ok());
- bool isDup = false;
- // TODO can I assume loc never decreases?
- while (wbwi->Valid()) {
- IndexKeyEntry ike = makeIndexKeyEntry(wbwi->Entry().key);
- if (ike.key == key) {
- if (ike.loc != loc) {
- isDup = true;
- }
- wbwi->Next();
- invariant(wbwi->status().ok());
- }
- else
- break;
- }
-
- if (isDup)
- return Status(ErrorCodes::DuplicateKey, dupKeyError(key));
- }
-
- _writeBatch.Put(_columnFamily, makeString(key, loc), rocksdb::Slice());
-
- return Status::OK();
+ // TODO maybe optimize based on a fact that index is empty?
+ return _index->insert(_txn, key, loc, _dupsAllowed);
}
void commit(bool mayInterrupt) {
- RocksRecoveryUnit* ru = dynamic_cast<RocksRecoveryUnit*>(_txn->recoveryUnit());
- WriteBufferCopyIntoHandler copy_handler(ru->writeBatch(), _columnFamily);
- _writeBatch.GetWriteBatch()->Iterate(&copy_handler);
+ WriteUnitOfWork uow(_txn);
+ uow.commit();
}
private:
- RocksIndexEntryComparator _comparator;
- rocksdb::WriteBatchWithIndex _writeBatch;
- RocksSortedDataImpl* _data;
- rocksdb::ColumnFamilyHandle* _columnFamily;
+ RocksSortedDataImpl* _index;
OperationContext* _txn;
bool _dupsAllowed;
};
@@ -463,20 +432,21 @@ namespace mongo {
// RocksSortedDataImpl***********
- RocksSortedDataImpl::RocksSortedDataImpl( rocksdb::DB* db,
- rocksdb::ColumnFamilyHandle* cf,
- Ordering order ) : _db( db ), _columnFamily( cf ), _order( order ) {
+ RocksSortedDataImpl::RocksSortedDataImpl(rocksdb::DB* db,
+ boost::shared_ptr<rocksdb::ColumnFamilyHandle> cf,
+ Ordering order)
+ : _db(db), _columnFamily(cf), _order(order) {
invariant( _db );
- invariant( _columnFamily );
+ invariant(_columnFamily.get());
string value;
- bool ok = _db->GetProperty(_columnFamily, "rocksdb.estimate-num-keys", &value);
+ bool ok = _db->GetProperty(_columnFamily.get(), "rocksdb.estimate-num-keys", &value);
invariant( ok );
_numEntries.store(std::atoll(value.c_str()));
}
SortedDataBuilderInterface* RocksSortedDataImpl::getBulkBuilder(OperationContext* txn,
bool dupsAllowed) {
- return new RocksBulkSortedBuilderImpl(this, _columnFamily, _order, txn, dupsAllowed);
+ return new RocksBulkSortedBuilderImpl(this, txn, dupsAllowed);
}
Status RocksSortedDataImpl::insert(OperationContext* txn,
@@ -486,12 +456,12 @@ namespace mongo {
if (key.objsize() >= kTempKeyMaxSize) {
string msg = mongoutils::str::stream()
- << "Heap1Btree::insert: key too large to index, failing "
- << ' ' << key.objsize() << ' ' << key;
+ << "RocksSortedDataImpl::insert: key too large to index, failing " << ' '
+ << key.objsize() << ' ' << key;
return Status(ErrorCodes::KeyTooLong, msg);
}
- RocksRecoveryUnit* ru = _getRecoveryUnit( txn );
+ RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
if ( !dupsAllowed ) {
// TODO need key locking to support unique indexes.
@@ -502,7 +472,7 @@ namespace mongo {
}
ru->registerChange(new ChangeNumEntries(&_numEntries, true));
- ru->writeBatch()->Put( _columnFamily, makeString( key, loc ), emptyByteSlice );
+ ru->writeBatch()->Put(_columnFamily.get(), makeString(key, loc), emptyByteSlice);
return Status::OK();
}
@@ -510,18 +480,17 @@ namespace mongo {
bool RocksSortedDataImpl::unindex(OperationContext* txn,
const BSONObj& key,
const DiskLoc& loc) {
- RocksRecoveryUnit* ru = _getRecoveryUnit( txn );
+ RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
const string keyData = makeString( key, loc );
string dummy;
- const rocksdb::ReadOptions options = RocksEngine::readOptionsWithSnapshot( txn );
- if ( _db->Get( options,_columnFamily, keyData, &dummy ).IsNotFound() ) {
+ if (ru->Get(_columnFamily.get(), keyData, &dummy).IsNotFound()) {
return false;
}
ru->registerChange(new ChangeNumEntries(&_numEntries, false));
- ru->writeBatch()->Delete( _columnFamily, keyData );
+ ru->writeBatch()->Delete(_columnFamily.get(), keyData);
return true;
}
@@ -531,7 +500,7 @@ namespace mongo {
boost::scoped_ptr<SortedDataInterface::Cursor> cursor(newCursor(txn, 1));
cursor->locate(key, DiskLoc(0, 0));
- if (cursor->isEOF() || cursor->getKey() != key) {
+ if (cursor->isEOF() || cursor->getKey() != key || cursor->getDiskLoc() == loc) {
return Status::OK();
} else {
return Status(ErrorCodes::DuplicateKey, dupKeyError(key));
@@ -541,8 +510,8 @@ namespace mongo {
void RocksSortedDataImpl::fullValidate(OperationContext* txn, long long* numKeysOut) const {
if (numKeysOut) {
*numKeysOut = 0;
- const rocksdb::ReadOptions options = RocksEngine::readOptionsWithSnapshot(txn);
- boost::scoped_ptr<rocksdb::Iterator> it(_db->NewIterator(options, _columnFamily));
+ auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
+ boost::scoped_ptr<rocksdb::Iterator> it(ru->NewIterator(_columnFamily.get()));
it->SeekToFirst();
for (*numKeysOut = 0; it->Valid(); it->Next()) {
++(*numKeysOut);
@@ -550,17 +519,23 @@ namespace mongo {
}
}
- bool RocksSortedDataImpl::isEmpty( OperationContext* txn ) {
- // XXX doesn't use snapshot
- boost::scoped_ptr<rocksdb::Iterator> it( _db->NewIterator( rocksdb::ReadOptions(),
- _columnFamily ) );
+ bool RocksSortedDataImpl::isEmpty(OperationContext* txn) {
+ auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
+ boost::scoped_ptr<rocksdb::Iterator> it(ru->NewIterator(_columnFamily.get()));
it->SeekToFirst();
return !it->Valid();
}
Status RocksSortedDataImpl::touch(OperationContext* txn) const {
- // no-op
+ boost::scoped_ptr<rocksdb::Iterator> itr;
+ // no need to use snapshot to load into memory
+ itr.reset(_db->NewIterator(rocksdb::ReadOptions(), _columnFamily.get()));
+ itr->SeekToFirst();
+ for (; itr->Valid(); itr->Next()) {
+ invariant(itr->status().ok());
+ }
+
return Status::OK();
}
@@ -570,8 +545,7 @@ namespace mongo {
int direction) const {
invariant( ( direction == 1 || direction == -1 ) && "invalid value for direction" );
rocksdb::ReadOptions options = RocksEngine::readOptionsWithSnapshot( txn );
- return new RocksCursor(
- _db->NewIterator( options, _columnFamily ), direction == 1, _order );
+ return new RocksCursor(txn, _db, _columnFamily, direction == 1, _order);
}
Status RocksSortedDataImpl::initAsEmpty(OperationContext* txn) {
@@ -580,8 +554,9 @@ namespace mongo {
}
long long RocksSortedDataImpl::getSpaceUsedBytes( OperationContext* txn ) const {
- boost::scoped_ptr<rocksdb::Iterator> iter( _db->NewIterator( rocksdb::ReadOptions(),
- _columnFamily ) );
+ // no need to use snapshot here
+ boost::scoped_ptr<rocksdb::Iterator> iter(
+ _db->NewIterator(rocksdb::ReadOptions(), _columnFamily.get()));
iter->SeekToFirst();
const rocksdb::Slice rangeStart = iter->key();
iter->SeekToLast();
@@ -594,7 +569,7 @@ namespace mongo {
// TODO Rocks specifies that this may not include recently written data. Figure out if
// that's okay
- _db->GetApproximateSizes( _columnFamily, &fullIndexRange, 1, &spacedUsedBytes );
+ _db->GetApproximateSizes(_columnFamily.get(), &fullIndexRange, 1, &spacedUsedBytes);
return spacedUsedBytes;
}
@@ -604,8 +579,4 @@ namespace mongo {
return new RocksIndexEntryComparator( order );
}
- RocksRecoveryUnit* RocksSortedDataImpl::_getRecoveryUnit( OperationContext* opCtx ) const {
- return dynamic_cast<RocksRecoveryUnit*>( opCtx->recoveryUnit() );
- }
-
}
diff --git a/src/mongo/db/storage/rocks/rocks_sorted_data_impl.h b/src/mongo/db/storage/rocks/rocks_sorted_data_impl.h
index 2e14b3c62e7..405531e04a2 100644
--- a/src/mongo/db/storage/rocks/rocks_sorted_data_impl.h
+++ b/src/mongo/db/storage/rocks/rocks_sorted_data_impl.h
@@ -64,7 +64,8 @@ namespace mongo {
class RocksSortedDataImpl : public SortedDataInterface {
MONGO_DISALLOW_COPYING( RocksSortedDataImpl );
public:
- RocksSortedDataImpl( rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf, Ordering order );
+ RocksSortedDataImpl(rocksdb::DB* db, boost::shared_ptr<rocksdb::ColumnFamilyHandle> cf,
+ Ordering order);
virtual SortedDataBuilderInterface* getBulkBuilder(OperationContext* txn, bool dupsAllowed);
@@ -100,13 +101,11 @@ namespace mongo {
private:
typedef DiskLoc RecordId;
- RocksRecoveryUnit* _getRecoveryUnit( OperationContext* opCtx ) const;
-
rocksdb::DB* _db; // not owned
// Each index is stored as a single column family, so this stores the handle to the
// relevant column family
- rocksdb::ColumnFamilyHandle* _columnFamily; // not owned
+ boost::shared_ptr<rocksdb::ColumnFamilyHandle> _columnFamily;
// used to construct RocksCursors
const Ordering _order;
diff --git a/src/mongo/db/storage/rocks/rocks_sorted_data_impl_harness_test.cpp b/src/mongo/db/storage/rocks/rocks_sorted_data_impl_harness_test.cpp
deleted file mode 100644
index ef148fa8ab6..00000000000
--- a/src/mongo/db/storage/rocks/rocks_sorted_data_impl_harness_test.cpp
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Copyright (C) 2014 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include <boost/shared_ptr.hpp>
-#include <boost/filesystem/operations.hpp>
-
-#include <rocksdb/comparator.h>
-#include <rocksdb/db.h>
-#include <rocksdb/options.h>
-#include <rocksdb/slice.h>
-
-#include "mongo/db/storage/rocks/rocks_engine.h"
-#include "mongo/db/storage/sorted_data_interface_test_harness.h"
-#include "mongo/unittest/temp_dir.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/db/storage/rocks/rocks_sorted_data_impl.h"
-#include "mongo/db/storage/rocks/rocks_recovery_unit.h"
-
-namespace mongo {
-
- class RocksHarnessHelper : public HarnessHelper {
- public:
- RocksHarnessHelper() : _order(Ordering::make(BSONObj())), _tempDir(_testNamespace) {
- boost::filesystem::remove_all(_tempDir.path());
- rocksdb::DB* db;
- rocksdb::Status s = rocksdb::DB::Open(RocksEngine::dbOptions(), _tempDir.path(), &db);
- ASSERT(s.ok());
- _db.reset(db);
- }
-
- virtual SortedDataInterface* newSortedDataInterface() {
- return new RocksSortedDataImpl(_db.get(), _db->DefaultColumnFamily(), _order);
- }
-
- virtual RecoveryUnit* newRecoveryUnit() { return new RocksRecoveryUnit(_db.get(), false); }
-
- private:
- Ordering _order;
- string _testNamespace = "mongo-rocks-sorted-data-test";
- unittest::TempDir _tempDir;
- scoped_ptr<rocksdb::DB> _db;
- };
-
- HarnessHelper* newHarnessHelper() { return new RocksHarnessHelper(); }
-}
diff --git a/src/mongo/db/storage/rocks/rocks_sorted_data_impl_test.cpp b/src/mongo/db/storage/rocks/rocks_sorted_data_impl_test.cpp
index 78628a668cc..f629ef5925c 100644
--- a/src/mongo/db/storage/rocks/rocks_sorted_data_impl_test.cpp
+++ b/src/mongo/db/storage/rocks/rocks_sorted_data_impl_test.cpp
@@ -1,35 +1,30 @@
-// rocks_sorted_data_impl_test.cpp
-
/**
-* Copyright (C) 2014 MongoDB Inc.
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License, version 3,
-* as published by the Free Software Foundation.
-*
-*
-* This program is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see <http://www.gnu.org/licenses/>.
-*
-* As a special exception, the copyright holders give permission to link the
-* code of portions of this program with the OpenSSL library under certain
-* conditions as described in each individual source file and distribute
-* linked combinations including the program with the OpenSSL library. You
-* must comply with the GNU Affero General Public License in all respects for
-* all of the code used other than as permitted herein. If you modify file(s)
-* with this exception, you may extend this exception to your version of the
-* file(s), but you are not obligated to do so. If you do not wish to do so,
-* delete this exception statement from your version. If you delete this
-* exception statement from all source files in the program, then also delete
-* it in the license file.
-*/
-
-#include <memory>
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
#include <boost/shared_ptr.hpp>
#include <boost/filesystem/operations.hpp>
@@ -39,889 +34,45 @@
#include <rocksdb/options.h>
#include <rocksdb/slice.h>
-#include "mongo/db/operation_context_noop.h"
#include "mongo/db/storage/rocks/rocks_engine.h"
-#include "mongo/db/storage/rocks/rocks_sorted_data_impl.h"
-#include "mongo/db/storage/rocks/rocks_record_store.h"
-#include "mongo/db/storage/rocks/rocks_recovery_unit.h"
+#include "mongo/db/storage/sorted_data_interface_test_harness.h"
#include "mongo/unittest/temp_dir.h"
#include "mongo/unittest/unittest.h"
-
-using namespace mongo;
+#include "mongo/db/storage/rocks/rocks_sorted_data_impl.h"
+#include "mongo/db/storage/rocks/rocks_recovery_unit.h"
namespace mongo {
- class MyOperationContext : public OperationContextNoop {
+ class RocksSortedDataImplHarness : public HarnessHelper {
public:
- MyOperationContext( rocksdb::DB* db )
- : OperationContextNoop( new RocksRecoveryUnit( db, false ) ) {
- }
+ RocksSortedDataImplHarness() : _order(Ordering::make(BSONObj())), _tempDir(_testNamespace) {
+ boost::filesystem::remove_all(_tempDir.path());
+ rocksdb::DB* db;
+ std::vector<rocksdb::ColumnFamilyDescriptor> cfs;
+ cfs.emplace_back();
+ cfs[0].options.comparator = RocksSortedDataImpl::newRocksComparator(_order);
+ rocksdb::DBOptions db_options;
+ db_options.create_if_missing = true;
+ std::vector<rocksdb::ColumnFamilyHandle*> handles;
+ auto s = rocksdb::DB::Open(db_options, _tempDir.path(), cfs, &handles, &db);
+ ASSERT(s.ok());
+ _db.reset(db);
+ _cf.reset(handles[0]);
+ }
+
+ virtual SortedDataInterface* newSortedDataInterface() {
+ return new RocksSortedDataImpl(_db.get(), _cf, _order);
+ }
+
+ virtual RecoveryUnit* newRecoveryUnit() { return new RocksRecoveryUnit(_db.get()); }
+
+ private:
+ Ordering _order;
+ string _testNamespace = "mongo-rocks-sorted-data-test";
+ unittest::TempDir _tempDir;
+ scoped_ptr<rocksdb::DB> _db;
+ shared_ptr<rocksdb::ColumnFamilyHandle> _cf;
};
- // to be used in testing
- static std::unique_ptr<rocksdb::Comparator> _rocksComparator(
- RocksSortedDataImpl::newRocksComparator( Ordering::make( BSON( "a" << 1 ) ) ) );
-
- string _rocksSortedDataTestDir = "mongo-rocks-test";
-
- rocksdb::DB* getDB( string path ) {
- boost::filesystem::remove_all( path );
-
- rocksdb::Options options = RocksEngine::dbOptions();
-
- // open DB
- rocksdb::DB* db;
- rocksdb::Status s = rocksdb::DB::Open(options, path, &db);
- ASSERT(s.ok());
-
- return db;
- }
-
- const Ordering dummyOrdering = Ordering::make( BSONObj() );
-
- TEST( RocksSortedDataTest, BrainDead ) {
- unittest::TempDir td( _rocksSortedDataTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- {
- RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering );
-
- BSONObj key = BSON( "" << 1 );
- DiskLoc loc( 5, 16 );
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- ASSERT( !sortedData.unindex( &opCtx, key, loc ) );
- uow.commit();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- Status res = sortedData.insert( &opCtx, key, loc, true );
- ASSERT_OK( res );
- uow.commit();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- ASSERT( sortedData.unindex( &opCtx, key, loc ) );
- uow.commit();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- sortedData.unindex( &opCtx, key, loc );
- uow.commit();
- }
- }
-
- }
- }
-
- TEST( RocksSortedDataTest, Locate1 ) {
- unittest::TempDir td( _rocksSortedDataTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- {
- RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering );
-
- BSONObj key = BSON( "" << 1 );
- DiskLoc loc( 5, 16 );
-
- {
-
- MyOperationContext opCtx( db.get() );
- scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, 1 ) );
- ASSERT( !cursor->locate( key, loc ) );
- }
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- Status res = sortedData.insert( &opCtx, key, loc, true );
- ASSERT_OK( res );
- uow.commit();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, 1 ) );
- ASSERT( cursor->locate( key, loc ) );
- ASSERT_EQUALS( key, cursor->getKey() );
- ASSERT_EQUALS( loc, cursor->getDiskLoc() );
- }
- }
- }
-
- TEST( RocksSortedDataTest, Locate2 ) {
- unittest::TempDir td( _rocksSortedDataTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- {
- RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering );
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
-
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) );
- uow.commit();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, 1 ) );
- ASSERT( !cursor->locate( BSON( "a" << 2 ), DiskLoc(0,0) ) );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() );
-
- cursor->advance();
- ASSERT_EQUALS( BSON( "" << 3 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,3), cursor->getDiskLoc() );
-
- cursor->advance();
- ASSERT( cursor->isEOF() );
- }
- }
- }
-
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> makeColumnFamily( rocksdb::DB* db ) {
- rocksdb::ColumnFamilyOptions options;
- options.comparator = _rocksComparator.get();
-
- rocksdb::ColumnFamilyHandle* cfh;
- rocksdb::Status s = db->CreateColumnFamily( options, "simpleColumnFamily", &cfh );
- ASSERT( s.ok() );
-
- return boost::shared_ptr<rocksdb::ColumnFamilyHandle>( cfh );
- }
-
- TEST( RocksSortedDataTest, LocateInexact ) {
- unittest::TempDir td( _rocksSortedDataTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- {
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> cfh = makeColumnFamily( db.get() );
-
- RocksSortedDataImpl sortedData( db.get(), cfh.get(), dummyOrdering );
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
-
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) );
- uow.commit();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, 1 ) );
- ASSERT_FALSE( cursor->locate( BSON( "a" << 2 ), DiskLoc(0,0) ) );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 3 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,3), cursor->getDiskLoc() );
- }
- }
- }
-
- TEST( RocksSortedDataTest, Snapshots ) {
- unittest::TempDir td( _rocksSortedDataTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- {
- RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering );
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
-
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) );
- uow.commit();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
-
- // get a cursor
- scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, 1 ) );
-
- // insert some more stuff
- {
- WriteUnitOfWork uow( &opCtx );
-
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) );
- uow.commit();
- }
-
- ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() );
- cursor->advance();
-
- // make sure that the cursor can't "see" anything added after it was created.
- ASSERT( cursor-> isEOF() );
- ASSERT_FALSE( cursor->locate( BSON( "" << 3 ), DiskLoc(1,3) ) );
- ASSERT( cursor->isEOF() );
- }
- }
- }
-
- TEST( RocksSortedDataTest, SaveAndRestorePositionSimple ) {
- unittest::TempDir td( _rocksSortedDataTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- {
- RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering );
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
-
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) );
- uow.commit();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, 1 ) );
- ASSERT( !cursor->locate( BSON( "a" << 1 ), DiskLoc(0,0) ) );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 1 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,1), cursor->getDiskLoc() );
-
- // save the position
- cursor->savePosition();
-
- // restore position
- cursor->restorePosition( &opCtx );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 1 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,1), cursor->getDiskLoc() );
-
- // repeat, with a different value
- ASSERT( !cursor->locate( BSON( "a" << 2 ), DiskLoc(0,0) ) );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() );
-
- // save the position
- cursor->savePosition();
-
- // restore position
- cursor->restorePosition( &opCtx );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() );
- }
- }
- }
-
- TEST( RocksSortedDataTest, SaveAndRestorePositionEOF ) {
- unittest::TempDir td( _rocksSortedDataTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- {
- RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering );
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
-
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) );
- uow.commit();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, 1 ) );
- ASSERT( !cursor->locate( BSON( "a" << 1 ), DiskLoc(0,0) ) );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 1 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,1), cursor->getDiskLoc() );
-
- // advance to the end
- while ( !cursor->isEOF() ) {
- cursor->advance();
- }
-
- ASSERT( cursor->isEOF() );
-
- // save the position
- cursor->savePosition();
-
- // restore position, make sure we're at the end
- cursor->restorePosition( &opCtx );
- ASSERT( cursor->isEOF() );
- }
- }
- }
-
- TEST( RocksSortedDataTest, SaveAndRestorePositionInsert ) {
- unittest::TempDir td( _rocksSortedDataTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- {
- RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering );
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
-
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) );
- uow.commit();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, 1 ) );
- ASSERT( !cursor->locate( BSON( "" << 3 ), DiskLoc(0,0) ) );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 3 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,3), cursor->getDiskLoc() );
-
- // save the position
- cursor->savePosition();
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- ASSERT_OK(
- sortedData.insert( &opCtx, BSON( "" << 4 ), DiskLoc(1,4), true ) );
- uow.commit();
- }
- }
-
- // restore position, make sure we don't see the newly inserted value
- cursor->restorePosition( &opCtx );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 3 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,3), cursor->getDiskLoc() );
-
- cursor->advance();
- ASSERT( cursor->isEOF() );
- }
- }
- }
-
- TEST( RocksSortedDataTest, SaveAndRestorePositionDelete2 ) {
- unittest::TempDir td( _rocksSortedDataTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- {
- RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering );
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
-
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) );
- uow.commit();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, 1 ) );
- ASSERT( !cursor->locate( BSON( "" << 2 ), DiskLoc(0,0) ) );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() );
-
- // save the position
- cursor->savePosition();
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- ASSERT( sortedData.unindex( &opCtx, BSON( "" << 1 ), DiskLoc(1,1) ) );
- uow.commit();
- }
- }
-
- // restore position
- cursor->restorePosition( &opCtx );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() );
- }
- }
- }
-
- TEST( RocksSortedDataTest, SaveAndRestorePositionDelete3 ) {
- unittest::TempDir td( _rocksSortedDataTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- {
- RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering );
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
-
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) );
- uow.commit();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, 1 ) );
- ASSERT( !cursor->locate( BSON( "" << 2 ), DiskLoc(0,0) ) );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() );
-
- // save the position
- cursor->savePosition();
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- ASSERT( sortedData.unindex( &opCtx, BSON( "" << 3 ), DiskLoc(1,3) ) );
- uow.commit();
- }
- }
-
- // restore position
- cursor->restorePosition( &opCtx );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() );
-
- // make sure that we can still see the unindexed data, since we're working on
- // a snapshot
- cursor->advance();
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 3 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,3), cursor->getDiskLoc() );
-
- cursor->advance();
- ASSERT( cursor->isEOF() );
- }
- }
- }
-
- TEST( RocksSortedDataTest, Locate1Reverse ) {
- unittest::TempDir td( _rocksSortedDataTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- {
- RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering );
-
- BSONObj key = BSON( "" << 1 );
- DiskLoc loc( 5, 16 );
-
- {
- MyOperationContext opCtx( db.get() );
- scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, -1 ) );
- ASSERT( !cursor->locate( key, loc ) );
- }
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- Status res = sortedData.insert( &opCtx, key, loc, true );
- ASSERT_OK( res );
- uow.commit();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, -1 ) );
- ASSERT( cursor->locate( key, loc ) );
- ASSERT_EQUALS( key, cursor->getKey() );
- ASSERT_EQUALS( loc, cursor->getDiskLoc() );
- }
- }
- }
-
- TEST( RocksSortedDataTest, LocateInexactReverse ) {
- unittest::TempDir td( _rocksSortedDataTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- {
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> cfh = makeColumnFamily( db.get() );
-
- RocksSortedDataImpl sortedData( db.get(), cfh.get(), dummyOrdering );
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
-
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "a" << 1 ), DiskLoc(1,1), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "a" << 3 ), DiskLoc(1,1), true ) );
- uow.commit();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, -1 ) );
- ASSERT_FALSE( cursor->locate( BSON( "a" << 2 ), DiskLoc(1,1) ) );
- ASSERT_FALSE( cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 1 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,1), cursor->getDiskLoc() );
- }
- }
- }
-
- TEST( RocksSortedDataTest, SaveAndRestorePositionReverseSimple ) {
- unittest::TempDir td( _rocksSortedDataTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- {
- RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering );
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
-
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) );
- uow.commit();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, -1 ) );
- ASSERT( !cursor->locate( BSON( "a" << 1 ), DiskLoc(2,0) ) );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 1 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,1), cursor->getDiskLoc() );
-
- // save the position
- cursor->savePosition();
-
- // restore position
- cursor->restorePosition( &opCtx );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 1 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,1), cursor->getDiskLoc() );
-
- // repeat, with a different value
- ASSERT( !cursor->locate( BSON( "a" << 2 ), DiskLoc(2,0) ) );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() );
-
- // save the position
- cursor->savePosition();
-
- // restore position
- cursor->restorePosition( &opCtx );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() );
- }
- }
- }
-
- TEST( RocksSortedDataTest, SaveAndRestorePositionEOFReverse ) {
- unittest::TempDir td( _rocksSortedDataTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- {
- RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering );
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
-
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 4 ), DiskLoc(1,4), true ) );
- uow.commit();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, -1 ) );
- ASSERT_FALSE( cursor->locate( BSON( "" << 2 ), DiskLoc(1,2) ) );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 1 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,1), cursor->getDiskLoc() );
-
- // advance to the end
- while ( !cursor->isEOF() ) {
- cursor->advance();
- }
-
- ASSERT( cursor->isEOF() );
-
- // save the position
- cursor->savePosition();
-
- // restore position, make sure we're at the end
- cursor->restorePosition( &opCtx );
- ASSERT( cursor->isEOF() );
- }
- }
- }
-
- TEST( RocksSortedDataTest, SaveAndRestorePositionInsertReverse ) {
- unittest::TempDir td( _rocksSortedDataTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- {
- RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering );
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
-
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) );
- uow.commit();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx,
- -1 ) );
- ASSERT( !cursor->locate( BSON( "" << 3 ), DiskLoc(2,0) ) );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 3 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,3), cursor->getDiskLoc() );
-
- // save the position
- cursor->savePosition();
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- ASSERT_OK(
- sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) );
- uow.commit();
- }
- }
-
- // restore position, make sure we don't see the newly inserted value
- cursor->restorePosition( &opCtx );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 3 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,3), cursor->getDiskLoc() );
-
- cursor->advance();
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 1 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,1), cursor->getDiskLoc() );
-
- cursor->advance();
- ASSERT( cursor->isEOF() );
- }
- }
- }
-
- TEST( RocksSortedDataTest, SaveAndRestorePositionDelete1Reverse ) {
- unittest::TempDir td( _rocksSortedDataTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- {
- RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering );
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
-
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) );
- uow.commit();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx,
- -1 ) );
- ASSERT( !cursor->locate( BSON( "" << 3 ), DiskLoc(2,0) ) );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 3 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,3), cursor->getDiskLoc() );
-
- // save the position
- cursor->savePosition();
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- ASSERT( sortedData.unindex( &opCtx, BSON( "" << 3 ), DiskLoc(1,3) ) );
- uow.commit();
- }
- }
-
- // restore position, make sure we still see the deleted key and value, because
- // we're using a snapshot
- cursor->restorePosition( &opCtx );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 3 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,3), cursor->getDiskLoc() );
- }
- }
- }
-
- TEST( RocksSortedDataTest, SaveAndRestorePositionDelete2Reverse ) {
- unittest::TempDir td( _rocksSortedDataTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- {
- RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering );
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
-
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) );
- uow.commit();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx,
- -1 ) );
- ASSERT( !cursor->locate( BSON( "" << 2 ), DiskLoc(2,0) ) );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() );
-
- // save the position
- cursor->savePosition();
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- ASSERT( sortedData.unindex( &opCtx, BSON( "" << 1 ), DiskLoc(1,1) ) );
- uow.commit();
- }
- }
-
- // restore position
- cursor->restorePosition( &opCtx );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() );
- }
- }
- }
-
- TEST( RocksSortedDataTest, SaveAndRestorePositionDelete3Reverse ) {
- unittest::TempDir td( _rocksSortedDataTestDir );
- scoped_ptr<rocksdb::DB> db( getDB( td.path() ) );
-
- {
- RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering );
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
-
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) );
- ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) );
- uow.commit();
- }
- }
-
- {
- MyOperationContext opCtx( db.get() );
- scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx,
- -1 ) );
- ASSERT( !cursor->locate( BSON( "" << 2 ), DiskLoc(2,0) ) );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() );
-
- // save the position
- cursor->savePosition();
-
- {
- MyOperationContext opCtx( db.get() );
- {
- WriteUnitOfWork uow( &opCtx );
- ASSERT( sortedData.unindex( &opCtx, BSON( "" << 1 ), DiskLoc(1,1) ) );
- uow.commit();
- }
- }
-
- // restore position
- cursor->restorePosition( &opCtx );
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() );
-
- // make sure that we can still see the unindexed data, since we're working on
- // a snapshot
- cursor->advance();
- ASSERT( !cursor->isEOF() );
- ASSERT_EQUALS( BSON( "" << 1 ), cursor->getKey() );
- ASSERT_EQUALS( DiskLoc(1,1), cursor->getDiskLoc() );
-
- cursor->advance();
- ASSERT( cursor->isEOF() );
- }
- }
- }
+ HarnessHelper* newHarnessHelper() { return new RocksSortedDataImplHarness(); }
}