diff options
author | Eliot Horowitz <eliot@10gen.com> | 2014-10-11 00:14:30 -0400 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2014-10-11 00:14:30 -0400 |
commit | 59b2e9e90bd6ae25e69bb980df418b9ed614e943 (patch) | |
tree | 787ebda07fdf33e2d97749289e8739e377b899be /src/mongo/db/storage/rocks | |
parent | 31a2bc4a955e5ba7751c56652ed3c85dc6991b24 (diff) | |
download | mongo-59b2e9e90bd6ae25e69bb980df418b9ed614e943.tar.gz |
SERVER-14352: move rocks engine to kv interface
Diffstat (limited to 'src/mongo/db/storage/rocks')
20 files changed, 753 insertions, 3631 deletions
diff --git a/src/mongo/db/storage/rocks/SConscript b/src/mongo/db/storage/rocks/SConscript index 93a4887908b..e79adc48bba 100644 --- a/src/mongo/db/storage/rocks/SConscript +++ b/src/mongo/db/storage/rocks/SConscript @@ -6,8 +6,6 @@ if has_option("rocksdb"): env.Library( target= 'storage_rocks_base', source= [ - 'rocks_collection_catalog_entry.cpp', - 'rocks_database_catalog_entry.cpp', 'rocks_engine.cpp', 'rocks_record_store.cpp', 'rocks_recovery_unit.cpp', @@ -30,58 +28,43 @@ if has_option("rocksdb"): env.Library( target= 'storage_rocks', source= [ - 'rocks_database_catalog_entry_mongod.cpp', 'rocks_init.cpp' ], LIBDEPS= [ - 'storage_rocks_base' + 'storage_rocks_base', + '$BUILD_DIR/mongo/db/storage/kv/kv_engine' ] ) - env.Library( - target= 'storage_rocks_fake', - source= [ - 'rocks_database_catalog_entry_fake.cpp', - ], - LIBDEPS= [ - 'storage_rocks_base' - ] - ) env.CppUnitTest( - target='storage_rocks_engine_test', - source=['rocks_engine_test.cpp', - ], - LIBDEPS=[ - 'storage_rocks_fake' + target='storage_rocks_sorted_data_impl_test', + source=['rocks_sorted_data_impl_test.cpp' + ], + LIBDEPS=[ + 'storage_rocks_base', + '$BUILD_DIR/mongo/db/storage/sorted_data_interface_test_harness' ] - ) + ) - env.CppUnitTest( - target='storage_rocks_record_store_test', - source=['rocks_record_store_test.cpp', - ], - LIBDEPS=[ - 'storage_rocks_fake' - ] - ) env.CppUnitTest( - target='storage_rocks_sorted_data_impl_test', - source=['rocks_sorted_data_impl_test.cpp', - ], - LIBDEPS=[ - 'storage_rocks_fake' + target='storage_rocks_record_store_test', + source=['rocks_record_store_test.cpp' + ], + LIBDEPS=[ + 'storage_rocks_base', + '$BUILD_DIR/mongo/db/storage/record_store_test_harness' ] - ) - + ) env.CppUnitTest( - target='storage_rocks_sorted_data_impl_harness_test', - source=['rocks_sorted_data_impl_harness_test.cpp' + target='storage_rocks_engine_test', + source=['rocks_engine_test.cpp' ], LIBDEPS=[ - 'storage_rocks_fake', - 
'$BUILD_DIR/mongo/db/storage/sorted_data_interface_test_harness' + 'storage_rocks_base', + '$BUILD_DIR/mongo/db/storage/kv/kv_engine_test_harness' ] ) + diff --git a/src/mongo/db/storage/rocks/rocks_collection_catalog_entry.cpp b/src/mongo/db/storage/rocks/rocks_collection_catalog_entry.cpp deleted file mode 100644 index 102e9e14b39..00000000000 --- a/src/mongo/db/storage/rocks/rocks_collection_catalog_entry.cpp +++ /dev/null @@ -1,228 +0,0 @@ -// rocks_collection_catalog_entry.cpp - -/** - * Copyright (C) 2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. 
- */ - -#include "mongo/db/storage/rocks/rocks_collection_catalog_entry.h" - -#include <rocksdb/db.h> - -#include "mongo/db/index/index_descriptor.h" -#include "mongo/db/storage/rocks/rocks_engine.h" - -namespace mongo { - - static const int _maxAllowedIndexes = 64; // for compatability for now, could be higher - - /** - * bson schema - * { ns: <name for sanity>, - * options : <bson spec>, - * indexes : [ { spec : <bson spec>, - * ready: <bool>, - * head: DiskLoc, - * multikey: <bool> } ] - * } - */ - - RocksCollectionCatalogEntry::RocksCollectionCatalogEntry( RocksEngine* engine, - const StringData& ns ) - : BSONCollectionCatalogEntry( ns ), - _engine( engine ), - _metaDataKey( string( "metadata-" ) + ns.toString() ) { } - - int RocksCollectionCatalogEntry::getMaxAllowedIndexes() const { - return _maxAllowedIndexes; - } - - BSONObj RocksCollectionCatalogEntry::getOtherIndexSpec( const StringData& indexName, - rocksdb::DB* db ) const { - MetaData md = _getMetaData( db ); - - int offset = md.findIndexOffset( indexName ); - invariant( offset >= 0 ); - return md.indexes[offset].spec.getOwned(); - } - - bool RocksCollectionCatalogEntry::setIndexIsMultikey(OperationContext* txn, - const StringData& indexName, - bool multikey ) { - boost::mutex::scoped_lock lk( _metaDataMutex ); - MetaData md = _getMetaData_inlock(); - - int offset = md.findIndexOffset( indexName ); - invariant( offset >= 0 ); - if ( md.indexes[offset].multikey == multikey ) - return false; - md.indexes[offset].multikey = multikey; - _putMetaData_inlock( md ); - return true; - } - - void RocksCollectionCatalogEntry::setIndexHead( OperationContext* txn, - const StringData& indexName, - const DiskLoc& newHead ) { - boost::mutex::scoped_lock lk( _metaDataMutex ); - MetaData md = _getMetaData_inlock(); - - int offset = md.findIndexOffset( indexName ); - invariant( offset >= 0 ); - md.indexes[offset].head = newHead; - _putMetaData_inlock( md ); - } - - void RocksCollectionCatalogEntry::indexBuildSuccess( 
OperationContext* txn, - const StringData& indexName ) { - boost::mutex::scoped_lock lk( _metaDataMutex ); - MetaData md = _getMetaData_inlock(); - - int offset = md.findIndexOffset( indexName ); - invariant( offset >= 0 ); - md.indexes[offset].ready = true; - _putMetaData_inlock( md ); - } - - Status RocksCollectionCatalogEntry::removeIndex( OperationContext* txn, - const StringData& indexName ) { - boost::mutex::scoped_lock lk( _metaDataMutex ); - - _removeIndexFromMetaData_inlock( indexName ); - - // drop the actual index in rocksdb - rocksdb::ColumnFamilyHandle* cfh = _engine->getIndexColumnFamilyNoCreate( ns().ns(), - indexName ); - - // Note: this invalidates cfh. Do not use after this call - _engine->removeColumnFamily( &cfh, indexName, ns().ns() ); - invariant( cfh == nullptr ); - - return Status::OK(); - } - - void RocksCollectionCatalogEntry::removeIndexWithoutDroppingCF( OperationContext* txn, - const StringData& indexName ) { - boost::mutex::scoped_lock lk( _metaDataMutex ); - _removeIndexFromMetaData_inlock( indexName ); - } - - Status RocksCollectionCatalogEntry::prepareForIndexBuild( OperationContext* txn, - const IndexDescriptor* spec ) { - boost::mutex::scoped_lock lk( _metaDataMutex ); - MetaData md = _getMetaData_inlock(); - - md.indexes.push_back( IndexMetaData( spec->infoObj(), false, DiskLoc(), false ) ); - _putMetaData_inlock( md ); - return Status::OK(); - } - - /* Updates the expireAfterSeconds field of the given index to the value in newExpireSecs. - * The specified index must already contain an expireAfterSeconds field, and the value in - * that field and newExpireSecs must both be numeric. 
- */ - void RocksCollectionCatalogEntry::updateTTLSetting( OperationContext* txn, - const StringData& indexName, - long long newExpireSeconds ) { - invariant( !"ttl settings change not supported in rocks yet" ); - } - - void RocksCollectionCatalogEntry::createMetaData() { - string result; - // XXX not using a snapshot here - rocksdb::Status status = _engine->getDB()->Get( rocksdb::ReadOptions(), - _metaDataKey, - &result ); - invariant( status.IsNotFound() ); - - MetaData md; - md.ns = ns(); - - BSONObj obj = md.toBSON(); - status = _engine->getDB()->Put( rocksdb::WriteOptions(), - _metaDataKey, - rocksdb::Slice( obj.objdata(), - obj.objsize() ) ); - invariant( status.ok() ); - } - - - void RocksCollectionCatalogEntry::dropMetaData() { - rocksdb::Status status = _engine->getDB()->Delete( rocksdb::WriteOptions(), _metaDataKey ); - invariant( status.ok() ); - } - - RocksCollectionCatalogEntry::MetaData RocksCollectionCatalogEntry::_getMetaData( OperationContext* txn ) const { - return _getMetaData( _engine->getDB() ); - } - - RocksCollectionCatalogEntry::MetaData RocksCollectionCatalogEntry::_getMetaData( - rocksdb::DB* db ) const { - invariant( db ); - boost::mutex::scoped_lock lk( _metaDataMutex ); - return _getMetaData_inlock( db ); - } - - RocksCollectionCatalogEntry::MetaData RocksCollectionCatalogEntry::_getMetaData_inlock() const { - return _getMetaData_inlock( _engine->getDB() ); - } - - // The metadata in a column family with a specific name. 
This method reads from that column - // family - RocksCollectionCatalogEntry::MetaData RocksCollectionCatalogEntry::_getMetaData_inlock( - rocksdb::DB* db ) const { - string result; - // XXX not using a snapshot here - rocksdb::Status status = db->Get( rocksdb::ReadOptions(), _metaDataKey, &result ); - invariant( !status.IsNotFound() ); - invariant( status.ok() ); - - MetaData md; - md.parse( BSONObj( result.c_str() ) ); - return md; - } - - void RocksCollectionCatalogEntry::_removeIndexFromMetaData_inlock( - const StringData& indexName ) { - MetaData md = _getMetaData_inlock(); - - // remove info from meta data - invariant( md.eraseIndex( indexName ) ); - _putMetaData_inlock( md ); - } - - void RocksCollectionCatalogEntry::_putMetaData_inlock( const MetaData& in ) { - // XXX: this should probably be done via the RocksRecoveryUnit. - // TODO move into recovery unit - BSONObj obj = in.toBSON(); - rocksdb::Status status = _engine->getDB()->Put( rocksdb::WriteOptions(), - _metaDataKey, - rocksdb::Slice( obj.objdata(), - obj.objsize() ) ); - invariant( status.ok() ); - } - -} diff --git a/src/mongo/db/storage/rocks/rocks_collection_catalog_entry.h b/src/mongo/db/storage/rocks/rocks_collection_catalog_entry.h deleted file mode 100644 index 46aababd201..00000000000 --- a/src/mongo/db/storage/rocks/rocks_collection_catalog_entry.h +++ /dev/null @@ -1,119 +0,0 @@ -// rocks_collection_catalog_entry.h - -/** - * Copyright (C) 2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. 
- * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include "mongo/db/catalog/collection_catalog_entry.h" -#include "mongo/db/storage/bson_collection_catalog_entry.h" - -namespace rocksdb { - class DB; -} - -namespace mongo { - - class RocksEngine; - - class RocksCollectionCatalogEntry : public BSONCollectionCatalogEntry { - public: - RocksCollectionCatalogEntry( RocksEngine* engine, const StringData& ns ); - - virtual ~RocksCollectionCatalogEntry(){} - - // ------- indexes ---------- - - virtual int getMaxAllowedIndexes() const; - - virtual bool setIndexIsMultikey(OperationContext* txn, - const StringData& indexName, - bool multikey = true); - - virtual void setIndexHead( OperationContext* txn, - const StringData& indexName, - const DiskLoc& newHead ); - - virtual Status removeIndex( OperationContext* txn, - const StringData& indexName ); - - void removeIndexWithoutDroppingCF( OperationContext* txn, - const StringData& indexName ); - - virtual Status prepareForIndexBuild( OperationContext* txn, - const IndexDescriptor* spec ); - - virtual void 
indexBuildSuccess( OperationContext* txn, - const StringData& indexName ); - - /* Updates the expireAfterSeconds field of the given index to the value in newExpireSecs. - * The specified index must already contain an expireAfterSeconds field, and the value in - * that field and newExpireSecs must both be numeric. - */ - virtual void updateTTLSetting( OperationContext* txn, - const StringData& idxName, - long long newExpireSeconds ); - - // ------ internal api - - BSONObj getOtherIndexSpec( const StringData& idxName, rocksdb::DB* db ) const; - - // called once when collection is created. - void createMetaData(); - - // when collection is dropped, call this - // all indexes have to be dropped first. - void dropMetaData(); - - const string metaDataKey() { return _metaDataKey; } - - protected: - virtual MetaData _getMetaData( OperationContext* txn ) const; - - private: - MetaData _getMetaData( rocksdb::DB* db ) const; - - MetaData _getMetaData_inlock() const; - MetaData _getMetaData_inlock( rocksdb::DB* db ) const; - - void _removeIndexFromMetaData_inlock( const StringData& indexName ); - - void _putMetaData_inlock( const MetaData& in ); - - RocksEngine* _engine; // not owned - - // the name of the column family which holds the metadata. - const string _metaDataKey; - - // lock which must be acquired before calling _getMetaData_inlock(). Protects the metadata - // stored in the metadata column family. - // Locking order RocksEngine::_entryMapMutex before _metaDataMutex - mutable boost::mutex _metaDataMutex; - }; - -} diff --git a/src/mongo/db/storage/rocks/rocks_database_catalog_entry.cpp b/src/mongo/db/storage/rocks/rocks_database_catalog_entry.cpp deleted file mode 100644 index 7c074a1529f..00000000000 --- a/src/mongo/db/storage/rocks/rocks_database_catalog_entry.cpp +++ /dev/null @@ -1,111 +0,0 @@ -// rocks_database_catalog_entry.cpp - -/** - * Copyright (C) 2014 MongoDB Inc. 
- * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. 
- */ - -#include "mongo/db/storage/rocks/rocks_database_catalog_entry.h" - -#include <rocksdb/options.h> - -#include "mongo/db/jsobj.h" -#include "mongo/db/storage/rocks/rocks_collection_catalog_entry.h" -#include "mongo/db/storage/rocks/rocks_engine.h" -#include "mongo/db/storage/rocks/rocks_record_store.h" -#include "mongo/util/log.h" - -namespace mongo { - - RocksDatabaseCatalogEntry::RocksDatabaseCatalogEntry( RocksEngine* engine, - const StringData& dbname ) - : DatabaseCatalogEntry( dbname ), _engine( engine ) { - } - - bool RocksDatabaseCatalogEntry::exists() const { - return !isEmpty(); - } - - bool RocksDatabaseCatalogEntry::isEmpty() const { - // TODO(XXX): make this fast - std::list<std::string> lst; - getCollectionNamespaces( &lst ); - return lst.empty(); - } - - void RocksDatabaseCatalogEntry::appendExtraStats( OperationContext* opCtx, - BSONObjBuilder* out, - double scale ) const { - // put some useful stats here - out->append( "note", "RocksDatabaseCatalogEntry should put some database level stats in" ); - } - - bool RocksDatabaseCatalogEntry::currentFilesCompatible( OperationContext* opCtx ) const { - error() << "RocksDatabaseCatalogEntry::currentFilesCompatible not done"; - return true; - } - - void RocksDatabaseCatalogEntry::getCollectionNamespaces( std::list<std::string>* out ) const { - _engine->getCollectionNamespaces( name(), out ); - } - - CollectionCatalogEntry* RocksDatabaseCatalogEntry::getCollectionCatalogEntry( - OperationContext* txn, - const StringData& ns ) const { - RocksEngine::Entry* entry = _engine->getEntry( ns ); - if ( !entry ) - return NULL; - return entry->collectionEntry.get(); - } - - RecordStore* RocksDatabaseCatalogEntry::getRecordStore( OperationContext* txn, - const StringData& ns ) { - RocksEngine::Entry* entry = _engine->getEntry( ns ); - if ( !entry ) - return NULL; - return entry->recordStore.get(); - } - - Status RocksDatabaseCatalogEntry::createCollection( OperationContext* txn, - const StringData& ns, - const 
CollectionOptions& options, - bool allocateDefaultSpace ) { - return _engine->createCollection( txn, ns, options ); - } - - Status RocksDatabaseCatalogEntry::renameCollection( OperationContext* txn, - const StringData& fromNS, - const StringData& toNS, - bool stayTemp ) { - return Status( ErrorCodes::InternalError, - "RocksDatabaseCatalogEntry doesn't support rename yet" ); - } - - Status RocksDatabaseCatalogEntry::dropCollection( OperationContext* opCtx, - const StringData& ns ) { - return _engine->dropCollection( opCtx, ns ); - } -} diff --git a/src/mongo/db/storage/rocks/rocks_database_catalog_entry.h b/src/mongo/db/storage/rocks/rocks_database_catalog_entry.h deleted file mode 100644 index 95b4375b1e8..00000000000 --- a/src/mongo/db/storage/rocks/rocks_database_catalog_entry.h +++ /dev/null @@ -1,97 +0,0 @@ -// rocks_database_catalog_entry.h - -/** -* Copyright (C) 2014 MongoDB Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. 
If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#pragma once - -#include "mongo/db/catalog/database_catalog_entry.h" - -namespace mongo { - - class RocksEngine; - - /** - * this class acts as a thin layer over RocksEngine, - * and does and stores nothing. - */ - class RocksDatabaseCatalogEntry : public DatabaseCatalogEntry { - public: - RocksDatabaseCatalogEntry( RocksEngine* engine, const StringData& dbname ); - - virtual bool exists() const; - virtual bool isEmpty() const; - - virtual void appendExtraStats( OperationContext* opCtx, - BSONObjBuilder* out, - double scale ) const; - - // these are hacks :( - virtual bool isOlderThan24( OperationContext* opCtx ) const { return false; } - virtual void markIndexSafe24AndUp( OperationContext* opCtx ) {} - - /** - * @return true if current files on disk are compatibile with the current version. 
- * if we return false, then an upgrade will be required - */ - virtual bool currentFilesCompatible( OperationContext* opCtx ) const; - - // ---- - - virtual void getCollectionNamespaces( std::list<std::string>* out ) const; - - // The DatabaseCatalogEntry owns this, do not delete - virtual CollectionCatalogEntry* getCollectionCatalogEntry( OperationContext* txn, - const StringData& ns ) const; - - // The DatabaseCatalogEntry owns this, do not delete - virtual RecordStore* getRecordStore( OperationContext* txn, - const StringData& ns ); - - // Ownership passes to caller - virtual IndexAccessMethod* getIndex( OperationContext* txn, - const CollectionCatalogEntry* collection, - IndexCatalogEntry* index ); - - virtual Status createCollection( OperationContext* txn, - const StringData& ns, - const CollectionOptions& options, - bool allocateDefaultSpace ); - - virtual Status renameCollection( OperationContext* txn, - const StringData& fromNS, - const StringData& toNS, - bool stayTemp ); - - virtual Status dropCollection( OperationContext* opCtx, - const StringData& ns ); - - private: - RocksEngine* _engine; - }; -} diff --git a/src/mongo/db/storage/rocks/rocks_database_catalog_entry_fake.cpp b/src/mongo/db/storage/rocks/rocks_database_catalog_entry_fake.cpp deleted file mode 100644 index e33e0f928a8..00000000000 --- a/src/mongo/db/storage/rocks/rocks_database_catalog_entry_fake.cpp +++ /dev/null @@ -1,42 +0,0 @@ -// rocks_database_catalog_entry_fake.cpp - -/** - * Copyright (C) 2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. 
- * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/db/storage/rocks/rocks_database_catalog_entry.h" - -#include "mongo/util/assert_util.h" - -namespace mongo { - - IndexAccessMethod* RocksDatabaseCatalogEntry::getIndex( OperationContext* txn, - const CollectionCatalogEntry* collection, - IndexCatalogEntry* index ) { - invariant( !"so sad, no indexes with rocks" ); - } -} diff --git a/src/mongo/db/storage/rocks/rocks_database_catalog_entry_mongod.cpp b/src/mongo/db/storage/rocks/rocks_database_catalog_entry_mongod.cpp deleted file mode 100644 index 92b720ea0ef..00000000000 --- a/src/mongo/db/storage/rocks/rocks_database_catalog_entry_mongod.cpp +++ /dev/null @@ -1,89 +0,0 @@ -// rocks_database_catalog_entry_mongod.cpp - -/** - * Copyright (C) 2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. 
- * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. 
- */ - -#include "mongo/db/storage/rocks/rocks_database_catalog_entry.h" - -#include <boost/optional.hpp> -#include <rocksdb/db.h> - -#include "mongo/db/index/2d_access_method.h" -#include "mongo/db/index/btree_access_method.h" -#include "mongo/db/index/fts_access_method.h" -#include "mongo/db/index/hash_access_method.h" -#include "mongo/db/index/haystack_access_method.h" -#include "mongo/db/index/index_access_method.h" -#include "mongo/db/index/index_descriptor.h" -#include "mongo/db/index/s2_access_method.h" -#include "mongo/db/storage/rocks/rocks_sorted_data_impl.h" -#include "mongo/db/storage/rocks/rocks_collection_catalog_entry.h" -#include "mongo/db/storage/rocks/rocks_engine.h" -#include "mongo/util/log.h" - -namespace mongo { - - IndexAccessMethod* RocksDatabaseCatalogEntry::getIndex( OperationContext* txn, - const CollectionCatalogEntry* collection, - IndexCatalogEntry* index ) { - const IndexDescriptor* desc = index->descriptor(); - const Ordering order( Ordering::make( desc->keyPattern() ) ); - - rocksdb::ColumnFamilyHandle* cf = _engine->getIndexColumnFamily( collection->ns().ns(), - desc->indexName(), - order ); - - std::auto_ptr<RocksSortedDataImpl> raw( new RocksSortedDataImpl( _engine->getDB(), - cf, - order ) ); - - const string& type = index->descriptor()->getAccessMethodName(); - - if ("" == type) - return new BtreeAccessMethod( index, raw.release() ); - - if (IndexNames::HASHED == type) - return new HashAccessMethod( index, raw.release() ); - - if (IndexNames::GEO_2DSPHERE == type) - return new S2AccessMethod( index, raw.release() ); - - if (IndexNames::TEXT == type) - return new FTSAccessMethod( index, raw.release() ); - - if (IndexNames::GEO_HAYSTACK == type) - return new HaystackAccessMethod( index, raw.release() ); - - if (IndexNames::GEO_2D == type) - return new TwoDAccessMethod( index, raw.release() ); - - log() << "Can't find index for keyPattern " << index->descriptor()->keyPattern(); - // TODO make this fassert - return NULL; - } -} 
diff --git a/src/mongo/db/storage/rocks/rocks_engine.cpp b/src/mongo/db/storage/rocks/rocks_engine.cpp index f89b75cdcc6..2a09879602a 100644 --- a/src/mongo/db/storage/rocks/rocks_engine.cpp +++ b/src/mongo/db/storage/rocks/rocks_engine.cpp @@ -44,153 +44,143 @@ #include <rocksdb/utilities/write_batch_with_index.h> #include "mongo/db/catalog/collection_options.h" +#include "mongo/db/index/index_descriptor.h" #include "mongo/db/operation_context.h" -#include "mongo/db/storage/rocks/rocks_collection_catalog_entry.h" -#include "mongo/db/storage/rocks/rocks_database_catalog_entry.h" #include "mongo/db/storage/rocks/rocks_record_store.h" #include "mongo/db/storage/rocks/rocks_recovery_unit.h" #include "mongo/db/storage/rocks/rocks_sorted_data_impl.h" #include "mongo/util/log.h" -#define ROCKS_TRACE if(0) log() +#define ROCKS_TRACE log() #define ROCKS_STATUS_OK( s ) if ( !( s ).ok() ) { error() << "rocks error: " << ( s ).ToString(); \ invariant( false ); } namespace mongo { - RocksEngine::RocksEngine( const std::string& path ) : _path( path ), _defaultHandle( NULL ) { - // TODO make this more fine-grained? - boost::mutex::scoped_lock lk( _entryMapMutex ); - std::vector<rocksdb::ColumnFamilyDescriptor> families; - - vector<string> familyNames = _listFamilyNames( path ); - - // Create the shared collection comparator - _collectionComparator.reset( RocksRecordStore::newRocksCollectionComparator() ); - - if ( !familyNames.empty() ) { - // go through and create RocksCollectionCatalogEntries for all non-indexes - _createNonIndexCatalogEntries( familyNames ); - - // Create a mapping from index names to the Ordering object for each index. 
- // These Ordering objects will be used to create RocksIndexEntryComparators to be used - // with each column family representing a namespace - map<string, Ordering> indexOrderings = _createIndexOrderings( familyNames, path ); - - // get ColumnFamilyDescriptors for all the column families - families = _createCfds( familyNames, indexOrderings ); + // TODO make create/drop operations support rollback? + + RocksEngine::RocksEngine(const std::string& path) + : _path(path), _collectionComparator(RocksRecordStore::newRocksCollectionComparator()) { + + auto columnFamilyNames = _loadColumnFamilies(); // vector of column family names + std::unordered_map<std::string, Ordering> orderings; // column family name -> Ordering + std::set<std::string> collections; // set of collection names + + if (columnFamilyNames.empty()) { // new DB + columnFamilyNames.push_back(rocksdb::kDefaultColumnFamilyName); + } else { // existing DB + // open DB in read-only mode to load metadata + rocksdb::DB* dbReadOnly; + auto s = rocksdb::DB::OpenForReadOnly(dbOptions(), path, &dbReadOnly); + ROCKS_STATUS_OK(s); + auto itr = dbReadOnly->NewIterator(rocksdb::ReadOptions()); + orderings = _loadOrderingMetaData(itr); + collections = _loadCollections(itr); + delete itr; + delete dbReadOnly; } - rocksdb::DB* dbPtr; + std::vector<rocksdb::ColumnFamilyDescriptor> columnFamilies; + std::set<std::string> toDropColumnFamily; - // If there are no column families, then just open the database and return - if ( families.empty() ) { - rocksdb::Status s = rocksdb::DB::Open( dbOptions(), path, &dbPtr ); - _db.reset( dbPtr ); - ROCKS_STATUS_OK( s ); - - _defaultHandle = _db->DefaultColumnFamily(); - return; + for (const auto& cf : columnFamilyNames) { + if (cf == rocksdb::kDefaultColumnFamilyName) { + columnFamilies.emplace_back(); + continue; + } + auto orderings_iter = orderings.find(cf); + auto collections_iter = collections.find(cf); + bool isIndex = orderings_iter != orderings.end(); + bool isCollection = 
collections_iter != collections.end(); + invariant(!isIndex || !isCollection); + if (isIndex) { + columnFamilies.emplace_back(cf, _indexOptions(orderings_iter->second)); + } else if (isCollection) { + columnFamilies.emplace_back(cf, _collectionOptions()); + } else { + // this can happen because write and createColumnFamily are not atomic + toDropColumnFamily.insert(cf); + columnFamilies.emplace_back(cf, _collectionOptions()); + } } - // Open the database, getting handles for every column family std::vector<rocksdb::ColumnFamilyHandle*> handles; - - rocksdb::Status s = rocksdb::DB::Open( dbOptions(), path, families, &handles, &dbPtr ); - ROCKS_STATUS_OK( s ); - _db.reset( dbPtr ); - - invariant( handles.size() == families.size() ); - - _defaultHandle = _db->DefaultColumnFamily(); - - // Create an Entry object for every ColumnFamilyHandle - _createEntries( families, handles ); + rocksdb::DB* db; + auto s = rocksdb::DB::Open(dbOptions(), path, columnFamilies, &handles, &db); + ROCKS_STATUS_OK(s); + invariant(handles.size() == columnFamilies.size()); + for (size_t i = 0; i < handles.size(); ++i) { + if (toDropColumnFamily.find(columnFamilies[i].name) != toDropColumnFamily.end()) { + db->DropColumnFamily(handles[i]); + delete handles[i]; + } else if (columnFamilyNames[i] == rocksdb::kDefaultColumnFamilyName) { + // we will not be needing this + delete handles[i]; + } else { + _identColumnFamilyMap[columnFamilies[i].name].reset(handles[i]); + } + } + _db.reset(db); } - RocksEngine::~RocksEngine() { - } + RocksEngine::~RocksEngine() {} - RecoveryUnit* RocksEngine::newRecoveryUnit( OperationContext* opCtx ) { - /* TODO change to false when unit of work hooked up*/ + RecoveryUnit* RocksEngine::newRecoveryUnit() { + // TODO change this to false once higher level code explicitly commits every transaction return new RocksRecoveryUnit(_db.get(), true); } - void RocksEngine::listDatabases( std::vector<std::string>* out ) const { - std::set<std::string> dbs; - - // TODO: make 
this faster - boost::mutex::scoped_lock lk( _entryMapMutex ); - for ( EntryMap::const_iterator i = _entryMap.begin(); i != _entryMap.end(); ++i ) { - const StringData& ns = i->first; - if ( dbs.insert( nsToDatabase( ns ) ).second ) - out->push_back( nsToDatabase( ns ) ); + Status RocksEngine::createRecordStore(OperationContext* opCtx, const StringData& ident, + const CollectionOptions& options) { + if (_existsColumnFamily(ident)) { + return Status::OK(); } - } - - DatabaseCatalogEntry* RocksEngine::getDatabaseCatalogEntry( OperationContext* opCtx, - const StringData& db ) { - boost::mutex::scoped_lock lk( _dbCatalogMapMutex ); - - boost::shared_ptr<RocksDatabaseCatalogEntry>& dbce = _dbCatalogMap[db.toString()]; - if ( !dbce ) { - dbce = boost::make_shared<RocksDatabaseCatalogEntry>( this, db ); + _db->Put(rocksdb::WriteOptions(), "collection-" + ident.toString(), rocksdb::Slice()); + return _createColumnFamily(_collectionOptions(), ident); + } + + RecordStore* RocksEngine::getRecordStore(OperationContext* opCtx, const StringData& ns, + const StringData& ident, + const CollectionOptions& options) { + auto columnFamily = _getColumnFamily(ident); + if (options.capped) { + return new RocksRecordStore( + ns, ident, _db.get(), columnFamily, true, + options.cappedSize ? options.cappedSize : 4096, // default size + options.cappedMaxDocs ? 
options.cappedMaxDocs : -1); + } else { + return new RocksRecordStore(ns, ident, _db.get(), columnFamily); } + } - return dbce.get(); + Status RocksEngine::dropRecordStore(OperationContext* opCtx, const StringData& ident) { + _db->Delete(rocksdb::WriteOptions(), "collection-" + ident.toString()); + return _dropColumnFamily(ident); } - int RocksEngine::flushAllFiles( bool sync ) { - boost::mutex::scoped_lock lk( _entryMapMutex ); - for ( EntryMap::const_iterator i = _entryMap.begin(); i != _entryMap.end(); ++i ) { - if ( i->second->cfHandle ) - _db->Flush( rocksdb::FlushOptions(), i->second->cfHandle.get() ); + Status RocksEngine::createSortedDataInterface(OperationContext* opCtx, const StringData& ident, + const IndexDescriptor* desc) { + if (_existsColumnFamily(ident)) { + return Status::OK(); } - return _entryMap.size(); - } + auto keyPattern = desc->keyPattern(); - Status RocksEngine::repairDatabase( OperationContext* tnx, - const std::string& dbName, - bool preserveClonedFilesOnFailure, - bool backupOriginalFiles ) { - // TODO implement - return Status::OK(); + _db->Put(rocksdb::WriteOptions(), "indexordering-" + ident.toString(), + rocksdb::Slice(keyPattern.objdata(), keyPattern.objsize())); + return _createColumnFamily(_indexOptions(Ordering::make(keyPattern)), ident); } - void RocksEngine::cleanShutdown(OperationContext* txn) { - // no locking here because this is only called while single-threaded. 
- dynamic_cast<RocksRecoveryUnit*>(txn->recoveryUnit())->destroy(); - _entryMap = EntryMap(); - _dbCatalogMap = DbCatalogMap(); - _collectionComparator.reset(); - _db.reset(); + SortedDataInterface* RocksEngine::getSortedDataInterface(OperationContext* opCtx, + const StringData& ident, + const IndexDescriptor* desc) { + return new RocksSortedDataImpl(_db.get(), _getColumnFamily(ident), + Ordering::make(desc->keyPattern())); } - Status RocksEngine::closeDatabase( OperationContext* txn, const StringData& db ) { - boost::mutex::scoped_lock lk( _dbCatalogMapMutex ); - _dbCatalogMap.erase( db.toString() ); - return Status::OK(); - } - - Status RocksEngine::dropDatabase( OperationContext* txn, const StringData& db ) { - const string prefix = db.toString() + "."; - boost::mutex::scoped_lock lk( _entryMapMutex ); - vector<string> toDrop; - - // TODO don't iterate through everything - for (EntryMap::const_iterator it = _entryMap.begin(); it != _entryMap.end(); ++it ) { - const StringData& ns = it->first; - if ( ns.startsWith( prefix ) ) { - toDrop.push_back( ns.toString() ); - } - } - - for (vector<string>::const_iterator it = toDrop.begin(); it != toDrop.end(); ++it ) { - _dropCollection_inlock( txn, *it ); - } - - return closeDatabase( txn, db ); + Status RocksEngine::dropSortedDataInterface(OperationContext* opCtx, const StringData& ident) { + _db->Delete(rocksdb::WriteOptions(), "indexordering-" + ident.toString()); + return _dropColumnFamily(ident); } // non public api @@ -201,209 +191,94 @@ namespace mongo { return options; } - const RocksEngine::Entry* RocksEngine::getEntry( const StringData& ns ) const { - boost::mutex::scoped_lock lk( _entryMapMutex ); - EntryMap::const_iterator i = _entryMap.find( ns ); - if ( i == _entryMap.end() ) - return NULL; - return i->second.get(); - } - - RocksEngine::Entry* RocksEngine::getEntry( const StringData& ns ) { - boost::mutex::scoped_lock lk( _entryMapMutex ); - EntryMap::const_iterator i = _entryMap.find( ns ); - if ( i == 
_entryMap.end() ) - return NULL; - return i->second.get(); + bool RocksEngine::_existsColumnFamily(const StringData& ident) { + boost::mutex::scoped_lock lk(_identColumnFamilyMapMutex); + return _identColumnFamilyMap.find(ident) != _identColumnFamilyMap.end(); } - rocksdb::ColumnFamilyHandle* RocksEngine::getIndexColumnFamilyNoCreate( - const StringData& ns, - const StringData& indexName ) { - ROCKS_TRACE << "getIndexColumnFamilyWithoutCreate " << ns << "$" << indexName; - - boost::mutex::scoped_lock lk( _entryMapMutex ); - return _getIndexColumnFamilyNoCreate_inlock( ns, indexName ); - } - - rocksdb::ColumnFamilyHandle* RocksEngine::getIndexColumnFamily( const StringData& ns, - const StringData& indexName, - const Ordering& order ) { - ROCKS_TRACE << "getIndexColumnFamily " << ns << "$" << indexName; - - boost::mutex::scoped_lock lk( _entryMapMutex ); - rocksdb::ColumnFamilyHandle* existing_cf = _getIndexColumnFamilyNoCreate_inlock( - ns, - indexName ); - if (existing_cf) - return existing_cf; - // if we get here, then the column family doesn't exist, so we need to create it - return _createIndexColumnFamily_inlock(ns, indexName, order); + Status RocksEngine::_createColumnFamily(const rocksdb::ColumnFamilyOptions& options, + const StringData& ident) { + rocksdb::ColumnFamilyHandle* cf; + auto s = _db->CreateColumnFamily(options, ident.toString(), &cf); + if (!s.ok()) { + return toMongoStatus(s); + } + boost::mutex::scoped_lock lk(_identColumnFamilyMapMutex); + _identColumnFamilyMap[ident].reset(cf); + return Status::OK(); } - rocksdb::ColumnFamilyHandle* RocksEngine::_getIndexColumnFamilyNoCreate_inlock( - const StringData& ns, - const StringData& indexName ) { - EntryMap::const_iterator i = _entryMap.find( ns ); - if ( i == _entryMap.end() ) - return NULL; - shared_ptr<Entry> entry = i->second; - + Status RocksEngine::_dropColumnFamily(const StringData& ident) { + boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily; { - rocksdb::ColumnFamilyHandle* handle 
= entry->indexNameToCF[indexName].get(); - if ( handle ) - return handle; + boost::mutex::scoped_lock lk(_identColumnFamilyMapMutex); + auto cf_iter = _identColumnFamilyMap.find(ident); + if (cf_iter == _identColumnFamilyMap.end()) { + return Status(ErrorCodes::InternalError, "Not found"); + } + columnFamily = cf_iter->second; + _identColumnFamilyMap.erase(cf_iter); } - return NULL; + auto s = _db->DropColumnFamily(columnFamily.get()); + return toMongoStatus(s); } - rocksdb::ColumnFamilyHandle* RocksEngine::_createIndexColumnFamily_inlock( - const StringData& ns, - const StringData& indexName, - const Ordering& order ) { - EntryMap::const_iterator i = _entryMap.find( ns ); - if ( i == _entryMap.end() ) - return NULL; - shared_ptr<Entry> entry = i->second; - - const string fullName = ns.toString() + string("$") + indexName.toString(); - rocksdb::ColumnFamilyHandle* cf = NULL; - - typedef boost::shared_ptr<const rocksdb::Comparator> SharedComparatorPtr; - SharedComparatorPtr& comparator = entry->indexNameToComparator[indexName]; - - comparator.reset( RocksSortedDataImpl::newRocksComparator( order ) ); - invariant( comparator ); - - rocksdb::ColumnFamilyOptions options; - options.comparator = comparator.get(); - - rocksdb::Status status = _db->CreateColumnFamily( options, fullName, &cf ); - ROCKS_STATUS_OK( status ); - invariant( cf != NULL); - entry->indexNameToCF[indexName].reset( cf ); - return cf; - } - - void RocksEngine::removeColumnFamily( rocksdb::ColumnFamilyHandle** cfh, - const StringData& indexName, - const StringData& ns ) { - boost::mutex::scoped_lock lk( _entryMapMutex ); - _removeColumnFamily_inlock( cfh, indexName, ns ); - } - - void RocksEngine::_removeColumnFamily_inlock( rocksdb::ColumnFamilyHandle** cfh, - const StringData& indexName, - const StringData& ns ) { - EntryMap::const_iterator i = _entryMap.find( ns ); - const rocksdb::Status s = _db->DropColumnFamily( *cfh ); - invariant( s.ok() ); - if ( i != _entryMap.end() ) { - 
i->second->indexNameToCF.erase(indexName); + boost::shared_ptr<rocksdb::ColumnFamilyHandle> RocksEngine::_getColumnFamily( + const StringData& ident) { + { + boost::mutex::scoped_lock lk(_identColumnFamilyMapMutex); + auto cf_iter = _identColumnFamilyMap.find(ident); + invariant(cf_iter != _identColumnFamilyMap.end()); + return cf_iter->second; } - *cfh = NULL; } - void RocksEngine::getCollectionNamespaces( const StringData& dbName, - std::list<std::string>* out ) const { - const string prefix = dbName.toString() + "."; - boost::mutex::scoped_lock lk( _entryMapMutex ); - for (EntryMap::const_iterator i = _entryMap.begin(); i != _entryMap.end(); ++i ) { - const StringData& ns = i->first; - if ( !ns.startsWith( prefix ) ) - continue; - out->push_back( i->first ); + std::unordered_map<std::string, Ordering> RocksEngine::_loadOrderingMetaData( + rocksdb::Iterator* itr) { + const rocksdb::Slice kOrderingPrefix("ordering-"); + std::unordered_map<std::string, Ordering> orderings; + for (itr->Seek(kOrderingPrefix); itr->Valid(); itr->Next()) { + rocksdb::Slice key(itr->key()); + if (!key.starts_with(kOrderingPrefix)) { + break; + } + key.remove_prefix(kOrderingPrefix.size()); + std::string value(itr->value().ToString()); + orderings.insert({key.ToString(), Ordering::make(BSONObj(value.c_str()))}); } + ROCKS_STATUS_OK(itr->status()); + return orderings; } - - Status RocksEngine::createCollection( OperationContext* txn, - const StringData& ns, - const CollectionOptions& options ) { - - ROCKS_TRACE << "RocksEngine::createCollection: " << ns; - - boost::mutex::scoped_lock lk( _entryMapMutex ); - if ( _entryMap.find( ns ) != _entryMap.end() ) - return Status( ErrorCodes::NamespaceExists, "collection already exists" ); - - if (ns.toString().find('$') != string::npos ) { - return Status( ErrorCodes::BadValue, "invalid character in namespace" ); + std::set<std::string> RocksEngine::_loadCollections(rocksdb::Iterator* itr) { + const rocksdb::Slice kCollectionPrefix("collection-"); 
+ std::set<std::string> collections; + for (itr->Seek(kCollectionPrefix); itr->Valid() ; itr->Next()) { + rocksdb::Slice key(itr->key()); + if (!key.starts_with(kCollectionPrefix)) { + break; + } + key.remove_prefix(kCollectionPrefix.size()); + collections.insert(key.ToString()); } - - boost::shared_ptr<Entry> entry( new Entry() ); - - rocksdb::ColumnFamilyHandle* cf; - rocksdb::Status s = _db->CreateColumnFamily( _collectionOptions(), ns.toString(), &cf ); - ROCKS_STATUS_OK( s ); - - const BSONObj optionsObj = options.toBSON(); - const std::string key = ns.toString() + "-options"; - const rocksdb::Slice value( optionsObj.objdata(), optionsObj.objsize() ); - - dynamic_cast<RocksRecoveryUnit*>( txn->recoveryUnit() )->writeBatch()->Put(_defaultHandle, - key, - value); - entry->cfHandle.reset( cf ); - - invariant(_defaultHandle != NULL); - - if ( options.capped ) - entry->recordStore.reset( new RocksRecordStore( - ns, - _db.get(), - entry->cfHandle.get(), - _defaultHandle, - true, - options.cappedSize ? options.cappedSize : 4096, - options.cappedMaxDocs ? 
options.cappedMaxDocs : -1 ) ); - else - entry->recordStore.reset( new RocksRecordStore( - ns, _db.get(), entry->cfHandle.get(), _defaultHandle ) ); - - entry->collectionEntry.reset( new RocksCollectionCatalogEntry( this, ns ) ); - entry->collectionEntry->createMetaData(); - - _entryMap[ns] = entry; - return Status::OK(); + ROCKS_STATUS_OK(itr->status()); + return collections; } - Status RocksEngine::dropCollection( OperationContext* opCtx, const StringData& ns ) { - boost::mutex::scoped_lock lk( _entryMapMutex ); - return _dropCollection_inlock( opCtx, ns ); - } + std::vector<std::string> RocksEngine::_loadColumnFamilies() { + std::vector<std::string> names; + if (boost::filesystem::exists(_path)) { + rocksdb::Status s = rocksdb::DB::ListColumnFamilies(dbOptions(), _path, &names); - Status RocksEngine::_dropCollection_inlock( OperationContext* opCtx, const StringData& ns ) { - // XXX not using a snapshot here (anywhere in the method, really) - if ( _entryMap.find( ns ) == _entryMap.end() ) - return Status( ErrorCodes::NamespaceNotFound, "can't find collection to drop" ); - Entry* entry = _entryMap[ns].get(); - - for ( auto it = entry->indexNameToCF.begin(); it != entry->indexNameToCF.end(); ++it ) { - entry->collectionEntry->removeIndexWithoutDroppingCF( opCtx, it->first ); - - // drop the actual index in rocksdb - rocksdb::ColumnFamilyHandle* cfh = _getIndexColumnFamilyNoCreate_inlock( ns, - it->first ); - - // Note: this invalidates cfh. 
Do not use after this call - _removeColumnFamily_inlock( &cfh, it->first, ns ); - invariant( cfh == nullptr ); + if (s.IsIOError()) { + // DNE, this means the directory exists but is empty, which is fine + // because it means no rocks database exists yet + } else { + ROCKS_STATUS_OK(s); + } } - entry->recordStore->dropRsMetaData( opCtx ); - entry->recordStore.reset( NULL ); - entry->collectionEntry->dropMetaData(); - entry->collectionEntry.reset( NULL ); - - rocksdb::Status status = _db->DropColumnFamily( entry->cfHandle.get() ); - ROCKS_STATUS_OK( status ); - - entry->cfHandle.reset( NULL ); - - _entryMap.erase( ns ); - - return Status::OK(); + return names; } rocksdb::Options RocksEngine::dbOptions() { @@ -415,6 +290,7 @@ namespace mongo { // create the DB if it's not already present options.create_if_missing = true; + options.create_missing_column_families = true; return options; } @@ -426,202 +302,11 @@ namespace mongo { return options; } - rocksdb::ColumnFamilyOptions RocksEngine::_indexOptions() const { - return rocksdb::ColumnFamilyOptions(); - } - - vector<std::string> RocksEngine::_listFamilyNames( string filepath ) { - std::vector<std::string> familyNames; - if ( boost::filesystem::exists( filepath ) ) { - rocksdb::Status s = rocksdb::DB::ListColumnFamilies( dbOptions(), - filepath, - &familyNames ); - - if ( s.IsIOError() ) { - // DNE, this means the directory exists but is empty, which is fine - // because it means no rocks database exists yet - } else { - ROCKS_STATUS_OK( s ); - } - } - - return familyNames; - } - - bool RocksEngine::_isDefaultFamily( const string& name ) { - return name == rocksdb::kDefaultColumnFamilyName; - } - - /** - * Create Entry's for all non-index column families in the database. This method is called by - * the constructor. 
It is necessary because information about indexes is needed before a - * column family representing an index can be opened (specifically, the orderings used in the - * comparators for these column families needs to be known). This information is accessed - * through the RocksCollectionCatalogEntry class for each non-index column family in the - * database. Hence, this method. - */ - void RocksEngine::_createNonIndexCatalogEntries( const vector<string>& namespaces ) { - for ( unsigned i = 0; i < namespaces.size(); ++i ) { - const string ns = namespaces[i]; - - // ignore the default column family, which RocksDB makes us keep around. - if ( _isDefaultFamily( ns ) ) { - continue; - } - - string collection = ns; - if ( ns.find( '&' ) != string::npos || ns.find( '$' ) != string::npos ) { - continue; - } - - boost::shared_ptr<Entry>& entry = _entryMap[collection]; - invariant( !entry ); - entry = boost::make_shared<Entry>(); - - // We'll use this RocksCollectionCatalogEntry to open the column families representing - // indexes - invariant( !entry->collectionEntry ); - entry->collectionEntry.reset( new RocksCollectionCatalogEntry( this, ns ) ); - } - } - - map<string, Ordering> RocksEngine::_createIndexOrderings( const vector<string>& namespaces, - const string& filepath ) { - // open the default column family so that we can retrieve information about - // each index, which is needed in order to open the index column families - rocksdb::DB* db; - rocksdb::Status status = rocksdb::DB::OpenForReadOnly( dbOptions(), filepath, &db ); - boost::scoped_ptr<rocksdb::DB> dbPtr( db ); - - ROCKS_STATUS_OK( status ); - - map<string, Ordering> indexOrderings; - - // populate indexOrderings - for ( unsigned i = 0; i < namespaces.size(); i++ ) { - const string ns = namespaces[i]; - const size_t sepPos = ns.find( '$' ); - if ( sepPos == string::npos ) { - continue; - } - - // the default family, which we want to ignore, should be caught in the above if - // statement. 
- invariant( !_isDefaultFamily( ns ) ); - - string collection = ns.substr( 0, sepPos ); - - boost::shared_ptr<Entry>& entry = _entryMap[collection]; - invariant( entry ); - - // Generate the Ordering object for each index, allowing the column families - // representing these indexes to eventually be opened - const string indexName = ns.substr( sepPos + 1 ); - const BSONObj spec = entry->collectionEntry->getOtherIndexSpec( indexName, db ); - const Ordering order = Ordering::make( spec["key"].Obj() ); - - indexOrderings.insert( std::make_pair( indexName, order ) ); - } - - return indexOrderings; - } - - RocksEngine::CfdVector RocksEngine::_createCfds( const std::vector<std::string>& namespaces, - const map<string, Ordering>& indexOrderings ) { - CfdVector families; - - for ( size_t i = 0; i < namespaces.size(); i++ ) { - const std::string ns = namespaces[i]; - const size_t sepPos = ns.find( '$' ); - const bool isIndex = sepPos != string::npos; - - if ( isIndex ) { - rocksdb::ColumnFamilyOptions options = _indexOptions(); - - const string indexName = ns.substr( sepPos + 1 ); - - const map<string, Ordering>::const_iterator it = indexOrderings.find( indexName ); - invariant( it != indexOrderings.end() ); - - const Ordering order = it->second; - options.comparator = RocksSortedDataImpl::newRocksComparator( order ); - - families.push_back( rocksdb::ColumnFamilyDescriptor( ns, options ) ); - - } else if ( _isDefaultFamily( ns ) ) { - families.push_back( rocksdb::ColumnFamilyDescriptor( ns, - rocksdb::ColumnFamilyOptions() ) ); - } else { - families.push_back( rocksdb::ColumnFamilyDescriptor( ns, _collectionOptions() ) ); - } - } - - return families; - } - - void RocksEngine::_createEntries( const CfdVector& families, - const vector<rocksdb::ColumnFamilyHandle*> handles ) { - invariant(_defaultHandle != NULL); - - for ( unsigned i = 0; i < families.size(); i++ ) { - const string ns = families[i].name; - - ROCKS_TRACE << "RocksEngine found ns: " << ns; - - // RocksDB 
specifies that the default column family must be included in the database. - // We want to ignore this column family. - if ( _isDefaultFamily( ns ) ) { - continue; - } - - string collection = ns; - - const size_t sepPos = ns.find( '$' ); - - const bool isIndex = sepPos != string::npos; - if ( isIndex ) { - collection = ns.substr( 0, sepPos ); - } - - boost::shared_ptr<Entry> entry = _entryMap[collection]; - invariant( entry ); - - if ( isIndex ) { - string indexName = ns.substr( sepPos + 1 ); - ROCKS_TRACE << " got index " << indexName << " for " << collection; - entry->indexNameToCF[indexName].reset( handles[i] ); - - invariant( families[i].options.comparator ); - entry->indexNameToComparator[indexName].reset( families[i].options.comparator ); - } else { - std::string optionsKey = ns + "-options"; - std::string value; - rocksdb::Status status = _db->Get(rocksdb::ReadOptions(), optionsKey, &value); - - ROCKS_STATUS_OK( status ); - BSONObj optionsObj( value.data() ); - - CollectionOptions options; - options.parse( optionsObj ); - - entry->cfHandle.reset( handles[i] ); - if ( options.capped ) { - entry->recordStore.reset(new RocksRecordStore( - ns, - _db.get(), - handles[i], - _defaultHandle, - options.capped, - options.cappedSize ? options.cappedSize : 4096, // default size - options.cappedMaxDocs ? 
options.cappedMaxDocs : -1) ); - } else { - entry->recordStore.reset( new RocksRecordStore( ns, _db.get(), handles[i], - _defaultHandle ) ); - } - // entry->collectionEntry is set in _createNonIndexCatalogEntries() - invariant( entry->collectionEntry ); - } - } + rocksdb::ColumnFamilyOptions RocksEngine::_indexOptions(const Ordering& order) const { + rocksdb::ColumnFamilyOptions options; + invariant( _collectionComparator.get() ); + options.comparator = RocksSortedDataImpl::newRocksComparator(order); + return options; } Status toMongoStatus( rocksdb::Status s ) { diff --git a/src/mongo/db/storage/rocks/rocks_engine.h b/src/mongo/db/storage/rocks/rocks_engine.h index 62e8a62e480..1094a74db8d 100644 --- a/src/mongo/db/storage/rocks/rocks_engine.h +++ b/src/mongo/db/storage/rocks/rocks_engine.h @@ -34,6 +34,7 @@ #include <list> #include <map> #include <string> +#include <memory> #include <boost/optional.hpp> #include <boost/scoped_ptr.hpp> @@ -44,7 +45,7 @@ #include "mongo/base/disallow_copying.h" #include "mongo/bson/ordering.h" -#include "mongo/db/storage/storage_engine.h" +#include "mongo/db/storage/kv/kv_engine.h" #include "mongo/util/string_map.h" namespace rocksdb { @@ -53,126 +54,67 @@ namespace rocksdb { struct ColumnFamilyOptions; class DB; class Comparator; + class Iterator; struct Options; struct ReadOptions; } namespace mongo { - class RocksCollectionCatalogEntry; - class RocksDatabaseCatalogEntry; - class RocksRecordStore; - struct CollectionOptions; - /** - * Since we have one DB for the entire server, the RocksEngine is going to hold all state. - * DatabaseCatalogEntry will just be a thin slice over the engine. 
- */ - class RocksEngine : public StorageEngine { + class RocksEngine : public KVEngine { MONGO_DISALLOW_COPYING( RocksEngine ); public: RocksEngine( const std::string& path ); virtual ~RocksEngine(); - virtual RecoveryUnit* newRecoveryUnit( OperationContext* opCtx ); - - virtual void listDatabases( std::vector<std::string>* out ) const; - - virtual DatabaseCatalogEntry* getDatabaseCatalogEntry( OperationContext* opCtx, - const StringData& db ); + virtual RecoveryUnit* newRecoveryUnit() override; - virtual bool supportsDocLocking() const { return false; } + virtual Status createRecordStore(OperationContext* opCtx, const StringData& ident, + const CollectionOptions& options) override; - /** - * @return number of files flushed - */ - virtual int flushAllFiles( bool sync ); + virtual RecordStore* getRecordStore(OperationContext* opCtx, const StringData& ns, + const StringData& ident, + const CollectionOptions& options) override; - virtual Status repairDatabase( OperationContext* tnx, - const std::string& dbName, - bool preserveClonedFilesOnFailure = false, - bool backupOriginalFiles = false ); + virtual Status dropRecordStore(OperationContext* opCtx, const StringData& ident) override; - virtual void cleanShutdown(OperationContext* txn); + virtual Status createSortedDataInterface(OperationContext* opCtx, const StringData& ident, + const IndexDescriptor* desc) override; - virtual Status closeDatabase( OperationContext* txn, const StringData& db ); + virtual SortedDataInterface* getSortedDataInterface(OperationContext* opCtx, + const StringData& ident, + const IndexDescriptor* desc) override; - virtual Status dropDatabase( OperationContext* txn, const StringData& db ); + virtual Status dropSortedDataInterface(OperationContext* opCtx, + const StringData& ident) override; // rocks specific api rocksdb::DB* getDB() { return _db.get(); } const rocksdb::DB* getDB() const { return _db.get(); } - void getCollectionNamespaces( const StringData& dbName, std::list<std::string>* 
out ) const; - - Status createCollection( OperationContext* txn, - const StringData& ns, - const CollectionOptions& options ); - - Status dropCollection( OperationContext* opCtx, const StringData& ns ); - - /** - * Will create if doesn't exist. The collection has to exist first, though. - * The ordering argument is only used if the column family does not exist. If the - * column family does not exist, then ordering must be a non-empty optional, containing - * the Ordering for the index. - */ - rocksdb::ColumnFamilyHandle* getIndexColumnFamily( const StringData& ns, - const StringData& indexName, - const Ordering& order ); - - /** - * Will return NULL if doesn't exist. - */ - rocksdb::ColumnFamilyHandle* getIndexColumnFamilyNoCreate(const StringData& ns, - const StringData& indexName ); - /** - * Completely removes a column family. Input pointer is invalid after calling - */ - void removeColumnFamily( rocksdb::ColumnFamilyHandle** cfh, - const StringData& indexName, - const StringData& ns ); - /** * Returns a ReadOptions object that uses the snapshot contained in opCtx */ static rocksdb::ReadOptions readOptionsWithSnapshot( OperationContext* opCtx ); - struct Entry { - boost::scoped_ptr<rocksdb::ColumnFamilyHandle> cfHandle; - boost::scoped_ptr<RocksCollectionCatalogEntry> collectionEntry; - boost::scoped_ptr<RocksRecordStore> recordStore; - // These ColumnFamilyHandles must be deleted by removeIndex - StringMap<boost::shared_ptr<rocksdb::ColumnFamilyHandle>> indexNameToCF; - StringMap<boost::shared_ptr<const rocksdb::Comparator>> indexNameToComparator; - }; - - Entry* getEntry( const StringData& ns ); - const Entry* getEntry( const StringData& ns ) const; - - typedef std::vector<rocksdb::ColumnFamilyDescriptor> CfdVector; - static rocksdb::Options dbOptions(); private: - rocksdb::ColumnFamilyHandle* _getIndexColumnFamilyNoCreate_inlock( - const StringData& ns, - const StringData& indexName); + bool _existsColumnFamily(const StringData& ident); + Status 
_createColumnFamily(const rocksdb::ColumnFamilyOptions& options, + const StringData& ident); + Status _dropColumnFamily(const StringData& ident); + boost::shared_ptr<rocksdb::ColumnFamilyHandle> _getColumnFamily(const StringData& ident); - rocksdb::ColumnFamilyHandle* _createIndexColumnFamily_inlock( const StringData& ns, - const StringData& indexName, - const Ordering& order ); - - void _removeColumnFamily_inlock( rocksdb::ColumnFamilyHandle** cfh, - const StringData& indexName, - const StringData& ns ); - - Status _dropCollection_inlock( OperationContext* opCtx, const StringData& ns ); + std::unordered_map<std::string, Ordering> _loadOrderingMetaData(rocksdb::Iterator* itr); + std::set<std::string> _loadCollections(rocksdb::Iterator* itr); + std::vector<std::string> _loadColumnFamilies(); rocksdb::ColumnFamilyOptions _collectionOptions() const; - rocksdb::ColumnFamilyOptions _indexOptions() const; + rocksdb::ColumnFamilyOptions _indexOptions(const Ordering& order) const; std::string _path; boost::scoped_ptr<rocksdb::DB> _db; @@ -181,56 +123,9 @@ namespace mongo { // Default column family is owned by the rocksdb::DB instance. rocksdb::ColumnFamilyHandle* _defaultHandle; - typedef StringMap< boost::shared_ptr<Entry> > EntryMap; - mutable boost::mutex _entryMapMutex; - EntryMap _entryMap; - - typedef StringMap<boost::shared_ptr<RocksDatabaseCatalogEntry> > DbCatalogMap; - // illegal to hold at the same time as _entryMapMutex - boost::mutex _dbCatalogMapMutex; - DbCatalogMap _dbCatalogMap; - - // private methods that should usually only be called from the RocksEngine constructor - - // RocksDB necessitates opening a default column family. This method exists to identify - // that column family so that it can be ignored. 
- bool _isDefaultFamily( const string& name ); - - // See larger comment in .cpp for why this is necessary - void _createNonIndexCatalogEntries( const std::vector<std::string>& families ); - - /** - * Return a vector containing the name of every column family in the database - */ - std::vector<std::string> _listFamilyNames( std::string filepath ); - - /** - * @param namespaces a vector containing all the namespaces in this database. - * @param metaDataCfds a vector of the column family descriptors for every column family - * in the database representing metadata. - */ - std::map<std::string, Ordering> _createIndexOrderings( - const std::vector<string>& namespaces, - const std::string& filepath ); - - /** - * @param namespaces a vector containing all the namespaces in this database - */ - CfdVector _createCfds ( const std::vector<std::string>& namespaces, - const std::map<std::string, Ordering>& indexOrderings ); - - /** - * Create a complete Entry object in _entryMap for every ColumnFamilyDescriptor. Assumes - * that if the collectionEntry field should be initialized, that is already has been prior - * to this function call. 
- * - * @param families A vector of column family descriptors for every column family in the - * database - * @param handles A vector of column family handles for every column family in the - * database - */ - void _createEntries( const CfdVector& families, - const std::vector<rocksdb::ColumnFamilyHandle*> handles ); + mutable boost::mutex _identColumnFamilyMapMutex; + typedef StringMap<boost::shared_ptr<rocksdb::ColumnFamilyHandle> > IdentColumnFamilyMap; + IdentColumnFamilyMap _identColumnFamilyMap; }; Status toMongoStatus( rocksdb::Status s ); diff --git a/src/mongo/db/storage/rocks/rocks_engine_test.cpp b/src/mongo/db/storage/rocks/rocks_engine_test.cpp index 8cfa8e550a7..a07d2078d17 100644 --- a/src/mongo/db/storage/rocks/rocks_engine_test.cpp +++ b/src/mongo/db/storage/rocks/rocks_engine_test.cpp @@ -1,311 +1,69 @@ -// rocks_engine_test.cpp +// kv_engine_test_harness.h /** -* Copyright (C) 2014 MongoDB Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. 
If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ #include <boost/filesystem/operations.hpp> +#include <boost/scoped_ptr.hpp> +#include <rocksdb/comparator.h> #include <rocksdb/db.h> -#include <rocksdb/slice.h> #include <rocksdb/options.h> +#include <rocksdb/slice.h> -#include "mongo/db/catalog/collection_options.h" -#include "mongo/db/index/index_descriptor.h" -#include "mongo/db/operation_context_noop.h" -#include "mongo/db/storage/rocks/rocks_collection_catalog_entry.h" +#include "mongo/db/storage/kv/kv_engine.h" +#include "mongo/db/storage/kv/kv_engine_test_harness.h" #include "mongo/db/storage/rocks/rocks_engine.h" -#include "mongo/db/storage/rocks/rocks_record_store.h" -#include "mongo/db/storage/rocks/rocks_recovery_unit.h" -#include "mongo/unittest/unittest.h" - -using namespace mongo; +#include "mongo/unittest/temp_dir.h" namespace mongo { - - class MyOperationContext : public OperationContextNoop { + class RocksEngineHarnessHelper : public KVHarnessHelper { public: - MyOperationContext(RocksEngine* engine) - : OperationContextNoop(new RocksRecoveryUnit(engine->getDB(), false)) {} - }; - - TEST( RocksEngineTest, Start1 ) { - std::string path = "/tmp/mongo-rocks-engine-test"; - boost::filesystem::remove_all( path ); - { - RocksEngine engine( path ); - } - - { - RocksEngine engine( path ); - } - - } - - TEST( RocksEngineTest, CreateDirect1 ) { - std::string path = "/tmp/mongo-rocks-engine-test"; - boost::filesystem::remove_all( path ); - RocksEngine engine( path ); - - { - MyOperationContext opCtx( &engine ); - Status status = engine.createCollection( &opCtx, - "test.foo", - CollectionOptions() ); - ASSERT_OK( status ); - } - - RocksRecordStore* rs = engine.getEntry( "test.foo" )->recordStore.get(); - string s = "eliot was here"; - - { - MyOperationContext opCtx( &engine ); - DiskLoc loc; - { - WriteUnitOfWork uow( &opCtx ); - StatusWith<DiskLoc> res = rs->insertRecord( &opCtx, s.c_str(), s.size() + 1, -1 ); - ASSERT_OK( res.getStatus() ); - loc = res.getValue(); - } - - ASSERT_EQUALS( s, rs->dataFor( 
&opCtx, loc ).data() ); - } - } - - TEST( RocksEngineTest, DropDirect1 ) { - std::string path = "/tmp/mongo-rocks-engine-test"; - boost::filesystem::remove_all( path ); - RocksEngine engine( path ); - - { - MyOperationContext opCtx( &engine ); - Status status = engine.createCollection( &opCtx, - "test.foo", - CollectionOptions() ); - ASSERT_OK( status ); - } - - { - MyOperationContext opCtx( &engine ); - Status status = engine.createCollection( &opCtx, - "test.bar", - CollectionOptions() ); - ASSERT_OK( status ); - } - - { - MyOperationContext opCtx( &engine ); - Status status = engine.createCollection( &opCtx, - "silly.bar", - CollectionOptions() ); - ASSERT_OK( status ); - } - - { - std::list<std::string> names; - engine.getCollectionNamespaces( "test", &names ); - ASSERT_EQUALS( 2U, names.size() ); - } - - { - std::list<std::string> names; - engine.getCollectionNamespaces( "silly", &names ); - ASSERT_EQUALS( 1U, names.size() ); - } - - { - MyOperationContext opCtx( &engine ); - Status status = engine.dropCollection( &opCtx, - "test.foo" ); - ASSERT_OK( status ); + RocksEngineHarnessHelper() : _dbpath("mongo-rocks-engine-test") { + boost::filesystem::remove_all(_dbpath.path()); + _engine.reset(new RocksEngine(_dbpath.path())); } - { - std::list<std::string> names; - engine.getCollectionNamespaces( "test", &names ); - ASSERT_EQUALS( 1U, names.size() ); - ASSERT_EQUALS( names.front(), "test.bar" ); - } - - { - MyOperationContext opCtx( &engine ); - Status status = engine.dropCollection( &opCtx, - "test.foo" ); - ASSERT_NOT_OK( status ); - } - } - - TEST( RocksCollectionEntryTest, MetaDataRoundTrip ) { - RocksCollectionCatalogEntry::MetaData md; - md.ns = "test.foo"; - md.indexes.push_back( RocksCollectionCatalogEntry::IndexMetaData( BSON( "a" << 1 ), - true, - DiskLoc( 5, 17 ), - false ) ); - - BSONObj a = md.toBSON(); - ASSERT_EQUALS( 3, a.nFields() ); - ASSERT_EQUALS( string("test.foo"), a["ns"].String() ); - ASSERT_EQUALS( BSONObj(), a["options"].Obj() ); - 
BSONObj indexes = a["indexes"].Obj(); - ASSERT_EQUALS( 1, indexes.nFields() ); - BSONObj idx = indexes["0"].Obj(); - ASSERT_EQUALS( 5, idx.nFields() ); - ASSERT_EQUALS( BSON( "a" << 1 ), idx["spec"].Obj() ); - ASSERT( idx["ready"].trueValue() ); - ASSERT( !idx["multikey"].trueValue() ); - ASSERT_EQUALS( 5, idx["head_a"].Int() ); - ASSERT_EQUALS( 17, idx["head_b"].Int() ); - - RocksCollectionCatalogEntry::MetaData md2; - md2.parse( a ); - ASSERT_EQUALS( md.indexes[0].head, md2.indexes[0].head ); - ASSERT_EQUALS( a, md2.toBSON() ); - } - - TEST( RocksCollectionEntryTest, IndexCreateAndMod1 ) { - std::string path = "/tmp/mongo-rocks-engine-test"; - boost::filesystem::remove_all( path ); - RocksEngine engine( path ); - - { - RocksCollectionCatalogEntry coll( &engine, "test.foo" ); - coll.createMetaData(); - { - MyOperationContext opCtx( &engine ); - ASSERT_EQUALS( 0, coll.getTotalIndexCount(&opCtx) ); - } - - BSONObj spec = BSON( "key" << BSON( "a" << 1 ) << - "name" << "silly" << - "ns" << "test.foo" ); - - IndexDescriptor desc( NULL, "", spec ); - - { - MyOperationContext opCtx( &engine ); - Status status = coll.prepareForIndexBuild( &opCtx, &desc ); - ASSERT_OK( status ); - } - - { - MyOperationContext opCtx( &engine ); - ASSERT_EQUALS( 1, coll.getTotalIndexCount(&opCtx) ); - ASSERT_EQUALS( 0, coll.getCompletedIndexCount(&opCtx) ); - ASSERT( !coll.isIndexReady( &opCtx, "silly" ) ); - } - - { - MyOperationContext opCtx( &engine ); - coll.indexBuildSuccess( &opCtx, "silly" ); - } - - { - MyOperationContext opCtx( &engine ); - ASSERT_EQUALS( 1, coll.getTotalIndexCount(&opCtx) ); - ASSERT_EQUALS( 1, coll.getCompletedIndexCount(&opCtx) ); - ASSERT( coll.isIndexReady( &opCtx, "silly" ) ); - } - - { - MyOperationContext opCtx( &engine ); - ASSERT_EQUALS( DiskLoc(), coll.getIndexHead( &opCtx, "silly" ) ); - } + virtual ~RocksEngineHarnessHelper() = default; - { - MyOperationContext opCtx( &engine ); - coll.setIndexHead( &opCtx, "silly", DiskLoc( 123,321 ) ); - } + virtual 
KVEngine* getEngine() { return _engine.get(); } - { - MyOperationContext opCtx( &engine ); - ASSERT_EQUALS( DiskLoc(123, 321), coll.getIndexHead( &opCtx, "silly" ) ); - ASSERT( !coll.isIndexMultikey( &opCtx, "silly" ) ); - } - - { - MyOperationContext opCtx( &engine ); - coll.setIndexIsMultikey( &opCtx, "silly", true ); - } - - { - MyOperationContext opCtx( &engine ); - ASSERT( coll.isIndexMultikey( &opCtx, "silly" ) ); - } - - } - } - - TEST( RocksEngineTest, Restart1 ) { - std::string path = "/tmp/mongo-rocks-engine-test"; - boost::filesystem::remove_all( path ); - - string s = "eliot was here"; - DiskLoc loc; - - { - RocksEngine engine( path ); - - { - MyOperationContext opCtx( &engine ); - WriteUnitOfWork uow( &opCtx ); - Status status = engine.createCollection( &opCtx, - "test.foo", - CollectionOptions() ); - ASSERT_OK( status ); - uow.commit(); - } - - RocksRecordStore* rs = engine.getEntry( "test.foo" )->recordStore.get(); - - { - MyOperationContext opCtx( &engine ); - - { - WriteUnitOfWork uow( &opCtx ); - StatusWith<DiskLoc> res = rs->insertRecord( &opCtx, s.c_str(), s.size() + 1, -1 ); - ASSERT_OK( res.getStatus() ); - loc = res.getValue(); - uow.commit(); - } - - ASSERT_EQUALS( s, rs->dataFor( &opCtx, loc ).data() ); - engine.cleanShutdown( &opCtx ); - } + virtual KVEngine* restartEngine() { + _engine.reset(nullptr); + _engine.reset(new RocksEngine(_dbpath.path())); + return _engine.get(); } - { - RocksEngine engine(path); - RocksRecordStore* rs = engine.getEntry("test.foo")->recordStore.get(); - MyOperationContext opCtx(&engine); - ASSERT_EQUALS(s, rs->dataFor(&opCtx, loc).data()); - } + private: + unittest::TempDir _dbpath; - } + boost::scoped_ptr<RocksEngine> _engine; + }; + KVHarnessHelper* KVHarnessHelper::create() { return new RocksEngineHarnessHelper(); } } diff --git a/src/mongo/db/storage/rocks/rocks_init.cpp b/src/mongo/db/storage/rocks/rocks_init.cpp index 14da69ade87..922ab30ed04 100644 --- a/src/mongo/db/storage/rocks/rocks_init.cpp +++ 
b/src/mongo/db/storage/rocks/rocks_init.cpp @@ -34,6 +34,7 @@ #include "mongo/base/init.h" #include "mongo/db/global_environment_experiment.h" #include "mongo/db/storage_options.h" +#include "mongo/db/storage/kv/kv_storage_engine.h" namespace mongo { @@ -42,7 +43,7 @@ namespace mongo { public: virtual ~RocksFactory(){} virtual StorageEngine* create( const StorageGlobalParams& params ) const { - return new RocksEngine( params.dbpath ); + return new KVStorageEngine(new RocksEngine(params.dbpath)); } }; } // namespace diff --git a/src/mongo/db/storage/rocks/rocks_record_store.cpp b/src/mongo/db/storage/rocks/rocks_record_store.cpp index 0c527ebb0b2..168cee282aa 100644 --- a/src/mongo/db/storage/rocks/rocks_record_store.cpp +++ b/src/mongo/db/storage/rocks/rocks_record_store.cpp @@ -29,9 +29,9 @@ * it in the license file. */ -#include "mongo/db/operation_context.h" #include "mongo/db/storage/rocks/rocks_record_store.h" -#include "mongo/db/storage/rocks/rocks_recovery_unit.h" + +#include <memory> #include <rocksdb/comparator.h> #include <rocksdb/db.h> @@ -39,32 +39,29 @@ #include <rocksdb/slice.h> #include <rocksdb/utilities/write_batch_with_index.h> +#include "mongo/db/operation_context.h" +#include "mongo/db/storage/rocks/rocks_recovery_unit.h" #include "mongo/util/log.h" +#include "mongo/util/timer.h" namespace mongo { - RocksRecordStore::RocksRecordStore( const StringData& ns, - rocksdb::DB* db, // not owned here - rocksdb::ColumnFamilyHandle* columnFamily, - rocksdb::ColumnFamilyHandle* metadataColumnFamily, - bool isCapped, - int64_t cappedMaxSize, - int64_t cappedMaxDocs, - CappedDocumentDeleteCallback* cappedDeleteCallback ) - : RecordStore( ns ), - _db( db ), - _columnFamily( columnFamily ), - _metadataColumnFamily( metadataColumnFamily ), - _isCapped( isCapped ), - _cappedMaxSize( cappedMaxSize ), - _cappedMaxDocs( cappedMaxDocs ), - _cappedDeleteCallback( cappedDeleteCallback ), - _dataSizeKey( ns.toString() + "-dataSize" ), - _numRecordsKey( ns.toString() + 
"-numRecords" ) { + RocksRecordStore::RocksRecordStore(const StringData& ns, const StringData& id, + rocksdb::DB* db, // not owned here + boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily, + bool isCapped, int64_t cappedMaxSize, int64_t cappedMaxDocs, + CappedDocumentDeleteCallback* cappedDeleteCallback) + : RecordStore(ns), + _db(db), + _columnFamily(columnFamily), + _isCapped(isCapped), + _cappedMaxSize(cappedMaxSize), + _cappedMaxDocs(cappedMaxDocs), + _cappedDeleteCallback(cappedDeleteCallback), + _dataSizeKey("datasize-" + id.toString()), + _numRecordsKey("numrecords-" + id.toString()) { invariant( _db ); invariant( _columnFamily ); - invariant( _metadataColumnFamily ); - invariant( _columnFamily != _metadataColumnFamily ); if (_isCapped) { invariant(_cappedMaxSize > 0); @@ -76,9 +73,8 @@ namespace mongo { } // Get next id - // XXX not using a Snapshot here - boost::scoped_ptr<rocksdb::Iterator> iter( db->NewIterator( _readOptions(), - columnFamily ) ); + boost::scoped_ptr<rocksdb::Iterator> iter( + db->NewIterator(_readOptions(), columnFamily.get())); iter->SeekToLast(); if (iter->Valid()) { rocksdb::Slice lastSlice = iter->key(); @@ -94,10 +90,7 @@ namespace mongo { std::string value; bool metadataPresent = true; // XXX not using a Snapshot here - if (!_db->Get( _readOptions(), - _metadataColumnFamily, - rocksdb::Slice( _numRecordsKey ), - &value ).ok()) { + if (!_db->Get(_readOptions(), rocksdb::Slice(_numRecordsKey), &value).ok()) { _numRecords = 0; metadataPresent = false; } @@ -106,10 +99,7 @@ namespace mongo { } // XXX not using a Snapshot here - if (!_db->Get( _readOptions(), - _metadataColumnFamily, - rocksdb::Slice( _dataSizeKey ), - &value ).ok()) { + if (!_db->Get(_readOptions(), rocksdb::Slice(_dataSizeKey), &value).ok()) { _dataSize = 0; invariant(!metadataPresent); } @@ -124,56 +114,22 @@ namespace mongo { int infoLevel ) const { uint64_t storageSize; rocksdb::Range wholeRange( _makeKey( minDiskLoc ), _makeKey( maxDiskLoc ) ); - 
_db->GetApproximateSizes( _columnFamily, &wholeRange, 1, &storageSize); + _db->GetApproximateSizes(_columnFamily.get(), &wholeRange, 1, &storageSize); return static_cast<int64_t>( storageSize ); } RecordData RocksRecordStore::dataFor( OperationContext* txn, const DiskLoc& loc) const { - rocksdb::Slice value; - - RocksRecoveryUnit* ru = _getRecoveryUnit( txn ); - auto key = _makeKey(loc); - boost::shared_array<char> data; - - boost::scoped_ptr<rocksdb::WBWIIterator> wb_iterator( - ru->writeBatch()->NewIterator(_columnFamily)); - wb_iterator->Seek(key); - if (wb_iterator->Valid() && wb_iterator->Entry().key == key) { - auto& entry = wb_iterator->Entry(); - if (entry.type == rocksdb::WriteType::kDeleteRecord) { - return RecordData(nullptr, 0); - } - data.reset(new char[entry.value.size()]); - memcpy(data.get(), entry.value.data(), entry.value.size()); - value = rocksdb::Slice(data.get(), entry.value.size()); - } else { - // TODO investigate using cursor API to get a Slice and avoid double copying. 
- std::string value_storage; - auto status = _db->Get(_readOptions(txn), _columnFamily, _makeKey(loc), &value_storage); - if (!status.ok()) { - if (status.IsNotFound()) { - return RecordData(nullptr, 0); - } else { - log() << "rocks Get failed, blowing up: " << status.ToString(); - invariant(false); - } - } - data.reset(new char[value_storage.size()]); - memcpy(data.get(), value_storage.data(), value_storage.size()); - value = rocksdb::Slice(data.get(), value_storage.size()); - } - - return RecordData(value.data(), value.size(), data); + return _getDataFor(_db, _columnFamily.get(), txn, loc); } void RocksRecordStore::deleteRecord( OperationContext* txn, const DiskLoc& dl ) { - RocksRecoveryUnit* ru = _getRecoveryUnit( txn ); + RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn ); std::string oldValue; - _db->Get( _readOptions( txn ), _columnFamily, _makeKey( dl ), &oldValue ); + _db->Get(_readOptions(txn), _columnFamily.get(), _makeKey(dl), &oldValue); int oldLength = oldValue.size(); - ru->writeBatch()->Delete( _columnFamily, _makeKey( dl ) ); + ru->writeBatch()->Delete(_columnFamily.get(), _makeKey(dl)); _changeNumRecords(txn, false); _increaseDataSize(txn, -oldLength); @@ -195,9 +151,8 @@ namespace mongo { void RocksRecordStore::cappedDeleteAsNeeded(OperationContext* txn) { if (!cappedAndNeedDelete()) return; - // This persistent iterator is necessary since you can't read your own writes - boost::scoped_ptr<rocksdb::Iterator> iter( _db->NewIterator( _readOptions( txn ), - _columnFamily ) ); + auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); + boost::scoped_ptr<rocksdb::Iterator> iter(ru->NewIterator(_columnFamily.get())); iter->SeekToFirst(); // XXX TODO there is a bug here where if the size of the write batch exceeds the cap size @@ -231,11 +186,11 @@ namespace mongo { "object to insert exceeds cappedMaxSize" ); } - RocksRecoveryUnit* ru = _getRecoveryUnit( txn ); + RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn ); 
DiskLoc loc = _nextId(); - ru->writeBatch()->Put( _columnFamily, _makeKey( loc ), rocksdb::Slice( data, len ) ); + ru->writeBatch()->Put(_columnFamily.get(), _makeKey(loc), rocksdb::Slice(data, len)); _changeNumRecords( txn, true ); _increaseDataSize( txn, len ); @@ -261,14 +216,10 @@ namespace mongo { int len, bool enforceQuota, UpdateMoveNotifier* notifier ) { - RocksRecoveryUnit* ru = _getRecoveryUnit( txn ); + RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn ); std::string old_value; - // XXX Be sure to also first query the write batch once Facebook implements that - rocksdb::Status status = _db->Get( _readOptions( txn ), - _columnFamily, - _makeKey( loc ), - &old_value ); + auto status = ru->Get(_columnFamily.get(), _makeKey(loc), &old_value); if ( !status.ok() ) { return StatusWith<DiskLoc>( ErrorCodes::InternalError, status.ToString() ); @@ -276,7 +227,7 @@ namespace mongo { int old_length = old_value.size(); - ru->writeBatch()->Put( _columnFamily, _makeKey( loc ), rocksdb::Slice( data, len ) ); + ru->writeBatch()->Put(_columnFamily.get(), _makeKey(loc), rocksdb::Slice(data, len)); _increaseDataSize(txn, len - old_length); @@ -289,14 +240,14 @@ namespace mongo { const DiskLoc& loc, const char* damangeSource, const mutablebson::DamageVector& damages ) { - RocksRecoveryUnit* ru = _getRecoveryUnit( txn ); + RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn ); rocksdb::Slice key = _makeKey( loc ); // get original value std::string value; rocksdb::Status status; - status = _db->Get( _readOptions( txn ), _columnFamily, key, &value ); + status = _db->Get(_readOptions(txn), _columnFamily.get(), key, &value); if ( !status.ok() ) { if ( status.IsNotFound() ) @@ -316,7 +267,7 @@ namespace mongo { } // write back - ru->writeBatch()->Put( _columnFamily, key, value ); + ru->writeBatch()->Put(_columnFamily.get(), key, value); return Status::OK(); } @@ -326,9 +277,7 @@ namespace mongo { bool tailable, const 
CollectionScanParams::Direction& dir ) const { - invariant( !tailable ); - - return new Iterator( txn, this, dir, start ); + return new Iterator(txn, _db, _columnFamily, tailable, dir, start); } @@ -360,7 +309,7 @@ namespace mongo { RecordStoreCompactAdaptor* adaptor, const CompactOptions* options, CompactStats* stats ) { - rocksdb::Status status = _db->CompactRange( _columnFamily, NULL, NULL ); + rocksdb::Status status = _db->CompactRange(_columnFamily.get(), NULL, NULL); if ( status.ok() ) return Status::OK(); else @@ -380,16 +329,18 @@ namespace mongo { boost::scoped_ptr<RecordIterator> iter( getIterator( txn ) ); while( !iter->isEOF() ) { numRecords++; - RecordData data = dataFor( txn, iter->curr() ); - size_t dataSize; - const Status status = adaptor->validate( data, &dataSize ); - if (!status.isOK()) { - results->valid = false; - if ( invalidObject ) { - results->errors.push_back("invalid object detected (see logs)"); + if (full) { + RecordData data = dataFor(txn, iter->curr()); + size_t dataSize; + const Status status = adaptor->validate(data, &dataSize); + if (!status.isOK()) { + results->valid = false; + if (invalidObject) { + results->errors.push_back("invalid object detected (see logs)"); + } + invalidObject = true; + log() << "Invalid object detected in " << _ns << ": " << status.reason(); } - invalidObject = true; - log() << "Invalid object detected in " << _ns << ": " << status.reason(); } iter->getNext(); } @@ -410,14 +361,25 @@ namespace mongo { result->appendIntOrLL("max", _cappedMaxDocs); result->appendIntOrLL("maxSize", _cappedMaxSize); } - bool valid = _db->GetProperty( _columnFamily, "rocksdb.stats", &statsString ); + bool valid = _db->GetProperty(_columnFamily.get(), "rocksdb.stats", &statsString); invariant( valid ); result->append( "stats", statsString ); } + Status RocksRecordStore::touch(OperationContext* txn, BSONObjBuilder* output) const { + Timer t; + boost::scoped_ptr<rocksdb::Iterator> itr( + _db->NewIterator(_readOptions(), 
_columnFamily.get())); + itr->SeekToFirst(); + for (; itr->Valid(); itr->Next()) { + invariant(itr->status().ok()); + } + invariant(itr->status().ok()); - // AFB: is there a way to force column families to be cached in rocks? - Status RocksRecordStore::touch( OperationContext* txn, BSONObjBuilder* output ) const { + if (output) { + output->append("numRanges", 1); + output->append("millis", t.millis()); + } return Status::OK(); } @@ -429,7 +391,7 @@ namespace mongo { return Status::OK(); } - return Status( ErrorCodes::BadValue, "Invalid option: " + optionName ); + return Status(ErrorCodes::InvalidOptions, "Invalid option: " + optionName); } namespace { @@ -485,19 +447,19 @@ namespace mongo { } void RocksRecordStore::dropRsMetaData( OperationContext* opCtx ) { - RocksRecoveryUnit* ru = _getRecoveryUnit( opCtx ); + RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( opCtx ); boost::mutex::scoped_lock dataSizeLk( _dataSizeLock ); - ru->writeBatch()->Delete( _metadataColumnFamily, _dataSizeKey ); + ru->writeBatch()->Delete(_dataSizeKey); boost::mutex::scoped_lock numRecordsLk( _numRecordsLock ); - ru->writeBatch()->Delete( _metadataColumnFamily, _numRecordsKey ); + ru->writeBatch()->Delete(_numRecordsKey); } - rocksdb::ReadOptions RocksRecordStore::_readOptions( OperationContext* opCtx ) const { + rocksdb::ReadOptions RocksRecordStore::_readOptions(OperationContext* opCtx) { rocksdb::ReadOptions options; if ( opCtx ) { - options.snapshot = _getRecoveryUnit( opCtx )->snapshot(); + options.snapshot = RocksRecoveryUnit::getRocksRecoveryUnit( opCtx )->snapshot(); } return options; } @@ -511,18 +473,38 @@ namespace mongo { return loc; } - rocksdb::Slice RocksRecordStore::_makeKey( const DiskLoc& loc ) { - return rocksdb::Slice( reinterpret_cast<const char*>( &loc ), sizeof( loc ) ); - } - - RocksRecoveryUnit* RocksRecordStore::_getRecoveryUnit( OperationContext* opCtx ) { - return dynamic_cast<RocksRecoveryUnit*>( opCtx->recoveryUnit() ); + rocksdb::Slice 
RocksRecordStore::_makeKey(const DiskLoc& loc) { + return rocksdb::Slice(reinterpret_cast<const char*>(&loc), sizeof(loc)); } DiskLoc RocksRecordStore::_makeDiskLoc( const rocksdb::Slice& slice ) { return reinterpret_cast<const DiskLoc*>( slice.data() )[0]; } + RecordData RocksRecordStore::_getDataFor(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf, + OperationContext* txn, const DiskLoc& loc) { + rocksdb::Slice value; + + RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); + auto key = _makeKey(loc); + boost::shared_array<char> data; + + std::string value_storage; + auto status = ru->Get(cf, _makeKey(loc), &value_storage); + if (!status.ok()) { + if (status.IsNotFound()) { + return RecordData(nullptr, 0); + } else { + log() << "rocks Get failed, blowing up: " << status.ToString(); + invariant(false); + } + } + data.reset(new char[value_storage.size()]); + memcpy(data.get(), value_storage.data(), value_storage.size()); + value = rocksdb::Slice(data.get(), value_storage.size()); + return RecordData(value.data(), value.size(), data); + } + // XXX make sure these work with rollbacks (I don't think they will) void RocksRecordStore::_changeNumRecords( OperationContext* txn, bool insert ) { boost::mutex::scoped_lock lk( _numRecordsLock ); @@ -533,12 +515,11 @@ namespace mongo { else { _numRecords--; } - RocksRecoveryUnit* ru = _getRecoveryUnit( txn ); + RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn ); const char* nr_ptr = reinterpret_cast<char*>( &_numRecords ); - ru->writeBatch()->Put( _metadataColumnFamily, - rocksdb::Slice( _numRecordsKey ), - rocksdb::Slice( nr_ptr, sizeof(long long) ) ); + ru->writeBatch()->Put(rocksdb::Slice(_numRecordsKey), + rocksdb::Slice(nr_ptr, sizeof(long long))); } @@ -546,43 +527,28 @@ namespace mongo { boost::mutex::scoped_lock lk( _dataSizeLock ); _dataSize += amount; - RocksRecoveryUnit* ru = _getRecoveryUnit( txn ); + RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn ); const 
char* ds_ptr = reinterpret_cast<char*>( &_dataSize ); - ru->writeBatch()->Put( _metadataColumnFamily, - rocksdb::Slice( _dataSizeKey ), - rocksdb::Slice( ds_ptr, sizeof(long long) ) ); + ru->writeBatch()->Put(rocksdb::Slice(_dataSizeKey), + rocksdb::Slice(ds_ptr, sizeof(long long))); } // -------- - RocksRecordStore::Iterator::Iterator( OperationContext* txn, - const RocksRecordStore* rs, - const CollectionScanParams::Direction& dir, - const DiskLoc& start ) - : _txn( txn ), - _rs( rs ), - _dir( dir ), - _reseekKeyValid( false ), - // XXX not using a snapshot here - _iterator( _rs->_db->NewIterator( rs->_readOptions(), rs->_columnFamily ) ) { - if (start.isNull()) { - if ( _forward() ) - _iterator->SeekToFirst(); - else - _iterator->SeekToLast(); - } - else { - _iterator->Seek( rs->_makeKey( start ) ); + RocksRecordStore::Iterator::Iterator( + OperationContext* txn, rocksdb::DB* db, + boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily, bool tailable, + const CollectionScanParams::Direction& dir, const DiskLoc& start) + : _txn(!tailable ? 
txn : nullptr), + _db(db), + _cf(columnFamily), + _tailable(tailable), + _dir(dir), + _eof(true), + _iterator(RocksRecoveryUnit::getRocksRecoveryUnit(txn)->NewIterator(_cf.get())) { - if ( !_forward() && !_iterator->Valid() ) - _iterator->SeekToLast(); - else if ( !_forward() && _iterator->Valid() && - _makeDiskLoc( _iterator->key() ) != start ) - _iterator->Prev(); - } - - _checkStatus(); + _locate(start); } void RocksRecordStore::Iterator::_checkStatus() { @@ -592,29 +558,35 @@ namespace mongo { } bool RocksRecordStore::Iterator::isEOF() { - return !_iterator || !_iterator->Valid(); + return _eof; } DiskLoc RocksRecordStore::Iterator::curr() { - if ( !_iterator->Valid() ) + if (_eof) { return DiskLoc(); + } - rocksdb::Slice slice = _iterator->key(); - return _makeDiskLoc( slice ); + return _curr; } DiskLoc RocksRecordStore::Iterator::getNext() { - if ( !_iterator->Valid() ) { + if (_eof) { return DiskLoc(); } - DiskLoc toReturn = curr(); + DiskLoc toReturn = _curr; if ( _forward() ) _iterator->Next(); else _iterator->Prev(); + if (_iterator->Valid()) { + _curr = _decodeCurr(); + } else { + _eof = true; + // we leave _curr as it is on purpose + } return toReturn; } @@ -623,36 +595,74 @@ namespace mongo { } void RocksRecordStore::Iterator::saveState() { - if ( !_iterator ) { - return; - } - if ( _iterator->Valid() ) { - _reseekKey = _iterator->key().ToString(); - _reseekKeyValid = true; - } + _iterator.reset(); } + // XXX restoring state with tailable cursor will invalidate the snapshot stored inside of + // OperationContext. It is important that while restoring state, nobody else is using the + // OperationContext (i.e. 
only a single restoreState is called on a tailable cursor with a + // single OperationContext) bool RocksRecordStore::Iterator::restoreState(OperationContext* txn) { + if (_tailable) { + // we want to read new data if the iterator is tailable + RocksRecoveryUnit::getRocksRecoveryUnit(txn)->releaseSnapshot(); + } _txn = txn; - if ( !_reseekKeyValid ) { - _iterator.reset( NULL ); - return true; + auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); + _iterator.reset(ru->NewIterator(_cf.get())); + bool previousEOF = _eof; + DiskLoc previousCurr = _curr; + _locate(_curr); + // if _curr != previousCurr that means that _curr has been deleted, so we don't need to + // advance it, because we're already on the next document by Seek-ing + if (previousEOF && _curr == previousCurr) { + getNext(); } - _iterator.reset( _rs->_db->NewIterator( _rs->_readOptions(), - _rs->_columnFamily ) ); - _checkStatus(); - _iterator->Seek( _reseekKey ); - _checkStatus(); - _reseekKeyValid = false; - return true; } RecordData RocksRecordStore::Iterator::dataFor( const DiskLoc& loc ) const { - return _rs->dataFor( _txn, loc ); + return RocksRecordStore::_getDataFor(_db, _cf.get(), _txn, loc); + } + + void RocksRecordStore::Iterator::_locate(const DiskLoc& loc) { + if (_forward()) { + if (loc.isNull()) { + _iterator->SeekToFirst(); + } else { + _iterator->Seek(RocksRecordStore::_makeKey(loc)); + } + _checkStatus(); + } else { // backward iterator + if (loc.isNull()) { + _iterator->SeekToLast(); + } else { + // lower bound on reverse iterator + _iterator->Seek(RocksRecordStore::_makeKey(loc)); + _checkStatus(); + if (!_iterator->Valid()) { + _iterator->SeekToLast(); + } else if (_decodeCurr() != loc) { + _iterator->Prev(); + } + } + _checkStatus(); + } + _eof = !_iterator->Valid(); + if (_eof) { + _curr = loc; + } else { + _curr = _decodeCurr(); + } + } + + DiskLoc RocksRecordStore::Iterator::_decodeCurr() const { + invariant(_iterator && _iterator->Valid()); + return 
_makeDiskLoc(_iterator->key()); } bool RocksRecordStore::Iterator::_forward() const { return _dir == CollectionScanParams::FORWARD; } + } diff --git a/src/mongo/db/storage/rocks/rocks_record_store.h b/src/mongo/db/storage/rocks/rocks_record_store.h index 8c191006a34..45d4f100855 100644 --- a/src/mongo/db/storage/rocks/rocks_record_store.h +++ b/src/mongo/db/storage/rocks/rocks_record_store.h @@ -29,7 +29,10 @@ * it in the license file. */ +#pragma once + #include <string> +#include <memory> #include <rocksdb/options.h> @@ -50,14 +53,11 @@ namespace mongo { class RocksRecordStore : public RecordStore { public: - RocksRecordStore( const StringData& ns, - rocksdb::DB* db, - rocksdb::ColumnFamilyHandle* columnFamily, - rocksdb::ColumnFamilyHandle* metadataColumnFamily, - bool isCapped = false, - int64_t cappedMaxSize = -1, - int64_t cappedMaxDocs = -1, - CappedDocumentDeleteCallback* cappedDeleteCallback = NULL ); + RocksRecordStore(const StringData& ns, const StringData& id, rocksdb::DB* db, + boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily, + bool isCapped = false, int64_t cappedMaxSize = -1, + int64_t cappedMaxDocs = -1, + CappedDocumentDeleteCallback* cappedDeleteCallback = NULL); virtual ~RocksRecordStore() { } @@ -153,12 +153,12 @@ namespace mongo { static rocksdb::Comparator* newRocksCollectionComparator(); private: + // NOTE: RecordIterator might outlive the RecordStore class Iterator : public RecordIterator { public: - Iterator( OperationContext* txn, - const RocksRecordStore* rs, - const CollectionScanParams::Direction& dir, - const DiskLoc& start ); + Iterator(OperationContext* txn, rocksdb::DB* db, + boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily, bool tailable, + const CollectionScanParams::Direction& dir, const DiskLoc& start); virtual bool isEOF(); virtual DiskLoc curr(); @@ -169,14 +169,18 @@ namespace mongo { virtual RecordData dataFor( const DiskLoc& loc ) const; private: + void _locate(const DiskLoc& loc); + DiskLoc 
_decodeCurr() const; bool _forward() const; void _checkStatus(); OperationContext* _txn; - const RocksRecordStore* _rs; + rocksdb::DB* _db; // not owned + boost::shared_ptr<rocksdb::ColumnFamilyHandle> _cf; + bool _tailable; CollectionScanParams::Direction _dir; - bool _reseekKeyValid; - std::string _reseekKey; + bool _eof; + DiskLoc _curr; boost::scoped_ptr<rocksdb::Iterator> _iterator; }; @@ -184,12 +188,13 @@ namespace mongo { * Returns a new ReadOptions struct, containing the snapshot held in opCtx, if opCtx is not * null */ - rocksdb::ReadOptions _readOptions( OperationContext* opCtx = NULL ) const; - - static RocksRecoveryUnit* _getRecoveryUnit( OperationContext* opCtx ); + static rocksdb::ReadOptions _readOptions(OperationContext* opCtx = NULL); static DiskLoc _makeDiskLoc( const rocksdb::Slice& slice ); + static RecordData _getDataFor(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf, + OperationContext* txn, const DiskLoc& loc); + DiskLoc _nextId(); bool cappedAndNeedDelete() const; void cappedDeleteAsNeeded(OperationContext* txn); @@ -201,8 +206,7 @@ namespace mongo { void _increaseDataSize(OperationContext* txn, int amount); rocksdb::DB* _db; // not owned - rocksdb::ColumnFamilyHandle* _columnFamily; // not owned - rocksdb::ColumnFamilyHandle* _metadataColumnFamily; // not owned + boost::shared_ptr<rocksdb::ColumnFamilyHandle> _columnFamily; const bool _isCapped; const int64_t _cappedMaxSize; diff --git a/src/mongo/db/storage/rocks/rocks_record_store_test.cpp b/src/mongo/db/storage/rocks/rocks_record_store_test.cpp index 917a70214b6..95206f3f31f 100644 --- a/src/mongo/db/storage/rocks/rocks_record_store_test.cpp +++ b/src/mongo/db/storage/rocks/rocks_record_store_test.cpp @@ -1,35 +1,35 @@ -// rocks_record_store_test.cpp +// rocks_record_store_harness_test.cpp /** -* Copyright (C) 2014 MongoDB Inc. 
-* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. 
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ #include <memory> +#include <vector> #include <boost/filesystem/operations.hpp> @@ -37,684 +37,47 @@ #include <rocksdb/db.h> #include <rocksdb/options.h> #include <rocksdb/slice.h> -#include <rocksdb/status.h> -#include <rocksdb/comparator.h> -#include <rocksdb/utilities/write_batch_with_index.h> -#include "mongo/db/operation_context.h" -#include "mongo/db/operation_context_noop.h" -#include "mongo/db/storage/rocks/rocks_engine.h" +#include "mongo/db/storage/record_store_test_harness.h" #include "mongo/db/storage/rocks/rocks_record_store.h" #include "mongo/db/storage/rocks/rocks_recovery_unit.h" -#include "mongo/unittest/temp_dir.h" #include "mongo/unittest/unittest.h" - -using namespace mongo; +#include "mongo/unittest/temp_dir.h" namespace mongo { - class MyOperationContext : public OperationContextNoop { + class RocksRecordStoreHarnessHelper : public HarnessHelper { public: - MyOperationContext(rocksdb::DB* db) - : OperationContextNoop(new RocksRecoveryUnit(db, true)) {} + RocksRecordStoreHarnessHelper() : 
_tempDir(_testNamespace) { + boost::filesystem::remove_all(_tempDir.path()); + rocksdb::DB* db; + std::vector<rocksdb::ColumnFamilyDescriptor> cfs; + cfs.emplace_back(); + cfs.emplace_back("record_store", rocksdb::ColumnFamilyOptions()); + cfs[1].options.comparator = RocksRecordStore::newRocksCollectionComparator(); + rocksdb::DBOptions db_options; + db_options.create_if_missing = true; + db_options.create_missing_column_families = true; + std::vector<rocksdb::ColumnFamilyHandle*> handles; + auto s = rocksdb::DB::Open(db_options, _tempDir.path(), cfs, &handles, &db); + ASSERT(s.ok()); + _db.reset(db); + delete handles[0]; + _cf.reset(handles[1]); + } + + virtual RecordStore* newNonCappedRecordStore() { + return new RocksRecordStore("foo.bar", "1", _db.get(), _cf); + } + + virtual RecoveryUnit* newRecoveryUnit() { return new RocksRecoveryUnit(_db.get(), true); } + + private: + string _testNamespace = "mongo-rocks-record-store-test"; + unittest::TempDir _tempDir; + boost::scoped_ptr<rocksdb::DB> _db; + boost::shared_ptr<rocksdb::ColumnFamilyHandle> _cf; }; - // to be used in testing - static boost::scoped_ptr<rocksdb::Comparator> _rocksComparator( - RocksRecordStore::newRocksCollectionComparator() ); - - rocksdb::ColumnFamilyOptions getColumnFamilyOptions() { - rocksdb::ColumnFamilyOptions options; - options.comparator = _rocksComparator.get(); - return options; - } - - // the name of the column family that will be used to back the data in all the record stores - // created during tests. 
- const string columnFamilyName = "myColumnFamily"; - - boost::shared_ptr<rocksdb::ColumnFamilyHandle> _createCfh(rocksdb::DB* db ) { - - rocksdb::ColumnFamilyHandle* cfh; - - rocksdb::Status s = db->CreateColumnFamily( rocksdb::ColumnFamilyOptions(), - columnFamilyName, - &cfh ); - - invariant( s.ok() ); - - return boost::shared_ptr<rocksdb::ColumnFamilyHandle>( cfh ); - } - - string _rocksRecordStoreTestDir = "mongo-rocks-test"; - - rocksdb::DB* getDB( string path) { - boost::filesystem::remove_all( path ); - - rocksdb::Options options = RocksEngine::dbOptions(); - - // open DB - rocksdb::DB* db; - rocksdb::Status s = rocksdb::DB::Open(options, path, &db); - ASSERT_OK( toMongoStatus( s ) ); - - return db; - } - - typedef std::pair<shared_ptr<rocksdb::DB>, shared_ptr<rocksdb::ColumnFamilyHandle> > DbAndCfh; - DbAndCfh getDBPersist( string path ) { - // Need to pass a vector with cfd's for every column family, which should just be - // columnFamilyName (for data) and the rocks default column family (for metadata). - vector<rocksdb::ColumnFamilyDescriptor> descriptors; - descriptors.push_back( rocksdb::ColumnFamilyDescriptor() ); - descriptors.push_back( rocksdb::ColumnFamilyDescriptor( columnFamilyName, - rocksdb::ColumnFamilyOptions() ) ); - - // open DB - rocksdb::DB* db; - rocksdb::Options options = RocksEngine::dbOptions(); - vector<rocksdb::ColumnFamilyHandle*> handles; - rocksdb::Status s = rocksdb::DB::Open(options, path, descriptors, &handles, &db); - ASSERT_OK( toMongoStatus( s ) ); - - // so that the caller of this function has access to the column family handle backing the - // record store data. 
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> cfhPtr( handles[1] ); - - return std::make_pair( boost::shared_ptr<rocksdb::DB>( db ), cfhPtr ); - } - - TEST( RocksRecoveryUnitTest, Simple1 ) { - unittest::TempDir td( _rocksRecordStoreTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - db->Put( rocksdb::WriteOptions(), "a", "b" ); - - string value; - db->Get( rocksdb::ReadOptions(), "a", &value ); - ASSERT_EQUALS( value, "b" ); - - { - RocksRecoveryUnit ru( db.get(), false ); - ru.beginUnitOfWork(); - ru.writeBatch()->Put( "a", "c" ); - - value = "x"; - db->Get( rocksdb::ReadOptions(), "a", &value ); - ASSERT_EQUALS( value, "b" ); - - ru.endUnitOfWork(); - ru.commitUnitOfWork(); - value = "x"; - db->Get( rocksdb::ReadOptions(), "a", &value ); - ASSERT_EQUALS( value, "c" ); - } - - } - - TEST( RocksRecoveryUnitTest, SimpleAbort1 ) { - unittest::TempDir td( _rocksRecordStoreTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - db->Put( rocksdb::WriteOptions(), "a", "b" ); - - { - string value; - db->Get( rocksdb::ReadOptions(), "a", &value ); - ASSERT_EQUALS( value, "b" ); - } - - { - RocksRecoveryUnit ru( db.get(), false ); - ru.beginUnitOfWork(); - ru.writeBatch()->Put( "a", "c" ); - - // note: no endUnitOfWork or commitUnitOfWork - } - - { - string value; - db->Get( rocksdb::ReadOptions(), "a", &value ); - ASSERT_EQUALS( value, "b" ); - } - } - - - TEST( RocksRecordStoreTest, Insert1 ) { - unittest::TempDir td( _rocksRecordStoreTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - boost::shared_ptr<rocksdb::ColumnFamilyHandle> cfh = _createCfh( db.get() ); - int size; - - { - RocksRecordStore rs( "foo.bar", db.get(), cfh.get(), db->DefaultColumnFamily() ); - string s = "eliot was here"; - size = s.length() + 1; - - MyOperationContext opCtx( db.get() ); - DiskLoc loc; - { - WriteUnitOfWork uow( &opCtx ); - StatusWith<DiskLoc> res = rs.insertRecord( &opCtx, s.c_str(), s.size() + 1, -1 ); - ASSERT_OK( res.getStatus() ); - loc = 
res.getValue(); - } - - ASSERT_EQUALS(s, rs.dataFor(&opCtx, loc).data()); - } - - { - MyOperationContext opCtx(db.get()); - RocksRecordStore rs( "foo.bar", db.get(), cfh.get(), db->DefaultColumnFamily() ); - ASSERT_EQUALS(1, rs.numRecords(&opCtx)); - ASSERT_EQUALS(size, rs.dataSize(&opCtx)); - } - } - - TEST( RocksRecordStoreTest, Delete1 ) { - unittest::TempDir td( _rocksRecordStoreTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - boost::shared_ptr<rocksdb::ColumnFamilyHandle> cfh = _createCfh( db.get() ); - - { - RocksRecordStore rs( "foo.bar", db.get(), cfh.get(), db->DefaultColumnFamily() ); - string s = "eliot was here"; - - DiskLoc loc; - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - StatusWith<DiskLoc> res = rs.insertRecord(&opCtx, - s.c_str(), - s.size() + 1, - -1 ); - ASSERT_OK( res.getStatus() ); - loc = res.getValue(); - } - - ASSERT_EQUALS(s, rs.dataFor(&opCtx, loc).data()); - ASSERT_EQUALS(1, rs.numRecords(&opCtx)); - ASSERT_EQUALS(static_cast<long long>(s.length() + 1), rs.dataSize(&opCtx)); - } - - { - MyOperationContext opCtx(db.get()); - ASSERT(rs.dataFor(&opCtx, loc).data() != NULL); - } - - { - MyOperationContext opCtx( db.get() ); - WriteUnitOfWork uow( &opCtx ); - rs.deleteRecord( &opCtx, loc ); - - ASSERT_EQUALS(0, rs.numRecords(&opCtx)); - ASSERT_EQUALS(0, rs.dataSize(&opCtx)); - } - } - } - - TEST( RocksRecordStoreTest, Update1 ) { - unittest::TempDir td( _rocksRecordStoreTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - boost::shared_ptr<rocksdb::ColumnFamilyHandle> cfh = _createCfh( db.get() ); - - { - RocksRecordStore rs( "foo.bar", db.get(), cfh.get(), db->DefaultColumnFamily() ); - string s1 = "eliot1"; - string s2 = "eliot2 and more"; - - DiskLoc loc; - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - StatusWith<DiskLoc> res = rs.insertRecord( &opCtx, - s1.c_str(), - s1.size() + 1, - -1 ); - ASSERT_OK( res.getStatus() ); - loc = 
res.getValue(); - } - - ASSERT_EQUALS(s1, rs.dataFor(&opCtx, loc).data()); - } - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - StatusWith<DiskLoc> res = - rs.updateRecord(&opCtx, loc, s2.c_str(), s2.size() + 1, -1, NULL); - ASSERT_OK( res.getStatus() ); - ASSERT( loc == res.getValue() ); - } - - ASSERT_EQUALS(s2, rs.dataFor(&opCtx, loc).data()); - } - - } - } - - TEST( RocksRecordStoreTest, UpdateInPlace1 ) { - unittest::TempDir td( _rocksRecordStoreTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - boost::shared_ptr<rocksdb::ColumnFamilyHandle> cfh = _createCfh( db.get() ); - - { - RocksRecordStore rs( "foo.bar", db.get(), cfh.get(), db->DefaultColumnFamily() ); - string s1 = "aaa111bbb"; - string s2 = "aaa222bbb"; - - DiskLoc loc; - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - StatusWith<DiskLoc> res = rs.insertRecord( &opCtx, - s1.c_str(), - s1.size() + 1, - -1 ); - ASSERT_OK( res.getStatus() ); - loc = res.getValue(); - } - - ASSERT_EQUALS(s1, rs.dataFor(&opCtx, loc).data()); - } - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - const char* damageSource = "222"; - mutablebson::DamageVector dv; - dv.push_back( mutablebson::DamageEvent() ); - dv[0].sourceOffset = 0; - dv[0].targetOffset = 3; - dv[0].size = 3; - Status res = rs.updateWithDamages( &opCtx, - loc, - damageSource, - dv ); - ASSERT_OK( res ); - } - ASSERT_EQUALS(s2, rs.dataFor(&opCtx, loc).data()); - } - - } - } - - TEST( RocksRecordStoreTest, TwoCollections ) { - unittest::TempDir td( _rocksRecordStoreTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - rocksdb::ColumnFamilyHandle* cf1; - rocksdb::ColumnFamilyHandle* cf2; - rocksdb::ColumnFamilyHandle* cf1_m; - rocksdb::ColumnFamilyHandle* cf2_m; - - rocksdb::Status status; - - status = db->CreateColumnFamily( rocksdb::ColumnFamilyOptions(), "foo.bar1", &cf1 ); - ASSERT_OK( toMongoStatus( status ) ); - status = 
db->CreateColumnFamily( rocksdb::ColumnFamilyOptions(), "foo.bar2", &cf2 ); - ASSERT_OK( toMongoStatus( status ) ); - - status = db->CreateColumnFamily( rocksdb::ColumnFamilyOptions(), "foo.bar1&", &cf1_m ); - ASSERT_OK( toMongoStatus( status ) ); - status = db->CreateColumnFamily( rocksdb::ColumnFamilyOptions(), "foo.bar2&", &cf2_m ); - ASSERT_OK( toMongoStatus( status ) ); - - RocksRecordStore rs1( "foo.bar1", db.get(), cf1, cf1_m ); - RocksRecordStore rs2( "foo.bar2", db.get(), cf2, cf2_m ); - - DiskLoc a; - DiskLoc b; - - { - MyOperationContext opCtx( db.get() ); - WriteUnitOfWork uow( &opCtx ); - - StatusWith<DiskLoc> result = rs1.insertRecord( &opCtx, "a", 2, -1 ); - ASSERT_OK( result.getStatus() ); - a = result.getValue(); - - result = rs2.insertRecord( &opCtx, "b", 2, -1 ); - ASSERT_OK( result.getStatus() ); - b = result.getValue(); - } - - ASSERT_EQUALS( a, b ); - - { - MyOperationContext opCtx(db.get()); - ASSERT_EQUALS(string("a"), rs1.dataFor(&opCtx, a).data()); - ASSERT_EQUALS(string("b"), rs2.dataFor(&opCtx, b).data()); - } - - delete cf2; - delete cf1; - } - - TEST( RocksRecordStoreTest, Stats1 ) { - unittest::TempDir td( _rocksRecordStoreTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - boost::shared_ptr<rocksdb::ColumnFamilyHandle> cfh = _createCfh( db.get() ); - - RocksRecordStore rs( "foo.bar", db.get(), cfh.get(), db->DefaultColumnFamily() ); - string s = "eliot was here"; - - { - MyOperationContext opCtx( db.get() ); - DiskLoc loc; - { - WriteUnitOfWork uow( &opCtx ); - StatusWith<DiskLoc> res = rs.insertRecord( &opCtx, s.c_str(), s.size() + 1, -1 ); - ASSERT_OK( res.getStatus() ); - loc = res.getValue(); - } - - ASSERT_EQUALS(s, rs.dataFor(&opCtx, loc).data()); - } - - { - MyOperationContext opCtx( db.get() ); - BSONObjBuilder b; - rs.appendCustomStats( &opCtx, &b, 1 ); - BSONObj obj = b.obj(); - ASSERT( obj["stats"].String().find( "WAL" ) != string::npos ); - } - } - - TEST( RocksRecordStoreTest, Persistence1 ) { - DiskLoc loc; 
- string origStr = "eliot was here"; - string newStr = "antonio was here"; - unittest::TempDir td( _rocksRecordStoreTestDir ); - - { - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - boost::shared_ptr<rocksdb::ColumnFamilyHandle> cfh = _createCfh( db.get() ); - - RocksRecordStore rs( "foo.bar", db.get(), cfh.get(), db->DefaultColumnFamily() ); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - StatusWith<DiskLoc> res = rs.insertRecord( &opCtx, origStr.c_str(), - origStr.size() + 1, -1 ); - ASSERT_OK( res.getStatus() ); - loc = res.getValue(); - } - - ASSERT_EQUALS(origStr, rs.dataFor(&opCtx, loc).data()); - } - } - - { - DbAndCfh dbAndCfh = getDBPersist( td.path() ); - boost::shared_ptr<rocksdb::DB> db = dbAndCfh.first; - - RocksRecordStore rs( "foo.bar", - db.get(), - dbAndCfh.second.get(), - db->DefaultColumnFamily() ); - - { - MyOperationContext opCtx(db.get()); - ASSERT_EQUALS(static_cast<long long>(origStr.size() + 1), rs.dataSize(&opCtx)); - ASSERT_EQUALS(1, rs.numRecords(&opCtx)); - } - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - StatusWith<DiskLoc> res = - rs.updateRecord(&opCtx, loc, newStr.c_str(), newStr.size() + 1, -1, NULL); - ASSERT_OK( res.getStatus() ); - } - - ASSERT_EQUALS(newStr, rs.dataFor(&opCtx, loc).data()); - } - } - - { - DbAndCfh dbAndCfh = getDBPersist( td.path() ); - boost::shared_ptr<rocksdb::DB> db = dbAndCfh.first; - - RocksRecordStore rs( "foo.bar", - db.get(), - dbAndCfh.second.get(), - db->DefaultColumnFamily() ); - - { - MyOperationContext opCtx(db.get()); - ASSERT_EQUALS(static_cast<long long>(newStr.size() + 1), rs.dataSize(&opCtx)); - ASSERT_EQUALS(1, rs.numRecords(&opCtx)); - } - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - rs.deleteRecord( &opCtx, loc ); - } - } - - { - MyOperationContext opCtx( db.get() ); - ASSERT_EQUALS(0, rs.dataSize(&opCtx)); - ASSERT_EQUALS(0, rs.numRecords(&opCtx)); - } - } - } - - TEST( 
RocksRecordStoreTest, ForwardIterator ) { - { - unittest::TempDir td( _rocksRecordStoreTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - rocksdb::ColumnFamilyHandle* cf1; - rocksdb::ColumnFamilyHandle* cf1_m; - - rocksdb::Status status; - - status = db->CreateColumnFamily( getColumnFamilyOptions(), "foo.bar1", &cf1 ); - ASSERT_OK( toMongoStatus( status ) ); - status = db->CreateColumnFamily( rocksdb::ColumnFamilyOptions(), "foo.bar1&", &cf1_m ); - ASSERT_OK( toMongoStatus( status ) ); - - RocksRecordStore rs( "foo.bar", db.get(), cf1, cf1_m ); - string s1 = "eliot was here"; - string s2 = "antonio was here"; - string s3 = "eliot and antonio were here"; - DiskLoc loc1; - DiskLoc loc2; - DiskLoc loc3; - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - StatusWith<DiskLoc> res = rs.insertRecord( &opCtx, s1.c_str(), s1.size() + 1, -1 ); - ASSERT_OK( res.getStatus() ); - loc1 = res.getValue(); - res = rs.insertRecord( &opCtx, s2.c_str(), s2.size() + 1, -1 ); - ASSERT_OK( res.getStatus() ); - loc2 = res.getValue(); - res = rs.insertRecord( &opCtx, s3.c_str(), s3.size() + 1, -1 ); - ASSERT_OK( res.getStatus() ); - loc3 = res.getValue(); - } - } - - MyOperationContext txn(db.get()); - - scoped_ptr<RecordIterator> iter( rs.getIterator( &txn ) ); - - ASSERT_EQUALS( false, iter->isEOF() ); - ASSERT_EQUALS( loc1, iter->getNext() ); - ASSERT_EQUALS( s1, iter->dataFor( loc1 ).data() ); - - ASSERT_EQUALS( false, iter->isEOF() ); - ASSERT_EQUALS( loc2, iter->getNext() ); - ASSERT_EQUALS( s2, iter->dataFor( loc2 ).data() ); - - ASSERT_EQUALS( false, iter->isEOF() ); - ASSERT_EQUALS( loc3, iter->getNext() ); - ASSERT_EQUALS( s3, iter->dataFor( loc3 ).data() ); - - ASSERT_EQUALS( true, iter->isEOF() ); - ASSERT_EQUALS( DiskLoc(), iter->getNext() ); - } - } - - TEST( RocksRecordStoreTest, BackwardIterator ) { - { - unittest::TempDir td( _rocksRecordStoreTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - 
rocksdb::ColumnFamilyHandle* cf1; - rocksdb::ColumnFamilyHandle* cf1_m; - - rocksdb::Status status; - - status = db->CreateColumnFamily( getColumnFamilyOptions(), "foo.bar1", &cf1 ); - ASSERT_OK( toMongoStatus( status ) ); - status = db->CreateColumnFamily( rocksdb::ColumnFamilyOptions(), "foo.bar1&", &cf1_m ); - ASSERT_OK( toMongoStatus( status ) ); - - RocksRecordStore rs( "foo.bar", db.get(), cf1, cf1_m ); - string s1 = "eliot was here"; - string s2 = "antonio was here"; - string s3 = "eliot and antonio were here"; - DiskLoc loc1; - DiskLoc loc2; - DiskLoc loc3; - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - StatusWith<DiskLoc> res = rs.insertRecord( &opCtx, s1.c_str(), s1.size() +1, -1 ); - ASSERT_OK( res.getStatus() ); - loc1 = res.getValue(); - res = rs.insertRecord( &opCtx, s2.c_str(), s2.size() + 1, -1 ); - ASSERT_OK( res.getStatus() ); - loc2 = res.getValue(); - res = rs.insertRecord( &opCtx, s3.c_str(), s3.size() + 1, -1 ); - ASSERT_OK( res.getStatus() ); - loc3 = res.getValue(); - } - } - - MyOperationContext txn(db.get()); - scoped_ptr<RecordIterator> iter( rs.getIterator( &txn, maxDiskLoc, false, - CollectionScanParams::BACKWARD ) ); - ASSERT_EQUALS( false, iter->isEOF() ); - ASSERT_EQUALS( loc3, iter->getNext() ); - ASSERT_EQUALS( s3, iter->dataFor( loc3 ).data() ); - - ASSERT_EQUALS( false, iter->isEOF() ); - ASSERT_EQUALS( loc2, iter->getNext() ); - ASSERT_EQUALS( s2, iter->dataFor( loc2 ).data() ); - - ASSERT_EQUALS( false, iter->isEOF() ); - ASSERT_EQUALS( loc1, iter->getNext() ); - ASSERT_EQUALS( s1, iter->dataFor( loc1 ).data() ); - - ASSERT_EQUALS( true, iter->isEOF() ); - ASSERT_EQUALS( DiskLoc(), iter->getNext() ); - } - } - - TEST( RocksRecordStoreTest, Truncate1 ) { - unittest::TempDir td( _rocksRecordStoreTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - { - rocksdb::ColumnFamilyHandle* cf1; - rocksdb::ColumnFamilyHandle* cf1_m; - - rocksdb::Status status; - - status = 
db->CreateColumnFamily( getColumnFamilyOptions(), "foo.bar1", &cf1 ); - ASSERT_OK( toMongoStatus( status ) ); - status = db->CreateColumnFamily( rocksdb::ColumnFamilyOptions(), "foo.bar1&", &cf1_m ); - ASSERT_OK( toMongoStatus( status ) ); - - RocksRecordStore rs( "foo.bar", db.get(), cf1, cf1_m ); - string s = "antonio was here"; - - { - MyOperationContext opCtx( db.get() ); - WriteUnitOfWork uow( &opCtx ); - StatusWith<DiskLoc> res = rs.insertRecord( &opCtx, s.c_str(), s.size() + 1, -1 ); - ASSERT_OK( res.getStatus() ); - res = rs.insertRecord( &opCtx, s.c_str(), s.size() + 1, -1 ); - ASSERT_OK( res.getStatus() ); - } - - { - MyOperationContext opCtx( db.get() ); - WriteUnitOfWork uow( &opCtx ); - Status stat = rs.truncate( &opCtx ); - ASSERT_OK( stat ); - - ASSERT_EQUALS(0, rs.numRecords(&opCtx)); - ASSERT_EQUALS(0, rs.dataSize(&opCtx)); - } - - // Test that truncate does not fail on an empty collection - { - MyOperationContext opCtx( db.get() ); - WriteUnitOfWork uow( &opCtx ); - Status stat = rs.truncate( &opCtx ); - ASSERT_OK( stat ); - - ASSERT_EQUALS(0, rs.numRecords(&opCtx)); - ASSERT_EQUALS(0, rs.dataSize(&opCtx)); - } - } - } - - TEST( RocksRecordStoreTest, Snapshots1 ) { - unittest::TempDir td( _rocksRecordStoreTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - boost::shared_ptr<rocksdb::ColumnFamilyHandle> cfh = _createCfh( db.get() ); - - DiskLoc loc; - int size = -1; - - { - RocksRecordStore rs( "foo.bar", db.get(), cfh.get(), db->DefaultColumnFamily() ); - string s = "test string"; - size = s.length() + 1; - - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - StatusWith<DiskLoc> res = rs.insertRecord( &opCtx, s.c_str(), s.size() + 1, -1 ); - ASSERT_OK( res.getStatus() ); - loc = res.getValue(); - } - } - - { - MyOperationContext opCtx( db.get() ); - MyOperationContext opCtx2( db.get() ); - - RocksRecordStore rs( "foo.bar", db.get(), cfh.get(), db->DefaultColumnFamily() ); - - rs.deleteRecord( &opCtx, loc ); - 
- RecordData recData = rs.dataFor(&opCtx, loc /*, &opCtx */); - ASSERT( !recData.data() && recData.size() == 0 ); - - RecordData recData2 = rs.dataFor(&opCtx2, loc); - ASSERT(recData2.data() && recData2.size() == size); - } - } + HarnessHelper* newHarnessHelper() { return new RocksRecordStoreHarnessHelper(); } } diff --git a/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp b/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp index 46bf25d7d89..26502935c93 100644 --- a/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp +++ b/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp @@ -32,11 +32,13 @@ #include <rocksdb/comparator.h> #include <rocksdb/db.h> +#include <rocksdb/iterator.h> #include <rocksdb/slice.h> #include <rocksdb/options.h> #include <rocksdb/write_batch.h> #include <rocksdb/utilities/write_batch_with_index.h> +#include "mongo/db/operation_context.h" #include "mongo/util/log.h" namespace mongo { @@ -50,7 +52,7 @@ namespace mongo { RocksRecoveryUnit::~RocksRecoveryUnit() { if (!_destroyed) { - _destroyInternal(); + destroy(); } } @@ -105,7 +107,8 @@ namespace mongo { if (!_writeBatch) { // this assumes that default column family uses default comparator. 
change this if you // change default column family's comparator - _writeBatch.reset(new rocksdb::WriteBatchWithIndex(rocksdb::BytewiseComparator())); + _writeBatch.reset( + new rocksdb::WriteBatchWithIndex(rocksdb::BytewiseComparator(), 0, true)); } return _writeBatch.get(); @@ -114,8 +117,19 @@ namespace mongo { void RocksRecoveryUnit::registerChange(Change* change) { _changes.emplace_back(change); } void RocksRecoveryUnit::destroy() { + if (_defaultCommit) { + commitUnitOfWork(); + } + + if (_writeBatch && _writeBatch->GetWriteBatch()->Count() > 0) { + for (auto& change : _changes) { + change->rollback(); + delete change; + } + } + + releaseSnapshot(); _destroyed = true; - _destroyInternal(); } // XXX lazily initialized for now @@ -131,18 +145,46 @@ namespace mongo { return _snapshot; } - void RocksRecoveryUnit::_destroyInternal() { - if (_defaultCommit) { - commitUnitOfWork(); + void RocksRecoveryUnit::releaseSnapshot() { + if (_snapshot) { + _db->ReleaseSnapshot(_snapshot); + _snapshot = nullptr; } + } - for (auto& change : _changes) { - change->rollback(); - delete change; + rocksdb::Status RocksRecoveryUnit::Get(rocksdb::ColumnFamilyHandle* columnFamily, + const rocksdb::Slice& key, std::string* value) { + if (_writeBatch && _writeBatch->GetWriteBatch()->Count() > 0) { + boost::scoped_ptr<rocksdb::WBWIIterator> wb_iterator( + _writeBatch->NewIterator(columnFamily)); + wb_iterator->Seek(key); + if (wb_iterator->Valid() && wb_iterator->Entry().key == key) { + const auto& entry = wb_iterator->Entry(); + if (entry.type == rocksdb::WriteType::kDeleteRecord) { + return rocksdb::Status::NotFound(); + } + // TODO avoid double copy + *value = std::string(entry.value.data(), entry.value.size()); + return rocksdb::Status::OK(); + } } + rocksdb::ReadOptions options; + options.snapshot = snapshot(); + return _db->Get(options, columnFamily, key, value); + } - if (_snapshot) { - _db->ReleaseSnapshot(_snapshot); + rocksdb::Iterator* 
RocksRecoveryUnit::NewIterator(rocksdb::ColumnFamilyHandle* columnFamily) { + rocksdb::ReadOptions options; + options.snapshot = snapshot(); + auto iterator = _db->NewIterator(options, columnFamily); + if (_writeBatch && _writeBatch->GetWriteBatch()->Count() > 0) { + iterator = _writeBatch->NewIteratorWithBase(columnFamily, iterator); } + return iterator; + } + + RocksRecoveryUnit* RocksRecoveryUnit::getRocksRecoveryUnit(OperationContext* opCtx) { + return dynamic_cast<RocksRecoveryUnit*>(opCtx->recoveryUnit()); } + } diff --git a/src/mongo/db/storage/rocks/rocks_recovery_unit.h b/src/mongo/db/storage/rocks/rocks_recovery_unit.h index fd6499a5a1a..6a87b3417b6 100644 --- a/src/mongo/db/storage/rocks/rocks_recovery_unit.h +++ b/src/mongo/db/storage/rocks/rocks_recovery_unit.h @@ -46,14 +46,20 @@ namespace rocksdb { class Snapshot; class WriteBatchWithIndex; class Comparator; + struct Status; + class ColumnFamilyHandle; + class Slice; + class Iterator; } namespace mongo { + class OperationContext; + class RocksRecoveryUnit : public RecoveryUnit { MONGO_DISALLOW_COPYING(RocksRecoveryUnit); public: - RocksRecoveryUnit(rocksdb::DB* db, bool defaultCommit); + RocksRecoveryUnit(rocksdb::DB* db, bool defaultCommit = false); virtual ~RocksRecoveryUnit(); virtual void beginUnitOfWork(); @@ -79,6 +85,16 @@ namespace mongo { const rocksdb::Snapshot* snapshot(); + // to support tailable cursors + void releaseSnapshot(); + + rocksdb::Status Get(rocksdb::ColumnFamilyHandle* columnFamily, const rocksdb::Slice& key, + std::string* value); + + rocksdb::Iterator* NewIterator(rocksdb::ColumnFamilyHandle* columnFamily); + + static RocksRecoveryUnit* getRocksRecoveryUnit(OperationContext* opCtx); + private: void _destroyInternal(); diff --git a/src/mongo/db/storage/rocks/rocks_sorted_data_impl.cpp b/src/mongo/db/storage/rocks/rocks_sorted_data_impl.cpp index 133e4a0230a..6e4c6ee21b3 100644 --- a/src/mongo/db/storage/rocks/rocks_sorted_data_impl.cpp +++ 
b/src/mongo/db/storage/rocks/rocks_sorted_data_impl.cpp @@ -93,9 +93,6 @@ namespace mongo { return IndexKeyEntry( key, loc ); } - /** - * Creates an error code message out of a key - */ string dupKeyError(const BSONObj& key) { stringstream ss; ss << "E11000 duplicate key error "; @@ -109,11 +106,17 @@ namespace mongo { */ class RocksCursor : public SortedDataInterface::Cursor { public: - RocksCursor( rocksdb::Iterator* iterator, bool forward, Ordering o ) - : _iterator( iterator ), _forward( forward ), _isCached( false ), _comparator( o ) { + RocksCursor(OperationContext* txn, rocksdb::DB* db, + boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily, bool forward, Ordering o) + : _db(db), + _columnFamily(columnFamily), + _forward(forward), + _isCached(false), + _comparator(o) { + _resetIterator(txn); // TODO: maybe don't seek until we know we need to? - if ( _forward ) + if (_forward) _iterator->SeekToFirst(); else _iterator->SeekToLast(); @@ -147,7 +150,11 @@ namespace mongo { } bool locate(const BSONObj& key, const DiskLoc& loc) { - return _locate( stripFieldNames( key ), loc ); + if (_forward) { + return _locate(stripFieldNames(key), loc); + } else { + return _reverseLocate(stripFieldNames(key), loc); + } } // same first five args as IndexEntryComparison::makeQueryObject (which is commented). 
@@ -165,7 +172,11 @@ namespace mongo { keyEndInclusive, getDirection() ); - _locate( key, DiskLoc() ); + if (_forward) { + _locate(key, minDiskLoc); + } else { + _reverseLocate(key, maxDiskLoc); + } } /** @@ -213,6 +224,7 @@ namespace mongo { } void restorePosition(OperationContext* txn) { + _resetIterator(txn); _isCached = false; if ( _savedAtEnd ) { @@ -236,6 +248,12 @@ namespace mongo { } private: + void _resetIterator(OperationContext* txn) { + invariant(txn); + invariant(_db); + auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); + _iterator.reset(ru->NewIterator(_columnFamily.get())); + } // _locate() for reverse iterators bool _reverseLocate( const BSONObj& key, const DiskLoc loc ) { @@ -276,9 +294,7 @@ namespace mongo { * performing the actual locate logic. */ bool _locate( const BSONObj& key, const DiskLoc loc ) { - if ( !_forward ) { - return _reverseLocate( key, loc ); - } + invariant(_forward); _isCached = false; // assumes fieldNames already stripped if necessary @@ -317,6 +333,8 @@ namespace mongo { _cachedKey.objsize() ); } + rocksdb::DB* _db; // not owned + boost::shared_ptr<rocksdb::ColumnFamilyHandle> _columnFamily; scoped_ptr<rocksdb::Iterator> _iterator; const bool _forward; @@ -342,7 +360,10 @@ namespace mongo { public: RocksIndexEntryComparator( const Ordering& order ): _indexComparator( order ) { } - virtual int Compare( const rocksdb::Slice& a, const rocksdb::Slice& b ) const { + virtual int Compare(const rocksdb::Slice& a, const rocksdb::Slice& b) const { + if (a.size() == 0 || b.size() == 0) { + return a.size() == b.size() ? 0 : ((a.size() == 0) ? 
-1 : 1); + } const IndexKeyEntry lhs = makeIndexKeyEntry( a ); const IndexKeyEntry rhs = makeIndexKeyEntry( b ); return _indexComparator.compare( lhs, rhs ); @@ -366,7 +387,7 @@ namespace mongo { class WriteBufferCopyIntoHandler : public rocksdb::WriteBatch::Handler { public: - WriteBufferCopyIntoHandler(rocksdb::WriteBatchWithIndex* outWriteBatch, + WriteBufferCopyIntoHandler(rocksdb::WriteBatch* outWriteBatch, rocksdb::ColumnFamilyHandle* columnFamily) : _OutWriteBatch(outWriteBatch), _columnFamily(columnFamily) { } @@ -379,82 +400,30 @@ namespace mongo { } private: - rocksdb::WriteBatchWithIndex* _OutWriteBatch; + rocksdb::WriteBatch* _OutWriteBatch; rocksdb::ColumnFamilyHandle* _columnFamily; }; class RocksBulkSortedBuilderImpl : public SortedDataBuilderInterface { public: - RocksBulkSortedBuilderImpl(RocksSortedDataImpl* data, - rocksdb::ColumnFamilyHandle* columnFamily, - const Ordering& order, - OperationContext* txn, + RocksBulkSortedBuilderImpl(RocksSortedDataImpl* index, OperationContext* txn, bool dupsAllowed) - : _comparator(order), - _writeBatch(&_comparator), - _data(data), - _columnFamily(columnFamily), - _txn(txn), - _dupsAllowed(dupsAllowed) { } + : _index(index), _txn(txn), _dupsAllowed(dupsAllowed) { + invariant(index->isEmpty(txn)); + } Status addKey(const BSONObj& key, const DiskLoc& loc) { - // inserts should be in ascending order. - - if (key.objsize() >= kTempKeyMaxSize) { - return Status(ErrorCodes::KeyTooLong, "key too big"); - } - - invariant(!loc.isNull()); - invariant(loc.isValid()); - - // TODO: How to check "invariant(!hasFieldNames(key));" ? - if (!_dupsAllowed) { - // TODO need key locking to support unique indexes. 
- Status status = _data->dupKeyCheck(_txn, key, loc); - if (!status.isOK()) { - return status; - } - - // Check uniqueness with previous insert to the same bulk - boost::scoped_ptr<rocksdb::WBWIIterator> wbwi( - _writeBatch.NewIterator(_columnFamily)); - wbwi->Seek(makeString(key, DiskLoc())); - invariant(wbwi->status().ok()); - bool isDup = false; - // TODO can I assume loc never decreases? - while (wbwi->Valid()) { - IndexKeyEntry ike = makeIndexKeyEntry(wbwi->Entry().key); - if (ike.key == key) { - if (ike.loc != loc) { - isDup = true; - } - wbwi->Next(); - invariant(wbwi->status().ok()); - } - else - break; - } - - if (isDup) - return Status(ErrorCodes::DuplicateKey, dupKeyError(key)); - } - - _writeBatch.Put(_columnFamily, makeString(key, loc), rocksdb::Slice()); - - return Status::OK(); + // TODO maybe optimize based on a fact that index is empty? + return _index->insert(_txn, key, loc, _dupsAllowed); } void commit(bool mayInterrupt) { - RocksRecoveryUnit* ru = dynamic_cast<RocksRecoveryUnit*>(_txn->recoveryUnit()); - WriteBufferCopyIntoHandler copy_handler(ru->writeBatch(), _columnFamily); - _writeBatch.GetWriteBatch()->Iterate(&copy_handler); + WriteUnitOfWork uow(_txn); + uow.commit(); } private: - RocksIndexEntryComparator _comparator; - rocksdb::WriteBatchWithIndex _writeBatch; - RocksSortedDataImpl* _data; - rocksdb::ColumnFamilyHandle* _columnFamily; + RocksSortedDataImpl* _index; OperationContext* _txn; bool _dupsAllowed; }; @@ -463,20 +432,21 @@ namespace mongo { // RocksSortedDataImpl*********** - RocksSortedDataImpl::RocksSortedDataImpl( rocksdb::DB* db, - rocksdb::ColumnFamilyHandle* cf, - Ordering order ) : _db( db ), _columnFamily( cf ), _order( order ) { + RocksSortedDataImpl::RocksSortedDataImpl(rocksdb::DB* db, + boost::shared_ptr<rocksdb::ColumnFamilyHandle> cf, + Ordering order) + : _db(db), _columnFamily(cf), _order(order) { invariant( _db ); - invariant( _columnFamily ); + invariant(_columnFamily.get()); string value; - bool ok = 
_db->GetProperty(_columnFamily, "rocksdb.estimate-num-keys", &value); + bool ok = _db->GetProperty(_columnFamily.get(), "rocksdb.estimate-num-keys", &value); invariant( ok ); _numEntries.store(std::atoll(value.c_str())); } SortedDataBuilderInterface* RocksSortedDataImpl::getBulkBuilder(OperationContext* txn, bool dupsAllowed) { - return new RocksBulkSortedBuilderImpl(this, _columnFamily, _order, txn, dupsAllowed); + return new RocksBulkSortedBuilderImpl(this, txn, dupsAllowed); } Status RocksSortedDataImpl::insert(OperationContext* txn, @@ -486,12 +456,12 @@ namespace mongo { if (key.objsize() >= kTempKeyMaxSize) { string msg = mongoutils::str::stream() - << "Heap1Btree::insert: key too large to index, failing " - << ' ' << key.objsize() << ' ' << key; + << "RocksSortedDataImpl::insert: key too large to index, failing " << ' ' + << key.objsize() << ' ' << key; return Status(ErrorCodes::KeyTooLong, msg); } - RocksRecoveryUnit* ru = _getRecoveryUnit( txn ); + RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); if ( !dupsAllowed ) { // TODO need key locking to support unique indexes. 
@@ -502,7 +472,7 @@ namespace mongo { } ru->registerChange(new ChangeNumEntries(&_numEntries, true)); - ru->writeBatch()->Put( _columnFamily, makeString( key, loc ), emptyByteSlice ); + ru->writeBatch()->Put(_columnFamily.get(), makeString(key, loc), emptyByteSlice); return Status::OK(); } @@ -510,18 +480,17 @@ namespace mongo { bool RocksSortedDataImpl::unindex(OperationContext* txn, const BSONObj& key, const DiskLoc& loc) { - RocksRecoveryUnit* ru = _getRecoveryUnit( txn ); + RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); const string keyData = makeString( key, loc ); string dummy; - const rocksdb::ReadOptions options = RocksEngine::readOptionsWithSnapshot( txn ); - if ( _db->Get( options,_columnFamily, keyData, &dummy ).IsNotFound() ) { + if (ru->Get(_columnFamily.get(), keyData, &dummy).IsNotFound()) { return false; } ru->registerChange(new ChangeNumEntries(&_numEntries, false)); - ru->writeBatch()->Delete( _columnFamily, keyData ); + ru->writeBatch()->Delete(_columnFamily.get(), keyData); return true; } @@ -531,7 +500,7 @@ namespace mongo { boost::scoped_ptr<SortedDataInterface::Cursor> cursor(newCursor(txn, 1)); cursor->locate(key, DiskLoc(0, 0)); - if (cursor->isEOF() || cursor->getKey() != key) { + if (cursor->isEOF() || cursor->getKey() != key || cursor->getDiskLoc() == loc) { return Status::OK(); } else { return Status(ErrorCodes::DuplicateKey, dupKeyError(key)); @@ -541,8 +510,8 @@ namespace mongo { void RocksSortedDataImpl::fullValidate(OperationContext* txn, long long* numKeysOut) const { if (numKeysOut) { *numKeysOut = 0; - const rocksdb::ReadOptions options = RocksEngine::readOptionsWithSnapshot(txn); - boost::scoped_ptr<rocksdb::Iterator> it(_db->NewIterator(options, _columnFamily)); + auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); + boost::scoped_ptr<rocksdb::Iterator> it(ru->NewIterator(_columnFamily.get())); it->SeekToFirst(); for (*numKeysOut = 0; it->Valid(); it->Next()) { ++(*numKeysOut); @@ -550,17 +519,23 @@ 
namespace mongo { } } - bool RocksSortedDataImpl::isEmpty( OperationContext* txn ) { - // XXX doesn't use snapshot - boost::scoped_ptr<rocksdb::Iterator> it( _db->NewIterator( rocksdb::ReadOptions(), - _columnFamily ) ); + bool RocksSortedDataImpl::isEmpty(OperationContext* txn) { + auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); + boost::scoped_ptr<rocksdb::Iterator> it(ru->NewIterator(_columnFamily.get())); it->SeekToFirst(); return !it->Valid(); } Status RocksSortedDataImpl::touch(OperationContext* txn) const { - // no-op + boost::scoped_ptr<rocksdb::Iterator> itr; + // no need to use snapshot to load into memory + itr.reset(_db->NewIterator(rocksdb::ReadOptions(), _columnFamily.get())); + itr->SeekToFirst(); + for (; itr->Valid(); itr->Next()) { + invariant(itr->status().ok()); + } + return Status::OK(); } @@ -570,8 +545,7 @@ namespace mongo { int direction) const { invariant( ( direction == 1 || direction == -1 ) && "invalid value for direction" ); rocksdb::ReadOptions options = RocksEngine::readOptionsWithSnapshot( txn ); - return new RocksCursor( - _db->NewIterator( options, _columnFamily ), direction == 1, _order ); + return new RocksCursor(txn, _db, _columnFamily, direction == 1, _order); } Status RocksSortedDataImpl::initAsEmpty(OperationContext* txn) { @@ -580,8 +554,9 @@ namespace mongo { } long long RocksSortedDataImpl::getSpaceUsedBytes( OperationContext* txn ) const { - boost::scoped_ptr<rocksdb::Iterator> iter( _db->NewIterator( rocksdb::ReadOptions(), - _columnFamily ) ); + // no need to use snapshot here + boost::scoped_ptr<rocksdb::Iterator> iter( + _db->NewIterator(rocksdb::ReadOptions(), _columnFamily.get())); iter->SeekToFirst(); const rocksdb::Slice rangeStart = iter->key(); iter->SeekToLast(); @@ -594,7 +569,7 @@ namespace mongo { // TODO Rocks specifies that this may not include recently written data. 
Figure out if // that's okay - _db->GetApproximateSizes( _columnFamily, &fullIndexRange, 1, &spacedUsedBytes ); + _db->GetApproximateSizes(_columnFamily.get(), &fullIndexRange, 1, &spacedUsedBytes); return spacedUsedBytes; } @@ -604,8 +579,4 @@ namespace mongo { return new RocksIndexEntryComparator( order ); } - RocksRecoveryUnit* RocksSortedDataImpl::_getRecoveryUnit( OperationContext* opCtx ) const { - return dynamic_cast<RocksRecoveryUnit*>( opCtx->recoveryUnit() ); - } - } diff --git a/src/mongo/db/storage/rocks/rocks_sorted_data_impl.h b/src/mongo/db/storage/rocks/rocks_sorted_data_impl.h index 2e14b3c62e7..405531e04a2 100644 --- a/src/mongo/db/storage/rocks/rocks_sorted_data_impl.h +++ b/src/mongo/db/storage/rocks/rocks_sorted_data_impl.h @@ -64,7 +64,8 @@ namespace mongo { class RocksSortedDataImpl : public SortedDataInterface { MONGO_DISALLOW_COPYING( RocksSortedDataImpl ); public: - RocksSortedDataImpl( rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf, Ordering order ); + RocksSortedDataImpl(rocksdb::DB* db, boost::shared_ptr<rocksdb::ColumnFamilyHandle> cf, + Ordering order); virtual SortedDataBuilderInterface* getBulkBuilder(OperationContext* txn, bool dupsAllowed); @@ -100,13 +101,11 @@ namespace mongo { private: typedef DiskLoc RecordId; - RocksRecoveryUnit* _getRecoveryUnit( OperationContext* opCtx ) const; - rocksdb::DB* _db; // not owned // Each index is stored as a single column family, so this stores the handle to the // relevant column family - rocksdb::ColumnFamilyHandle* _columnFamily; // not owned + boost::shared_ptr<rocksdb::ColumnFamilyHandle> _columnFamily; // used to construct RocksCursors const Ordering _order; diff --git a/src/mongo/db/storage/rocks/rocks_sorted_data_impl_harness_test.cpp b/src/mongo/db/storage/rocks/rocks_sorted_data_impl_harness_test.cpp deleted file mode 100644 index ef148fa8ab6..00000000000 --- a/src/mongo/db/storage/rocks/rocks_sorted_data_impl_harness_test.cpp +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Copyright (C) 
2014 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. 
- */ - -#include <boost/shared_ptr.hpp> -#include <boost/filesystem/operations.hpp> - -#include <rocksdb/comparator.h> -#include <rocksdb/db.h> -#include <rocksdb/options.h> -#include <rocksdb/slice.h> - -#include "mongo/db/storage/rocks/rocks_engine.h" -#include "mongo/db/storage/sorted_data_interface_test_harness.h" -#include "mongo/unittest/temp_dir.h" -#include "mongo/unittest/unittest.h" -#include "mongo/db/storage/rocks/rocks_sorted_data_impl.h" -#include "mongo/db/storage/rocks/rocks_recovery_unit.h" - -namespace mongo { - - class RocksHarnessHelper : public HarnessHelper { - public: - RocksHarnessHelper() : _order(Ordering::make(BSONObj())), _tempDir(_testNamespace) { - boost::filesystem::remove_all(_tempDir.path()); - rocksdb::DB* db; - rocksdb::Status s = rocksdb::DB::Open(RocksEngine::dbOptions(), _tempDir.path(), &db); - ASSERT(s.ok()); - _db.reset(db); - } - - virtual SortedDataInterface* newSortedDataInterface() { - return new RocksSortedDataImpl(_db.get(), _db->DefaultColumnFamily(), _order); - } - - virtual RecoveryUnit* newRecoveryUnit() { return new RocksRecoveryUnit(_db.get(), false); } - - private: - Ordering _order; - string _testNamespace = "mongo-rocks-sorted-data-test"; - unittest::TempDir _tempDir; - scoped_ptr<rocksdb::DB> _db; - }; - - HarnessHelper* newHarnessHelper() { return new RocksHarnessHelper(); } -} diff --git a/src/mongo/db/storage/rocks/rocks_sorted_data_impl_test.cpp b/src/mongo/db/storage/rocks/rocks_sorted_data_impl_test.cpp index 78628a668cc..f629ef5925c 100644 --- a/src/mongo/db/storage/rocks/rocks_sorted_data_impl_test.cpp +++ b/src/mongo/db/storage/rocks/rocks_sorted_data_impl_test.cpp @@ -1,35 +1,30 @@ -// rocks_sorted_data_impl_test.cpp - /** -* Copyright (C) 2014 MongoDB Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. 
-* -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ - -#include <memory> + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ #include <boost/shared_ptr.hpp> #include <boost/filesystem/operations.hpp> @@ -39,889 +34,45 @@ #include <rocksdb/options.h> #include <rocksdb/slice.h> -#include "mongo/db/operation_context_noop.h" #include "mongo/db/storage/rocks/rocks_engine.h" -#include "mongo/db/storage/rocks/rocks_sorted_data_impl.h" -#include "mongo/db/storage/rocks/rocks_record_store.h" -#include "mongo/db/storage/rocks/rocks_recovery_unit.h" +#include "mongo/db/storage/sorted_data_interface_test_harness.h" #include "mongo/unittest/temp_dir.h" #include "mongo/unittest/unittest.h" - -using namespace mongo; +#include "mongo/db/storage/rocks/rocks_sorted_data_impl.h" +#include "mongo/db/storage/rocks/rocks_recovery_unit.h" namespace mongo { - class MyOperationContext : public OperationContextNoop { + class RocksSortedDataImplHarness : public HarnessHelper { public: - MyOperationContext( rocksdb::DB* db ) - : OperationContextNoop( new RocksRecoveryUnit( db, false ) ) { - } + RocksSortedDataImplHarness() : _order(Ordering::make(BSONObj())), _tempDir(_testNamespace) { + boost::filesystem::remove_all(_tempDir.path()); + rocksdb::DB* db; + std::vector<rocksdb::ColumnFamilyDescriptor> cfs; + 
cfs.emplace_back(); + cfs[0].options.comparator = RocksSortedDataImpl::newRocksComparator(_order); + rocksdb::DBOptions db_options; + db_options.create_if_missing = true; + std::vector<rocksdb::ColumnFamilyHandle*> handles; + auto s = rocksdb::DB::Open(db_options, _tempDir.path(), cfs, &handles, &db); + ASSERT(s.ok()); + _db.reset(db); + _cf.reset(handles[0]); + } + + virtual SortedDataInterface* newSortedDataInterface() { + return new RocksSortedDataImpl(_db.get(), _cf, _order); + } + + virtual RecoveryUnit* newRecoveryUnit() { return new RocksRecoveryUnit(_db.get()); } + + private: + Ordering _order; + string _testNamespace = "mongo-rocks-sorted-data-test"; + unittest::TempDir _tempDir; + scoped_ptr<rocksdb::DB> _db; + shared_ptr<rocksdb::ColumnFamilyHandle> _cf; }; - // to be used in testing - static std::unique_ptr<rocksdb::Comparator> _rocksComparator( - RocksSortedDataImpl::newRocksComparator( Ordering::make( BSON( "a" << 1 ) ) ) ); - - string _rocksSortedDataTestDir = "mongo-rocks-test"; - - rocksdb::DB* getDB( string path ) { - boost::filesystem::remove_all( path ); - - rocksdb::Options options = RocksEngine::dbOptions(); - - // open DB - rocksdb::DB* db; - rocksdb::Status s = rocksdb::DB::Open(options, path, &db); - ASSERT(s.ok()); - - return db; - } - - const Ordering dummyOrdering = Ordering::make( BSONObj() ); - - TEST( RocksSortedDataTest, BrainDead ) { - unittest::TempDir td( _rocksSortedDataTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - { - RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering ); - - BSONObj key = BSON( "" << 1 ); - DiskLoc loc( 5, 16 ); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - ASSERT( !sortedData.unindex( &opCtx, key, loc ) ); - uow.commit(); - } - } - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - Status res = sortedData.insert( &opCtx, key, loc, true ); - ASSERT_OK( res ); - uow.commit(); - } - } - - { - 
MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - ASSERT( sortedData.unindex( &opCtx, key, loc ) ); - uow.commit(); - } - } - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - sortedData.unindex( &opCtx, key, loc ); - uow.commit(); - } - } - - } - } - - TEST( RocksSortedDataTest, Locate1 ) { - unittest::TempDir td( _rocksSortedDataTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - { - RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering ); - - BSONObj key = BSON( "" << 1 ); - DiskLoc loc( 5, 16 ); - - { - - MyOperationContext opCtx( db.get() ); - scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, 1 ) ); - ASSERT( !cursor->locate( key, loc ) ); - } - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - Status res = sortedData.insert( &opCtx, key, loc, true ); - ASSERT_OK( res ); - uow.commit(); - } - } - - { - MyOperationContext opCtx( db.get() ); - scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, 1 ) ); - ASSERT( cursor->locate( key, loc ) ); - ASSERT_EQUALS( key, cursor->getKey() ); - ASSERT_EQUALS( loc, cursor->getDiskLoc() ); - } - } - } - - TEST( RocksSortedDataTest, Locate2 ) { - unittest::TempDir td( _rocksSortedDataTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - { - RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering ); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) ); - uow.commit(); - } - } - - { - MyOperationContext opCtx( db.get() ); - scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, 1 ) ); - ASSERT( 
!cursor->locate( BSON( "a" << 2 ), DiskLoc(0,0) ) ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() ); - - cursor->advance(); - ASSERT_EQUALS( BSON( "" << 3 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,3), cursor->getDiskLoc() ); - - cursor->advance(); - ASSERT( cursor->isEOF() ); - } - } - } - - boost::shared_ptr<rocksdb::ColumnFamilyHandle> makeColumnFamily( rocksdb::DB* db ) { - rocksdb::ColumnFamilyOptions options; - options.comparator = _rocksComparator.get(); - - rocksdb::ColumnFamilyHandle* cfh; - rocksdb::Status s = db->CreateColumnFamily( options, "simpleColumnFamily", &cfh ); - ASSERT( s.ok() ); - - return boost::shared_ptr<rocksdb::ColumnFamilyHandle>( cfh ); - } - - TEST( RocksSortedDataTest, LocateInexact ) { - unittest::TempDir td( _rocksSortedDataTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - { - boost::shared_ptr<rocksdb::ColumnFamilyHandle> cfh = makeColumnFamily( db.get() ); - - RocksSortedDataImpl sortedData( db.get(), cfh.get(), dummyOrdering ); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) ); - uow.commit(); - } - } - - { - MyOperationContext opCtx( db.get() ); - scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, 1 ) ); - ASSERT_FALSE( cursor->locate( BSON( "a" << 2 ), DiskLoc(0,0) ) ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 3 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,3), cursor->getDiskLoc() ); - } - } - } - - TEST( RocksSortedDataTest, Snapshots ) { - unittest::TempDir td( _rocksSortedDataTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - { - RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering ); - - { - MyOperationContext opCtx( 
db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) ); - uow.commit(); - } - } - - { - MyOperationContext opCtx( db.get() ); - - // get a cursor - scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, 1 ) ); - - // insert some more stuff - { - WriteUnitOfWork uow( &opCtx ); - - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) ); - uow.commit(); - } - - ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() ); - cursor->advance(); - - // make sure that the cursor can't "see" anything added after it was created. - ASSERT( cursor-> isEOF() ); - ASSERT_FALSE( cursor->locate( BSON( "" << 3 ), DiskLoc(1,3) ) ); - ASSERT( cursor->isEOF() ); - } - } - } - - TEST( RocksSortedDataTest, SaveAndRestorePositionSimple ) { - unittest::TempDir td( _rocksSortedDataTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - { - RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering ); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) ); - uow.commit(); - } - } - - { - MyOperationContext opCtx( db.get() ); - scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, 1 ) ); - ASSERT( !cursor->locate( BSON( "a" << 1 ), DiskLoc(0,0) ) ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 1 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,1), cursor->getDiskLoc() ); - - // save the position - cursor->savePosition(); - - // restore position - cursor->restorePosition( &opCtx ); - ASSERT( !cursor->isEOF() 
); - ASSERT_EQUALS( BSON( "" << 1 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,1), cursor->getDiskLoc() ); - - // repeat, with a different value - ASSERT( !cursor->locate( BSON( "a" << 2 ), DiskLoc(0,0) ) ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() ); - - // save the position - cursor->savePosition(); - - // restore position - cursor->restorePosition( &opCtx ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() ); - } - } - } - - TEST( RocksSortedDataTest, SaveAndRestorePositionEOF ) { - unittest::TempDir td( _rocksSortedDataTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - { - RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering ); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) ); - uow.commit(); - } - } - - { - MyOperationContext opCtx( db.get() ); - scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, 1 ) ); - ASSERT( !cursor->locate( BSON( "a" << 1 ), DiskLoc(0,0) ) ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 1 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,1), cursor->getDiskLoc() ); - - // advance to the end - while ( !cursor->isEOF() ) { - cursor->advance(); - } - - ASSERT( cursor->isEOF() ); - - // save the position - cursor->savePosition(); - - // restore position, make sure we're at the end - cursor->restorePosition( &opCtx ); - ASSERT( cursor->isEOF() ); - } - } - } - - TEST( RocksSortedDataTest, SaveAndRestorePositionInsert ) { - unittest::TempDir td( _rocksSortedDataTestDir ); - scoped_ptr<rocksdb::DB> db( 
getDB( td.path() ) ); - - { - RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering ); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) ); - uow.commit(); - } - } - - { - MyOperationContext opCtx( db.get() ); - scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, 1 ) ); - ASSERT( !cursor->locate( BSON( "" << 3 ), DiskLoc(0,0) ) ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 3 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,3), cursor->getDiskLoc() ); - - // save the position - cursor->savePosition(); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - ASSERT_OK( - sortedData.insert( &opCtx, BSON( "" << 4 ), DiskLoc(1,4), true ) ); - uow.commit(); - } - } - - // restore position, make sure we don't see the newly inserted value - cursor->restorePosition( &opCtx ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 3 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,3), cursor->getDiskLoc() ); - - cursor->advance(); - ASSERT( cursor->isEOF() ); - } - } - } - - TEST( RocksSortedDataTest, SaveAndRestorePositionDelete2 ) { - unittest::TempDir td( _rocksSortedDataTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - { - RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering ); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) ); - uow.commit(); - } - } - - { - MyOperationContext 
opCtx( db.get() ); - scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, 1 ) ); - ASSERT( !cursor->locate( BSON( "" << 2 ), DiskLoc(0,0) ) ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() ); - - // save the position - cursor->savePosition(); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - ASSERT( sortedData.unindex( &opCtx, BSON( "" << 1 ), DiskLoc(1,1) ) ); - uow.commit(); - } - } - - // restore position - cursor->restorePosition( &opCtx ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() ); - } - } - } - - TEST( RocksSortedDataTest, SaveAndRestorePositionDelete3 ) { - unittest::TempDir td( _rocksSortedDataTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - { - RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering ); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) ); - uow.commit(); - } - } - - { - MyOperationContext opCtx( db.get() ); - scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, 1 ) ); - ASSERT( !cursor->locate( BSON( "" << 2 ), DiskLoc(0,0) ) ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() ); - - // save the position - cursor->savePosition(); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - ASSERT( sortedData.unindex( &opCtx, BSON( "" << 3 ), DiskLoc(1,3) ) ); - uow.commit(); - } - } - - // restore position - cursor->restorePosition( &opCtx ); - ASSERT( 
!cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() ); - - // make sure that we can still see the unindexed data, since we're working on - // a snapshot - cursor->advance(); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 3 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,3), cursor->getDiskLoc() ); - - cursor->advance(); - ASSERT( cursor->isEOF() ); - } - } - } - - TEST( RocksSortedDataTest, Locate1Reverse ) { - unittest::TempDir td( _rocksSortedDataTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - { - RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering ); - - BSONObj key = BSON( "" << 1 ); - DiskLoc loc( 5, 16 ); - - { - MyOperationContext opCtx( db.get() ); - scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, -1 ) ); - ASSERT( !cursor->locate( key, loc ) ); - } - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - Status res = sortedData.insert( &opCtx, key, loc, true ); - ASSERT_OK( res ); - uow.commit(); - } - } - - { - MyOperationContext opCtx( db.get() ); - scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, -1 ) ); - ASSERT( cursor->locate( key, loc ) ); - ASSERT_EQUALS( key, cursor->getKey() ); - ASSERT_EQUALS( loc, cursor->getDiskLoc() ); - } - } - } - - TEST( RocksSortedDataTest, LocateInexactReverse ) { - unittest::TempDir td( _rocksSortedDataTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - { - boost::shared_ptr<rocksdb::ColumnFamilyHandle> cfh = makeColumnFamily( db.get() ); - - RocksSortedDataImpl sortedData( db.get(), cfh.get(), dummyOrdering ); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - - ASSERT_OK( sortedData.insert( &opCtx, BSON( "a" << 1 ), DiskLoc(1,1), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "a" << 3 ), DiskLoc(1,1), true ) ); - uow.commit(); - } - } - - { 
- MyOperationContext opCtx( db.get() ); - scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, -1 ) ); - ASSERT_FALSE( cursor->locate( BSON( "a" << 2 ), DiskLoc(1,1) ) ); - ASSERT_FALSE( cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 1 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,1), cursor->getDiskLoc() ); - } - } - } - - TEST( RocksSortedDataTest, SaveAndRestorePositionReverseSimple ) { - unittest::TempDir td( _rocksSortedDataTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - { - RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering ); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) ); - uow.commit(); - } - } - - { - MyOperationContext opCtx( db.get() ); - scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, -1 ) ); - ASSERT( !cursor->locate( BSON( "a" << 1 ), DiskLoc(2,0) ) ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 1 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,1), cursor->getDiskLoc() ); - - // save the position - cursor->savePosition(); - - // restore position - cursor->restorePosition( &opCtx ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 1 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,1), cursor->getDiskLoc() ); - - // repeat, with a different value - ASSERT( !cursor->locate( BSON( "a" << 2 ), DiskLoc(2,0) ) ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() ); - - // save the position - cursor->savePosition(); - - // restore position - cursor->restorePosition( &opCtx ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() ); - 
ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() ); - } - } - } - - TEST( RocksSortedDataTest, SaveAndRestorePositionEOFReverse ) { - unittest::TempDir td( _rocksSortedDataTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - { - RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering ); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 4 ), DiskLoc(1,4), true ) ); - uow.commit(); - } - } - - { - MyOperationContext opCtx( db.get() ); - scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, -1 ) ); - ASSERT_FALSE( cursor->locate( BSON( "" << 2 ), DiskLoc(1,2) ) ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 1 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,1), cursor->getDiskLoc() ); - - // advance to the end - while ( !cursor->isEOF() ) { - cursor->advance(); - } - - ASSERT( cursor->isEOF() ); - - // save the position - cursor->savePosition(); - - // restore position, make sure we're at the end - cursor->restorePosition( &opCtx ); - ASSERT( cursor->isEOF() ); - } - } - } - - TEST( RocksSortedDataTest, SaveAndRestorePositionInsertReverse ) { - unittest::TempDir td( _rocksSortedDataTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - { - RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering ); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) ); - uow.commit(); - } - } - - { - MyOperationContext opCtx( db.get() ); - scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, - -1 ) ); - 
ASSERT( !cursor->locate( BSON( "" << 3 ), DiskLoc(2,0) ) ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 3 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,3), cursor->getDiskLoc() ); - - // save the position - cursor->savePosition(); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - ASSERT_OK( - sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) ); - uow.commit(); - } - } - - // restore position, make sure we don't see the newly inserted value - cursor->restorePosition( &opCtx ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 3 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,3), cursor->getDiskLoc() ); - - cursor->advance(); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 1 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,1), cursor->getDiskLoc() ); - - cursor->advance(); - ASSERT( cursor->isEOF() ); - } - } - } - - TEST( RocksSortedDataTest, SaveAndRestorePositionDelete1Reverse ) { - unittest::TempDir td( _rocksSortedDataTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - { - RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering ); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) ); - uow.commit(); - } - } - - { - MyOperationContext opCtx( db.get() ); - scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, - -1 ) ); - ASSERT( !cursor->locate( BSON( "" << 3 ), DiskLoc(2,0) ) ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 3 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,3), cursor->getDiskLoc() ); - - // save the position - cursor->savePosition(); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( 
&opCtx ); - ASSERT( sortedData.unindex( &opCtx, BSON( "" << 3 ), DiskLoc(1,3) ) ); - uow.commit(); - } - } - - // restore position, make sure we still see the deleted key and value, because - // we're using a snapshot - cursor->restorePosition( &opCtx ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 3 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,3), cursor->getDiskLoc() ); - } - } - } - - TEST( RocksSortedDataTest, SaveAndRestorePositionDelete2Reverse ) { - unittest::TempDir td( _rocksSortedDataTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - { - RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering ); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) ); - uow.commit(); - } - } - - { - MyOperationContext opCtx( db.get() ); - scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, - -1 ) ); - ASSERT( !cursor->locate( BSON( "" << 2 ), DiskLoc(2,0) ) ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() ); - - // save the position - cursor->savePosition(); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - ASSERT( sortedData.unindex( &opCtx, BSON( "" << 1 ), DiskLoc(1,1) ) ); - uow.commit(); - } - } - - // restore position - cursor->restorePosition( &opCtx ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() ); - } - } - } - - TEST( RocksSortedDataTest, SaveAndRestorePositionDelete3Reverse ) { - unittest::TempDir td( _rocksSortedDataTestDir ); - scoped_ptr<rocksdb::DB> db( getDB( td.path() ) ); - - { - 
RocksSortedDataImpl sortedData( db.get(), db->DefaultColumnFamily(), dummyOrdering ); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 1 ), DiskLoc(1,1), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 2 ), DiskLoc(1,2), true ) ); - ASSERT_OK( sortedData.insert( &opCtx, BSON( "" << 3 ), DiskLoc(1,3), true ) ); - uow.commit(); - } - } - - { - MyOperationContext opCtx( db.get() ); - scoped_ptr<SortedDataInterface::Cursor> cursor( sortedData.newCursor( &opCtx, - -1 ) ); - ASSERT( !cursor->locate( BSON( "" << 2 ), DiskLoc(2,0) ) ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() ); - - // save the position - cursor->savePosition(); - - { - MyOperationContext opCtx( db.get() ); - { - WriteUnitOfWork uow( &opCtx ); - ASSERT( sortedData.unindex( &opCtx, BSON( "" << 1 ), DiskLoc(1,1) ) ); - uow.commit(); - } - } - - // restore position - cursor->restorePosition( &opCtx ); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 2 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,2), cursor->getDiskLoc() ); - - // make sure that we can still see the unindexed data, since we're working on - // a snapshot - cursor->advance(); - ASSERT( !cursor->isEOF() ); - ASSERT_EQUALS( BSON( "" << 1 ), cursor->getKey() ); - ASSERT_EQUALS( DiskLoc(1,1), cursor->getDiskLoc() ); - - cursor->advance(); - ASSERT( cursor->isEOF() ); - } - } - } + HarnessHelper* newHarnessHelper() { return new RocksSortedDataImplHarness(); } } |