-rw-r--r-- | src/mongo/db/storage/rocks/rocks_engine.cpp | 315
-rw-r--r-- | src/mongo/db/storage/rocks/rocks_engine.h | 37
-rw-r--r-- | src/mongo/db/storage/rocks/rocks_index.cpp | 126
-rw-r--r-- | src/mongo/db/storage/rocks/rocks_index.h | 18
-rw-r--r-- | src/mongo/db/storage/rocks/rocks_index_test.cpp | 19
-rw-r--r-- | src/mongo/db/storage/rocks/rocks_init.cpp | 5
-rw-r--r-- | src/mongo/db/storage/rocks/rocks_record_store.cpp | 187
-rw-r--r-- | src/mongo/db/storage/rocks/rocks_record_store.h | 16
-rw-r--r-- | src/mongo/db/storage/rocks/rocks_record_store_test.cpp | 18
-rw-r--r-- | src/mongo/db/storage/rocks/rocks_recovery_unit.cpp | 109
-rw-r--r-- | src/mongo/db/storage/rocks/rocks_recovery_unit.h | 10
11 files changed, 379 insertions, 481 deletions
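Note on the new key layout: instead of one column family per collection or index, every key now lives in a single column family and begins with a 4-byte ident prefix, encoded big-endian so that byte-wise key order matches numeric prefix order; RocksEngine can then recover _maxPrefix on startup with a single SeekToLast(). The following standalone sketch illustrates why the encoding works; it is not code from this commit, and uses shift-based byte packing in place of mongo's endian::nativeToBig/bigToNative:

#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

// Keys are compared as raw bytes, so a big-endian encoding makes lexicographic
// order agree with numeric order -- e.g. prefix 256 sorts after prefix 255.
std::string encodePrefix(uint32_t prefix) {
    char buf[sizeof(uint32_t)];
    for (int i = 0; i < 4; ++i) {
        buf[i] = static_cast<char>(prefix >> (24 - 8 * i));  // most significant byte first
    }
    return std::string(buf, sizeof(buf));
}

// Recover the prefix from the first four bytes of a key.
bool extractPrefix(const std::string& key, uint32_t* prefix) {
    if (key.size() < sizeof(uint32_t)) {
        return false;  // too short to carry a prefix
    }
    unsigned char b[sizeof(uint32_t)];
    memcpy(b, key.data(), sizeof(b));
    *prefix = (uint32_t(b[0]) << 24) | (uint32_t(b[1]) << 16) | (uint32_t(b[2]) << 8) | uint32_t(b[3]);
    return true;
}

int main() {
    assert(encodePrefix(256) > encodePrefix(255));  // byte order == numeric order
    uint32_t p = 0;
    assert(extractPrefix(encodePrefix(256), &p) && p == 256);
    return 0;
}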
diff --git a/src/mongo/db/storage/rocks/rocks_engine.cpp b/src/mongo/db/storage/rocks/rocks_engine.cpp index 604798cb3ea..749895a6edc 100644 --- a/src/mongo/db/storage/rocks/rocks_engine.cpp +++ b/src/mongo/db/storage/rocks/rocks_engine.cpp @@ -36,6 +36,7 @@ #include <boost/filesystem/operations.hpp> #include <boost/make_shared.hpp> #include <boost/shared_ptr.hpp> +#include <boost/scoped_ptr.hpp> #include <rocksdb/cache.h> #include <rocksdb/comparator.h> @@ -52,6 +53,7 @@ #include "mongo/db/storage/rocks/rocks_record_store.h" #include "mongo/db/storage/rocks/rocks_recovery_unit.h" #include "mongo/db/storage/rocks/rocks_index.h" +#include "mongo/platform/endian.h" #include "mongo/util/log.h" #include "mongo/util/processinfo.h" @@ -62,16 +64,29 @@ namespace mongo { - using boost::shared_ptr; + namespace { + // we encode prefixes in big endian because we want to quickly jump to the max prefix + // (iter->SeekToLast()) + bool extractPrefix(const rocksdb::Slice& slice, uint32_t* prefix) { + if (slice.size() < sizeof(uint32_t)) { + return false; + } + *prefix = endian::bigToNative(*reinterpret_cast<const uint32_t*>(slice.data())); + return true; + } - const std::string RocksEngine::kOrderingPrefix("indexordering-"); - const std::string RocksEngine::kCollectionPrefix("collection-"); + std::string encodePrefix(uint32_t prefix) { + uint32_t bigEndianPrefix = endian::nativeToBig(prefix); + return std::string(reinterpret_cast<const char*>(&bigEndianPrefix), sizeof(uint32_t)); + } + } // anonymous namespace - RocksEngine::RocksEngine(const std::string& path, bool durable) - : _path(path), - _durable(durable) { + // first four bytes are the default prefix 0 + const std::string RocksEngine::kMetadataPrefix("\0\0\0\0metadata-", 13); - { // create block cache + RocksEngine::RocksEngine(const std::string& path, bool durable) + : _path(path), _durable(durable) { + { // create block cache uint64_t cacheSizeGB = 0; ProcessInfo pi; unsigned long long memSizeMB = pi.getMemSizeMB(); @@ -84,68 +99,43 @@ namespace mongo { } _block_cache = rocksdb::NewLRUCache(cacheSizeGB * 1024 * 1024 * 1024LL); } + // open DB + rocksdb::DB* db; + auto s = rocksdb::DB::Open(_options(), path, &db); + ROCKS_STATUS_OK(s); + _db.reset(db); - auto columnFamilyNames = _loadColumnFamilies(); // vector of column family names - std::unordered_map<std::string, Ordering> orderings; // column family name -> Ordering - std::set<std::string> collections; // set of collection names - - if (columnFamilyNames.empty()) { // new DB - columnFamilyNames.push_back(rocksdb::kDefaultColumnFamilyName); - } else { // existing DB - // open DB in read-only mode to load metadata - rocksdb::DB* dbReadOnly; - auto s = rocksdb::DB::OpenForReadOnly(_dbOptions(), path, &dbReadOnly); - ROCKS_STATUS_OK(s); - auto itr = dbReadOnly->NewIterator(rocksdb::ReadOptions()); - orderings = _loadOrderingMetaData(itr); - collections = _loadCollections(itr); - delete itr; - delete dbReadOnly; - } - - std::vector<rocksdb::ColumnFamilyDescriptor> columnFamilies; - std::set<std::string> toDropColumnFamily; - - for (const auto& cf : columnFamilyNames) { - if (cf == rocksdb::kDefaultColumnFamilyName) { - columnFamilies.emplace_back(cf, _defaultCFOptions()); - continue; - } - auto orderings_iter = orderings.find(cf); - auto collections_iter = collections.find(cf); - bool isIndex = orderings_iter != orderings.end(); - bool isCollection = collections_iter != collections.end(); - invariant(!isIndex || !isCollection); - if (isIndex) { - columnFamilies.emplace_back(cf, 
_indexOptions(orderings_iter->second)); - } else if (isCollection) { - columnFamilies.emplace_back(cf, _collectionOptions()); - } else { - // TODO support this from inside of rocksdb, by using - // Options::drop_unopened_column_families. - // This can happen because write and createColumnFamily are not atomic - toDropColumnFamily.insert(cf); - columnFamilies.emplace_back(cf, _collectionOptions()); - } + // open iterator + boost::scoped_ptr<rocksdb::Iterator> _iter(_db->NewIterator(rocksdb::ReadOptions())); + + // find maxPrefix + _maxPrefix = 0; + _iter->SeekToLast(); + if (_iter->Valid()) { + // otherwise the DB is empty, so we just keep it at 0 + bool ok = extractPrefix(_iter->key(), &_maxPrefix); + // this is DB corruption here + invariant(ok); } - std::vector<rocksdb::ColumnFamilyHandle*> handles; - rocksdb::DB* db; - auto s = rocksdb::DB::Open(_dbOptions(), path, columnFamilies, &handles, &db); - ROCKS_STATUS_OK(s); - invariant(handles.size() == columnFamilies.size()); - for (size_t i = 0; i < handles.size(); ++i) { - if (toDropColumnFamily.find(columnFamilies[i].name) != toDropColumnFamily.end()) { - db->DropColumnFamily(handles[i]); - delete handles[i]; - } else if (columnFamilyNames[i] == rocksdb::kDefaultColumnFamilyName) { - // we will not be needing this - delete handles[i]; - } else { - _identColumnFamilyMap[columnFamilies[i].name].reset(handles[i]); + // load ident to prefix map + { + boost::mutex::scoped_lock lk(_identPrefixMapMutex); + for (_iter->Seek(kMetadataPrefix); + _iter->Valid() && _iter->key().starts_with(kMetadataPrefix); _iter->Next()) { + rocksdb::Slice ident(_iter->key()); + ident.remove_prefix(kMetadataPrefix.size()); + // this could throw DBException, which then means DB corruption. We just let it fly + // to the caller + BSONObj identConfig(_iter->value().data()); + BSONElement element = identConfig.getField("prefix"); + // TODO: SERVER-16979 Correctly handle errors returned by RocksDB + // This is DB corruption + invariant(!element.eoo() && element.isNumber()); + uint32_t identPrefix = static_cast<uint32_t>(element.numberInt()); + _identPrefixMap[StringData(ident.data(), ident.size())] = identPrefix; } } - _db.reset(db); } RocksEngine::~RocksEngine() {} @@ -154,197 +144,136 @@ namespace mongo { return new RocksRecoveryUnit(&_transactionEngine, _db.get(), _durable); } - Status RocksEngine::createRecordStore(OperationContext* opCtx, - const StringData& ns, + Status RocksEngine::createRecordStore(OperationContext* opCtx, const StringData& ns, const StringData& ident, const CollectionOptions& options) { - if (_existsColumnFamily(ident)) { - return Status::OK(); - } - _db->Put(rocksdb::WriteOptions(), kCollectionPrefix + ident.toString(), rocksdb::Slice()); - return _createColumnFamily(_collectionOptions(), ident); + return _createIdentPrefix(ident); } RecordStore* RocksEngine::getRecordStore(OperationContext* opCtx, const StringData& ns, const StringData& ident, const CollectionOptions& options) { - auto columnFamily = _getColumnFamily(ident); if (options.capped) { return new RocksRecordStore( - ns, ident, _db.get(), columnFamily, true, + ns, ident, _db.get(), _getIdentPrefix(ident), true, options.cappedSize ? options.cappedSize : 4096, // default size options.cappedMaxDocs ? 
options.cappedMaxDocs : -1); } else { - return new RocksRecordStore(ns, ident, _db.get(), columnFamily); + return new RocksRecordStore(ns, ident, _db.get(), _getIdentPrefix(ident)); } } Status RocksEngine::createSortedDataInterface(OperationContext* opCtx, const StringData& ident, const IndexDescriptor* desc) { - if (_existsColumnFamily(ident)) { - return Status::OK(); - } - auto keyPattern = desc->keyPattern(); - - _db->Put(rocksdb::WriteOptions(), kOrderingPrefix + ident.toString(), - rocksdb::Slice(keyPattern.objdata(), keyPattern.objsize())); - return _createColumnFamily(_indexOptions(Ordering::make(keyPattern)), ident); + return _createIdentPrefix(ident); } SortedDataInterface* RocksEngine::getSortedDataInterface(OperationContext* opCtx, const StringData& ident, const IndexDescriptor* desc) { if (desc->unique()) { - return new RocksUniqueIndex(_db.get(), _getColumnFamily(ident), ident.toString(), + return new RocksUniqueIndex(_db.get(), _getIdentPrefix(ident), ident.toString(), Ordering::make(desc->keyPattern())); } else { - return new RocksStandardIndex(_db.get(), _getColumnFamily(ident), ident.toString(), + return new RocksStandardIndex(_db.get(), _getIdentPrefix(ident), ident.toString(), Ordering::make(desc->keyPattern())); } } Status RocksEngine::dropIdent(OperationContext* opCtx, const StringData& ident) { + // TODO optimize this using CompactionFilterV2 rocksdb::WriteBatch wb; - // TODO is there a more efficient way? - wb.Delete(kOrderingPrefix + ident.toString()); - wb.Delete(kCollectionPrefix + ident.toString()); + wb.Delete(kMetadataPrefix + ident.toString()); + + std::string prefix = _getIdentPrefix(ident); + rocksdb::Slice prefixSlice(prefix.data(), prefix.size()); + + boost::scoped_ptr<rocksdb::Iterator> _iter(_db->NewIterator(rocksdb::ReadOptions())); + for (_iter->Seek(prefixSlice); _iter->Valid() && _iter->key().starts_with(prefixSlice); + _iter->Next()) { + ROCKS_STATUS_OK(_iter->status()); + wb.Delete(_iter->key()); + } auto s = _db->Write(rocksdb::WriteOptions(), &wb); if (!s.ok()) { - return toMongoStatus(s); + return toMongoStatus(s); } - return _dropColumnFamily(ident); - } - std::vector<std::string> RocksEngine::getAllIdents( OperationContext* opCtx ) const { - std::vector<std::string> indents; - for (auto& entry : _identColumnFamilyMap) { - indents.push_back(entry.first); + { + boost::mutex::scoped_lock lk(_identPrefixMapMutex); + _identPrefixMap.erase(ident); } - return indents; - } - // non public api - - bool RocksEngine::_existsColumnFamily(const StringData& ident) { - boost::mutex::scoped_lock lk(_identColumnFamilyMapMutex); - return _identColumnFamilyMap.find(ident) != _identColumnFamilyMap.end(); - } - - Status RocksEngine::_createColumnFamily(const rocksdb::ColumnFamilyOptions& options, - const StringData& ident) { - rocksdb::ColumnFamilyHandle* cf; - auto s = _db->CreateColumnFamily(options, ident.toString(), &cf); - if (!s.ok()) { - return toMongoStatus(s); - } - boost::mutex::scoped_lock lk(_identColumnFamilyMapMutex); - _identColumnFamilyMap[ident].reset(cf); return Status::OK(); } - Status RocksEngine::_dropColumnFamily(const StringData& ident) { - boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily; - { - boost::mutex::scoped_lock lk(_identColumnFamilyMapMutex); - auto cf_iter = _identColumnFamilyMap.find(ident); - if (cf_iter == _identColumnFamilyMap.end()) { - return Status(ErrorCodes::InternalError, "Not found"); - } - columnFamily = cf_iter->second; - _identColumnFamilyMap.erase(cf_iter); - } - auto s = 
_db->DropColumnFamily(columnFamily.get()); - return toMongoStatus(s); + bool RocksEngine::hasIdent(OperationContext* opCtx, const StringData& ident) const { + boost::mutex::scoped_lock lk(_identPrefixMapMutex); + return _identPrefixMap.find(ident) != _identPrefixMap.end(); } - boost::shared_ptr<rocksdb::ColumnFamilyHandle> RocksEngine::_getColumnFamily( - const StringData& ident) { - { - boost::mutex::scoped_lock lk(_identColumnFamilyMapMutex); - auto cf_iter = _identColumnFamilyMap.find(ident); - invariant(cf_iter != _identColumnFamilyMap.end()); - return cf_iter->second; + std::vector<std::string> RocksEngine::getAllIdents(OperationContext* opCtx) const { + std::vector<std::string> indents; + for (auto& entry : _identPrefixMap) { + indents.push_back(entry.first); } + return indents; } - std::unordered_map<std::string, Ordering> RocksEngine::_loadOrderingMetaData( - rocksdb::Iterator* itr) { - std::unordered_map<std::string, Ordering> orderings; - for (itr->Seek(kOrderingPrefix); itr->Valid(); itr->Next()) { - rocksdb::Slice key(itr->key()); - if (!key.starts_with(kOrderingPrefix)) { - break; + // non public api + Status RocksEngine::_createIdentPrefix(const StringData& ident) { + uint32_t prefix = 0; + { + boost::mutex::scoped_lock lk(_identPrefixMapMutex); + if (_identPrefixMap.find(ident) != _identPrefixMap.end()) { + // already exists + return Status::OK(); } - key.remove_prefix(kOrderingPrefix.size()); - std::string value(itr->value().ToString()); - orderings.insert({key.ToString(), Ordering::make(BSONObj(value.c_str()))}); - } - ROCKS_STATUS_OK(itr->status()); - return orderings; - } - std::set<std::string> RocksEngine::_loadCollections(rocksdb::Iterator* itr) { - std::set<std::string> collections; - for (itr->Seek(kCollectionPrefix); itr->Valid() ; itr->Next()) { - rocksdb::Slice key(itr->key()); - if (!key.starts_with(kCollectionPrefix)) { - break; - } - key.remove_prefix(kCollectionPrefix.size()); - collections.insert(key.ToString()); + prefix = ++_maxPrefix; + _identPrefixMap[ident] = prefix; } - ROCKS_STATUS_OK(itr->status()); - return collections; - } - std::vector<std::string> RocksEngine::_loadColumnFamilies() { - std::vector<std::string> names; - if (boost::filesystem::exists(_path)) { - rocksdb::Status s = rocksdb::DB::ListColumnFamilies(_dbOptions(), _path, &names); + BSONObjBuilder builder; + builder.append("prefix", static_cast<int32_t>(prefix)); + BSONObj config = builder.obj(); - if (s.IsIOError()) { - // DNE, this means the directory exists but is empty, which is fine - // because it means no rocks database exists yet - } else { - ROCKS_STATUS_OK(s); - } - } + auto s = _db->Put(rocksdb::WriteOptions(), kMetadataPrefix + ident.toString(), + rocksdb::Slice(config.objdata(), config.objsize())); - return names; + return toMongoStatus(s); } - rocksdb::Options RocksEngine::_dbOptions() { - rocksdb::Options options(rocksdb::DBOptions(), _defaultCFOptions()); - - options.max_background_compactions = 4; - options.max_background_flushes = 4; - - // create the DB if it's not already present - options.create_if_missing = true; - options.create_missing_column_families = true; - options.wal_dir = _path + "/journal"; - options.max_total_wal_size = 1 << 30; // 1GB - - return options; + std::string RocksEngine::_getIdentPrefix(const StringData& ident) { + boost::mutex::scoped_lock lk(_identPrefixMapMutex); + auto prefixIter = _identPrefixMap.find(ident); + invariant(prefixIter != _identPrefixMap.end()); + return encodePrefix(prefixIter->second); } - rocksdb::ColumnFamilyOptions 
RocksEngine::_defaultCFOptions() { - // TODO pass or set appropriate options for default CF. - rocksdb::ColumnFamilyOptions options; + rocksdb::Options RocksEngine::_options() const { + rocksdb::Options options; rocksdb::BlockBasedTableOptions table_options; table_options.block_cache = _block_cache; table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10)); + table_options.format_version = 2; options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options)); + + options.write_buffer_size = 128 * 1024 * 1024; // 128MB options.max_write_buffer_number = 4; - return options; - } + options.max_background_compactions = 8; + options.max_background_flushes = 4; + options.target_file_size_base = 64 * 1024 * 1024; // 64MB + options.soft_rate_limit = 2; + options.hard_rate_limit = 3; + options.max_bytes_for_level_base = 512 * 1024 * 1024; // 512 MB + options.max_open_files = 20000; - rocksdb::ColumnFamilyOptions RocksEngine::_collectionOptions() { - return _defaultCFOptions(); - } + // create the DB if it's not already present + options.create_if_missing = true; + options.wal_dir = _path + "/journal"; - rocksdb::ColumnFamilyOptions RocksEngine::_indexOptions(const Ordering& order) { - return _defaultCFOptions(); + return options; } Status toMongoStatus( rocksdb::Status s ) { diff --git a/src/mongo/db/storage/rocks/rocks_engine.h b/src/mongo/db/storage/rocks/rocks_engine.h index 12ee7eaf355..4078c13414e 100644 --- a/src/mongo/db/storage/rocks/rocks_engine.h +++ b/src/mongo/db/storage/rocks/rocks_engine.h @@ -89,9 +89,8 @@ namespace mongo { virtual Status dropIdent(OperationContext* opCtx, const StringData& ident) override; - virtual bool hasIdent(OperationContext* opCtx, const StringData& ident) const { - return _identColumnFamilyMap.find(ident) != _identColumnFamilyMap.end();; - } + virtual bool hasIdent(OperationContext* opCtx, const StringData& ident) const override; + virtual std::vector<std::string> getAllIdents( OperationContext* opCtx ) const override; virtual bool supportsDocLocking() const override { @@ -123,22 +122,10 @@ namespace mongo { const rocksdb::DB* getDB() const { return _db.get(); } private: - bool _existsColumnFamily(const StringData& ident); - Status _createColumnFamily(const rocksdb::ColumnFamilyOptions& options, - const StringData& ident); - Status _dropColumnFamily(const StringData& ident); - boost::shared_ptr<rocksdb::ColumnFamilyHandle> _getColumnFamily(const StringData& ident); - - std::unordered_map<std::string, Ordering> _loadOrderingMetaData(rocksdb::Iterator* itr); - std::set<std::string> _loadCollections(rocksdb::Iterator* itr); - std::vector<std::string> _loadColumnFamilies(); - - rocksdb::ColumnFamilyOptions _collectionOptions(); - rocksdb::ColumnFamilyOptions _indexOptions(const Ordering& order); - - rocksdb::Options _dbOptions(); + Status _createIdentPrefix(const StringData& ident); + std::string _getIdentPrefix(const StringData& ident); - rocksdb::ColumnFamilyOptions _defaultCFOptions(); + rocksdb::Options _options() const; std::string _path; boost::scoped_ptr<rocksdb::DB> _db; @@ -146,18 +133,18 @@ namespace mongo { const bool _durable; - // Default column family is owned by the rocksdb::DB instance. 
- rocksdb::ColumnFamilyHandle* _defaultHandle; + // ident prefix map stores mapping from ident to a prefix (uint32_t) + mutable boost::mutex _identPrefixMapMutex; + typedef StringMap<uint32_t> IdentPrefixMap; + IdentPrefixMap _identPrefixMap; - mutable boost::mutex _identColumnFamilyMapMutex; - typedef StringMap<boost::shared_ptr<rocksdb::ColumnFamilyHandle> > IdentColumnFamilyMap; - IdentColumnFamilyMap _identColumnFamilyMap; + // protected by _identPrefixMapMutex + uint32_t _maxPrefix; // This is for concurrency control RocksTransactionEngine _transactionEngine; - static const std::string kOrderingPrefix; - static const std::string kCollectionPrefix; + static const std::string kMetadataPrefix; }; Status toMongoStatus( rocksdb::Status s ); diff --git a/src/mongo/db/storage/rocks/rocks_index.cpp b/src/mongo/db/storage/rocks/rocks_index.cpp index 2e7c0187efa..34068b44e48 100644 --- a/src/mongo/db/storage/rocks/rocks_index.cpp +++ b/src/mongo/db/storage/rocks/rocks_index.cpp @@ -100,16 +100,11 @@ namespace mongo { */ class RocksCursorBase : public SortedDataInterface::Cursor { public: - RocksCursorBase(OperationContext* txn, rocksdb::DB* db, - boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily, + RocksCursorBase(OperationContext* txn, rocksdb::DB* db, std::string prefix, bool forward, Ordering order) - : _db(db), - _columnFamily(columnFamily), - _forward(forward), - _order(order), - _isKeyCurrent(false) { + : _db(db), _prefix(prefix), _forward(forward), _order(order), _isKeyCurrent(false) { auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); - _iterator.reset(ru->NewIterator(_columnFamily.get())); + _iterator.reset(ru->NewIterator(_prefix)); checkStatus(); } @@ -197,13 +192,13 @@ namespace mongo { if (!_savedEOF) { loadKeyIfNeeded(); _savedRecordId = getRecordId(); - _iterator.reset(); } + _iterator.reset(); } void restorePosition(OperationContext* txn) { auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); - _iterator.reset(ru->NewIterator(_columnFamily.get())); + _iterator.reset(ru->NewIterator(_prefix)); if (!_savedEOF) { _locate(_savedRecordId); @@ -285,7 +280,7 @@ namespace mongo { } rocksdb::DB* _db; // not owned - boost::shared_ptr<rocksdb::ColumnFamilyHandle> _columnFamily; + std::string _prefix; boost::scoped_ptr<rocksdb::Iterator> _iterator; const bool _forward; Ordering _order; @@ -302,10 +297,9 @@ namespace mongo { class RocksStandardCursor : public RocksCursorBase { public: - RocksStandardCursor(OperationContext* txn, rocksdb::DB* db, - boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily, + RocksStandardCursor(OperationContext* txn, rocksdb::DB* db, std::string prefix, bool forward, Ordering order) - : RocksCursorBase(txn, db, columnFamily, forward, order), _isTypeBitsValid(false) {} + : RocksCursorBase(txn, db, prefix, forward, order), _isTypeBitsValid(false) {} virtual void invalidateCache() { RocksCursorBase::invalidateCache(); @@ -371,10 +365,9 @@ namespace mongo { class RocksUniqueCursor : public RocksCursorBase { public: - RocksUniqueCursor(OperationContext* txn, rocksdb::DB* db, - boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily, + RocksUniqueCursor(OperationContext* txn, rocksdb::DB* db, std::string prefix, bool forward, Ordering order) - : RocksCursorBase(txn, db, columnFamily, forward, order), _recordsIndex(0) {} + : RocksCursorBase(txn, db, prefix, forward, order), _recordsIndex(0) {} virtual void invalidateCache() { RocksCursorBase::invalidateCache(); @@ -504,10 +497,9 @@ namespace mongo { /// RocksIndexBase - 
RocksIndexBase::RocksIndexBase(rocksdb::DB* db, - boost::shared_ptr<rocksdb::ColumnFamilyHandle> cf, - std::string ident, Ordering order) - : _db(db), _columnFamily(cf), _ident(std::move(ident)), _order(order) {} + RocksIndexBase::RocksIndexBase(rocksdb::DB* db, std::string prefix, std::string ident, + Ordering order) + : _db(db), _prefix(prefix), _ident(std::move(ident)), _order(order) {} SortedDataBuilderInterface* RocksIndexBase::getBulkBuilder(OperationContext* txn, bool dupsAllowed) { @@ -529,7 +521,7 @@ namespace mongo { bool RocksIndexBase::isEmpty(OperationContext* txn) { auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); - boost::scoped_ptr<rocksdb::Iterator> it(ru->NewIterator(_columnFamily.get())); + boost::scoped_ptr<rocksdb::Iterator> it(ru->NewIterator(_prefix)); it->SeekToFirst(); return !it->Valid(); @@ -541,32 +533,32 @@ namespace mongo { } long long RocksIndexBase::getSpaceUsedBytes(OperationContext* txn) const { - // TODO provide GetLiveFilesMetadata() with column family - std::vector<rocksdb::LiveFileMetaData> metadata; - _db->GetLiveFilesMetaData(&metadata); - uint64_t spaceUsedBytes = 0; - for (const auto& m : metadata) { - if (m.column_family_name == _ident) { - spaceUsedBytes += m.size; + uint64_t storageSize; + std::string nextPrefix(_prefix); + // first next lexicographically (assume same size) + for (int i = nextPrefix.size() - 1; i >= 0; --i) { + nextPrefix[i]++; + if (nextPrefix[i] != 0) { + break; } } - - uint64_t walSpaceUsed = 0; - _db->GetIntProperty(_columnFamily.get(), "rocksdb.cur-size-all-mem-tables", &walSpaceUsed); - return spaceUsedBytes + walSpaceUsed; + rocksdb::Range wholeRange(_prefix, nextPrefix); + _db->GetApproximateSizes(&wholeRange, 1, &storageSize); + return static_cast<long long>(storageSize); } - std::string RocksIndexBase::_getTransactionID(const KeyString& key) const { - // TODO optimize in the future - return _ident + std::string(key.getBuffer(), key.getSize()); + std::string RocksIndexBase::_makePrefixedKey(const std::string& prefix, + const KeyString& encodedKey) { + std::string key(prefix); + key.append(encodedKey.getBuffer(), encodedKey.getSize()); + return key; } /// RocksUniqueIndex - RocksUniqueIndex::RocksUniqueIndex(rocksdb::DB* db, - boost::shared_ptr<rocksdb::ColumnFamilyHandle> cf, - std::string ident, Ordering order) - : RocksIndexBase(db, cf, ident, order) {} + RocksUniqueIndex::RocksUniqueIndex(rocksdb::DB* db, std::string prefix, std::string ident, + Ordering order) + : RocksIndexBase(db, prefix, ident, order) {} Status RocksUniqueIndex::insert(OperationContext* txn, const BSONObj& key, const RecordId& loc, bool dupsAllowed) { @@ -575,17 +567,16 @@ namespace mongo { return s; } - auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); - KeyString encodedKey(key, _order); - rocksdb::Slice keySlice(encodedKey.getBuffer(), encodedKey.getSize()); + std::string prefixedKey(_makePrefixedKey(_prefix, encodedKey)); - if (!ru->transaction()->registerWrite(_getTransactionID(encodedKey))) { + auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); + if (!ru->transaction()->registerWrite(prefixedKey)) { throw WriteConflictException(); } std::string currentValue; - auto getStatus = ru->Get(_columnFamily.get(), keySlice, ¤tValue); + auto getStatus = ru->Get(prefixedKey, ¤tValue); if (!getStatus.ok() && !getStatus.IsNotFound()) { // This means that Get() returned an error // TODO: SERVER-16979 Correctly handle errors returned by RocksDB @@ -597,7 +588,7 @@ namespace mongo { value.appendTypeBits(encodedKey.getTypeBits()); } 
rocksdb::Slice valueSlice(value.getBuffer(), value.getSize()); - ru->writeBatch()->Put(_columnFamily.get(), keySlice, valueSlice); + ru->writeBatch()->Put(prefixedKey, valueSlice); return Status::OK(); } @@ -637,29 +628,29 @@ namespace mongo { } rocksdb::Slice valueVectorSlice(valueVector.getBuffer(), valueVector.getSize()); - ru->writeBatch()->Put(_columnFamily.get(), keySlice, valueVectorSlice); + ru->writeBatch()->Put(prefixedKey, valueVectorSlice); return Status::OK(); } void RocksUniqueIndex::unindex(OperationContext* txn, const BSONObj& key, const RecordId& loc, bool dupsAllowed) { KeyString encodedKey(key, _order); - rocksdb::Slice keySlice(encodedKey.getBuffer(), encodedKey.getSize()); - RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); + std::string prefixedKey(_makePrefixedKey(_prefix, encodedKey)); + auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); // We can't let two threads unindex the same key - if (!ru->transaction()->registerWrite(_getTransactionID(encodedKey))) { + if (!ru->transaction()->registerWrite(prefixedKey)) { throw WriteConflictException(); } if (!dupsAllowed) { - ru->writeBatch()->Delete(_columnFamily.get(), keySlice); + ru->writeBatch()->Delete(prefixedKey); return; } // dups are allowed, so we have to deal with a vector of RecordIds. std::string currentValue; - auto getStatus = ru->Get(_columnFamily.get(), keySlice, ¤tValue); + auto getStatus = ru->Get(prefixedKey, ¤tValue); if (!getStatus.ok() && !getStatus.IsNotFound()) { // This means that Get() returned an error // TODO: SERVER-16979 Correctly handle errors returned by RocksDB @@ -681,7 +672,7 @@ namespace mongo { if (records.empty() && !br.remaining()) { // This is the common case: we are removing the only loc for this key. // Remove the whole entry. 
- ru->writeBatch()->Delete(_columnFamily.get(), keySlice); + ru->writeBatch()->Delete(prefixedKey); return; } @@ -710,23 +701,23 @@ namespace mongo { } rocksdb::Slice newValueSlice(newValue.getBuffer(), newValue.getSize()); - ru->writeBatch()->Put(_columnFamily.get(), keySlice, newValueSlice); + ru->writeBatch()->Put(prefixedKey, newValueSlice); } SortedDataInterface::Cursor* RocksUniqueIndex::newCursor(OperationContext* txn, int direction) const { invariant( ( direction == 1 || direction == -1 ) && "invalid value for direction" ); - return new RocksUniqueCursor(txn, _db, _columnFamily, direction == 1, _order); + return new RocksUniqueCursor(txn, _db, _prefix, direction == 1, _order); } Status RocksUniqueIndex::dupKeyCheck(OperationContext* txn, const BSONObj& key, const RecordId& loc) { KeyString encodedKey(key, _order); - rocksdb::Slice keySlice(encodedKey.getBuffer(), encodedKey.getSize()); + std::string prefixedKey(_makePrefixedKey(_prefix, encodedKey)); auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); std::string value; - auto getStatus = ru->Get(_columnFamily.get(), keySlice, &value); + auto getStatus = ru->Get(prefixedKey, &value); if (!getStatus.ok() && !getStatus.IsNotFound()) { // This means that Get() returned an error // TODO: SERVER-16979 Correctly handle errors returned by RocksDB @@ -750,10 +741,9 @@ namespace mongo { } /// RocksStandardIndex - RocksStandardIndex::RocksStandardIndex(rocksdb::DB* db, - boost::shared_ptr<rocksdb::ColumnFamilyHandle> cf, - std::string ident, Ordering order) - : RocksIndexBase(db, cf, ident, order) {} + RocksStandardIndex::RocksStandardIndex(rocksdb::DB* db, std::string prefix, std::string ident, + Ordering order) + : RocksIndexBase(db, prefix, ident, order) {} Status RocksStandardIndex::insert(OperationContext* txn, const BSONObj& key, const RecordId& loc, bool dupsAllowed) { @@ -766,9 +756,7 @@ namespace mongo { // If we're inserting an index element, this means we already "locked" the RecordId of the // document. No need to register write here KeyString encodedKey(key, _order, loc); - rocksdb::Slice keySlice(encodedKey.getBuffer(), encodedKey.getSize()); - - auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); + std::string prefixedKey(_makePrefixedKey(_prefix, encodedKey)); rocksdb::Slice value; if (!encodedKey.getTypeBits().isAllZeros()) { @@ -777,8 +765,8 @@ namespace mongo { encodedKey.getTypeBits().getSize()); } - ru->writeBatch()->Put(_columnFamily.get(), - rocksdb::Slice(encodedKey.getBuffer(), encodedKey.getSize()), value); + auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); + ru->writeBatch()->Put(prefixedKey, value); return Status::OK(); } @@ -790,15 +778,15 @@ namespace mongo { // document. 
No need to register write here KeyString encodedKey(key, _order, loc); - rocksdb::Slice keySlice(encodedKey.getBuffer(), encodedKey.getSize()); + std::string prefixedKey(_makePrefixedKey(_prefix, encodedKey)); auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); - ru->writeBatch()->Delete(_columnFamily.get(), keySlice); + ru->writeBatch()->Delete(prefixedKey); } SortedDataInterface::Cursor* RocksStandardIndex::newCursor(OperationContext* txn, int direction) const { invariant( ( direction == 1 || direction == -1 ) && "invalid value for direction" ); - return new RocksStandardCursor(txn, _db, _columnFamily, direction == 1, _order); + return new RocksStandardCursor(txn, _db, _prefix, direction == 1, _order); } diff --git a/src/mongo/db/storage/rocks/rocks_index.h b/src/mongo/db/storage/rocks/rocks_index.h index 71ce3ddfd75..3c91185aac2 100644 --- a/src/mongo/db/storage/rocks/rocks_index.h +++ b/src/mongo/db/storage/rocks/rocks_index.h @@ -40,7 +40,6 @@ #pragma once namespace rocksdb { - class ColumnFamilyHandle; class DB; } @@ -52,8 +51,7 @@ namespace mongo { MONGO_DISALLOW_COPYING(RocksIndexBase); public: - RocksIndexBase(rocksdb::DB* db, boost::shared_ptr<rocksdb::ColumnFamilyHandle> cf, - std::string ident, Ordering order); + RocksIndexBase(rocksdb::DB* db, std::string prefix, std::string ident, Ordering order); virtual SortedDataBuilderInterface* getBulkBuilder(OperationContext* txn, bool dupsAllowed); @@ -73,14 +71,12 @@ namespace mongo { virtual long long getSpaceUsedBytes( OperationContext* txn ) const; protected: - std::string _getTransactionID(const KeyString& key) const; + static std::string _makePrefixedKey(const std::string& prefix, const KeyString& encodedKey); rocksdb::DB* _db; // not owned - // Each index is stored as a single column family, so this stores the handle to the - // relevant column family - boost::shared_ptr<rocksdb::ColumnFamilyHandle> _columnFamily; - + // Each key in the index is prefixed with _prefix + std::string _prefix; std::string _ident; // used to construct RocksCursors @@ -89,8 +85,7 @@ namespace mongo { class RocksUniqueIndex : public RocksIndexBase { public: - RocksUniqueIndex(rocksdb::DB* db, boost::shared_ptr<rocksdb::ColumnFamilyHandle> cf, - std::string ident, Ordering order); + RocksUniqueIndex(rocksdb::DB* db, std::string prefix, std::string ident, Ordering order); virtual Status insert(OperationContext* txn, const BSONObj& key, const RecordId& loc, bool dupsAllowed); @@ -103,8 +98,7 @@ namespace mongo { class RocksStandardIndex : public RocksIndexBase { public: - RocksStandardIndex(rocksdb::DB* db, boost::shared_ptr<rocksdb::ColumnFamilyHandle> cf, - std::string ident, Ordering order); + RocksStandardIndex(rocksdb::DB* db, std::string prefix, std::string ident, Ordering order); virtual Status insert(OperationContext* txn, const BSONObj& key, const RecordId& loc, bool dupsAllowed); diff --git a/src/mongo/db/storage/rocks/rocks_index_test.cpp b/src/mongo/db/storage/rocks/rocks_index_test.cpp index 5b1c10b6aef..43effa007b1 100644 --- a/src/mongo/db/storage/rocks/rocks_index_test.cpp +++ b/src/mongo/db/storage/rocks/rocks_index_test.cpp @@ -58,26 +58,18 @@ namespace mongo { RocksIndexHarness() : _order(Ordering::make(BSONObj())), _tempDir(_testNamespace) { boost::filesystem::remove_all(_tempDir.path()); rocksdb::DB* db; - std::vector<rocksdb::ColumnFamilyDescriptor> cfs; - cfs.emplace_back(); - cfs.emplace_back("index", rocksdb::ColumnFamilyOptions()); - rocksdb::DBOptions db_options; - db_options.create_if_missing = true; - 
db_options.create_missing_column_families = true; - std::vector<rocksdb::ColumnFamilyHandle*> handles; - auto s = rocksdb::DB::Open(db_options, _tempDir.path(), cfs, &handles, &db); + rocksdb::Options options; + options.create_if_missing = true; + auto s = rocksdb::DB::Open(options, _tempDir.path(), &db); ASSERT(s.ok()); _db.reset(db); - _cf.reset(handles[1]); } virtual SortedDataInterface* newSortedDataInterface(bool unique) { if (unique) { - return new RocksUniqueIndex(_db.get(), _cf, rocksdb::kDefaultColumnFamilyName, - _order); + return new RocksUniqueIndex(_db.get(), "prefix", "ident", _order); } else { - return new RocksStandardIndex(_db.get(), _cf, rocksdb::kDefaultColumnFamilyName, - _order); + return new RocksStandardIndex(_db.get(), "prefix", "ident", _order); } } @@ -90,7 +82,6 @@ namespace mongo { string _testNamespace = "mongo-rocks-sorted-data-test"; unittest::TempDir _tempDir; scoped_ptr<rocksdb::DB> _db; - shared_ptr<rocksdb::ColumnFamilyHandle> _cf; RocksTransactionEngine _transactionEngine; }; diff --git a/src/mongo/db/storage/rocks/rocks_init.cpp b/src/mongo/db/storage/rocks/rocks_init.cpp index 9442d5c30d5..ebf9e8930b3 100644 --- a/src/mongo/db/storage/rocks/rocks_init.cpp +++ b/src/mongo/db/storage/rocks/rocks_init.cpp @@ -99,7 +99,10 @@ namespace mongo { // Current disk format. We bump this number when we change the disk format. MongoDB will // fail to start if the versions don't match. In that case a user needs to run mongodump // and mongorestore. - const int kRocksFormatVersion = 1; + // * Version 1 was the format with many column families -- one column family for each + // collection and index + // * Version 2 (current) keeps all collections and indexes in a single column family + const int kRocksFormatVersion = 2; const std::string kRocksFormatVersionString = "rocksFormatVersion"; }; } // namespace diff --git a/src/mongo/db/storage/rocks/rocks_record_store.cpp b/src/mongo/db/storage/rocks/rocks_record_store.cpp index a53658fab5a..0fc42f6d484 100644 --- a/src/mongo/db/storage/rocks/rocks_record_store.cpp +++ b/src/mongo/db/storage/rocks/rocks_record_store.cpp @@ -137,12 +137,12 @@ namespace mongo { RocksRecordStore::RocksRecordStore(const StringData& ns, const StringData& id, rocksdb::DB* db, // not owned here - boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily, - bool isCapped, int64_t cappedMaxSize, int64_t cappedMaxDocs, + std::string prefix, bool isCapped, int64_t cappedMaxSize, + int64_t cappedMaxDocs, CappedDocumentDeleteCallback* cappedDeleteCallback) : RecordStore(ns), _db(db), - _columnFamily(columnFamily), + _prefix(std::move(prefix)), _isCapped(isCapped), _cappedMaxSize(cappedMaxSize), _cappedMaxDocs(cappedMaxDocs), @@ -153,10 +153,8 @@ namespace mongo { _cappedVisibilityManager((_isCapped || _isOplog) ? 
new CappedVisibilityManager() : nullptr), _ident(id.toString()), - _dataSizeKey("datasize-" + id.toString()), - _numRecordsKey("numrecords-" + id.toString()) { - invariant( _db ); - invariant( _columnFamily ); + _dataSizeKey(std::string("\0\0\0\0", 4) + "datasize-" + id.toString()), + _numRecordsKey(std::string("\0\0\0\0", 4) + "numrecords-" + id.toString()) { if (_isCapped) { invariant(_cappedMaxSize > 0); @@ -169,7 +167,7 @@ namespace mongo { // Get next id boost::scoped_ptr<rocksdb::Iterator> iter( - db->NewIterator(_readOptions(), columnFamily.get())); + RocksRecoveryUnit::NewIteratorNoSnapshot(_db, _prefix)); iter->SeekToLast(); if (iter->Valid()) { rocksdb::Slice lastSlice = iter->key(); @@ -178,70 +176,43 @@ namespace mongo { _cappedVisibilityManager->updateHighestSeen(lastId); } _nextIdNum.store(lastId.repr() + 1); - } - else { + } else { // Need to start at 1 so we are always higher than RecordId::min() - _nextIdNum.store( 1 ); + _nextIdNum.store(1); } // load metadata - std::string value; - bool metadataPresent = true; - // XXX not using a Snapshot here - if (!_db->Get(_readOptions(), rocksdb::Slice(_numRecordsKey), &value).ok()) { - _numRecords.store(0); - metadataPresent = false; - } - else { - long long numRecords = 0; - memcpy( &numRecords, value.data(), sizeof(numRecords)); - _numRecords.store(numRecords); - } - - // XXX not using a Snapshot here - if (!_db->Get(_readOptions(), rocksdb::Slice(_dataSizeKey), &value).ok()) { - _dataSize.store(0); - invariant(!metadataPresent); - } - else { - invariant(value.size() == sizeof(long long)); - long long ds; - memcpy(&ds, value.data(), sizeof(long long)); - invariant(ds >= 0); - _dataSize.store(ds); - } + _numRecords.store(RocksRecoveryUnit::getCounterValue(_db, _numRecordsKey)); + _dataSize.store(RocksRecoveryUnit::getCounterValue(_db, _dataSizeKey)); + invariant(_dataSize.load() >= 0); + invariant(_numRecords.load() >= 0); } - int64_t RocksRecordStore::storageSize( OperationContext* txn, - BSONObjBuilder* extraInfo, - int infoLevel ) const { - uint64_t storageSize; - int64_t minLocStorage, maxLocStorage; - rocksdb::Range wholeRange(_makeKey(RecordId(), &minLocStorage), - _makeKey(RecordId::max(), &maxLocStorage)); - _db->GetApproximateSizes(_columnFamily.get(), &wholeRange, 1, &storageSize); - return static_cast<int64_t>( storageSize ); + int64_t RocksRecordStore::storageSize(OperationContext* txn, BSONObjBuilder* extraInfo, + int infoLevel) const { + // we're lying, but that's the best we can do for now + return _dataSize.load(); } RecordData RocksRecordStore::dataFor(OperationContext* txn, const RecordId& loc) const { - RecordData rd = _getDataFor(_db, _columnFamily.get(), txn, loc); + RecordData rd = _getDataFor(_db, _prefix, txn, loc); massert(28605, "Didn't find RecordId in RocksRecordStore", (rd.data() != nullptr)); return rd; } void RocksRecordStore::deleteRecord( OperationContext* txn, const RecordId& dl ) { - RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn ); - if (!ru->transaction()->registerWrite(_getTransactionID(dl))) { + std::string key(_makePrefixedKey(_prefix, dl)); + + RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); + if (!ru->transaction()->registerWrite(key)) { throw WriteConflictException(); } std::string oldValue; - int64_t locStorage; - rocksdb::Slice key = _makeKey(dl, &locStorage); - ru->Get(_columnFamily.get(), key, &oldValue); + ru->Get(key, &oldValue); int oldLength = oldValue.size(); - ru->writeBatch()->Delete(_columnFamily.get(), key); + 
ru->writeBatch()->Delete(key); _changeNumRecords(txn, false); _increaseDataSize(txn, -oldLength); @@ -304,7 +275,7 @@ namespace mongo { try { auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); - boost::scoped_ptr<rocksdb::Iterator> iter(ru->NewIterator(_columnFamily.get())); + boost::scoped_ptr<rocksdb::Iterator> iter(ru->NewIterator(_prefix)); iter->SeekToFirst(); while (cappedAndNeedDelete(dataSizeDelta, numRecordsDelta) && iter->Valid()) { @@ -357,6 +328,7 @@ namespace mongo { const char* data, int len, bool enforceQuota ) { + if ( _isCapped && len > _cappedMaxSize ) { return StatusWith<RecordId>( ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize" ); @@ -379,15 +351,9 @@ namespace mongo { loc = _nextId(); } - // XXX it might be safe to remove this, since we just allocated new unique RecordId. - // However, we need to check if any other transaction can start modifying this RecordId - // before our transaction is committed - if (!ru->transaction()->registerWrite(_getTransactionID(loc))) { - throw WriteConflictException(); - } - int64_t locStorage; - ru->writeBatch()->Put(_columnFamily.get(), _makeKey(loc, &locStorage), - rocksdb::Slice(data, len)); + // No need to register the write here, since we just allocated a new RecordId so no other + // transaction can access this key before we commit + ru->writeBatch()->Put(_makePrefixedKey(_prefix, loc), rocksdb::Slice(data, len)); _changeNumRecords( txn, true ); _increaseDataSize( txn, len ); @@ -413,15 +379,15 @@ namespace mongo { int len, bool enforceQuota, UpdateNotifier* notifier ) { + std::string key(_makePrefixedKey(_prefix, loc)); + RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn ); - if (!ru->transaction()->registerWrite(_getTransactionID(loc))) { + if (!ru->transaction()->registerWrite(key)) { throw WriteConflictException(); } std::string old_value; - int64_t locStorage; - rocksdb::Slice key = _makeKey(loc, &locStorage); - auto status = ru->Get(_columnFamily.get(), key, &old_value); + auto status = ru->Get(key, &old_value); if ( !status.ok() ) { return StatusWith<RecordId>( ErrorCodes::InternalError, status.ToString() ); @@ -429,7 +395,7 @@ namespace mongo { int old_length = old_value.size(); - ru->writeBatch()->Put(_columnFamily.get(), key, rocksdb::Slice(data, len)); + ru->writeBatch()->Put(key, rocksdb::Slice(data, len)); _increaseDataSize(txn, len - old_length); @@ -439,7 +405,7 @@ namespace mongo { } bool RocksRecordStore::updateWithDamagesSupported() const { - return true; + return false; } Status RocksRecordStore::updateWithDamages( OperationContext* txn, @@ -447,39 +413,7 @@ namespace mongo { const RecordData& oldRec, const char* damageSource, const mutablebson::DamageVector& damages ) { - RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn ); - if (!ru->transaction()->registerWrite(_getTransactionID(loc))) { - throw WriteConflictException(); - } - - int64_t locStorage; - rocksdb::Slice key = _makeKey(loc, &locStorage); - - // get original value - std::string value; - rocksdb::Status status; - status = ru->Get(_columnFamily.get(), key, &value); - - if ( !status.ok() ) { - if ( status.IsNotFound() ) - return Status( ErrorCodes::InternalError, "doc not found for in-place update" ); - - log() << "rocks Get failed, blowing up: " << status.ToString(); - invariant( false ); - } - - // apply changes to our copy - for( size_t i = 0; i < damages.size(); i++ ) { - mutablebson::DamageEvent event = damages[i]; - const char* sourcePtr = damageSource + event.sourceOffset; - - invariant( 
event.targetOffset + event.size < value.length() ); - value.replace( event.targetOffset, event.size, sourcePtr, event.size ); - } - - // write back - ru->writeBatch()->Put(_columnFamily.get(), key, value); - + invariant(false); return Status::OK(); } @@ -494,18 +428,17 @@ namespace mongo { } } - return new Iterator(txn, _db, _columnFamily, _cappedVisibilityManager, dir, start); + return new Iterator(txn, _db, _prefix, _cappedVisibilityManager, dir, start); } std::vector<RecordIterator*> RocksRecordStore::getManyIterators(OperationContext* txn) const { - return {new Iterator(txn, _db, _columnFamily, _cappedVisibilityManager, + return {new Iterator(txn, _db, _prefix, _cappedVisibilityManager, CollectionScanParams::FORWARD, RecordId())}; } Status RocksRecordStore::truncate( OperationContext* txn ) { // XXX once we have readable WriteBatch, also delete outstanding writes to // this collection in the WriteBatch - //AFB add Clear(ColumnFamilyHandle*) boost::scoped_ptr<RecordIterator> iter( getIterator( txn ) ); while( !iter->isEOF() ) { RecordId loc = iter->getNext(); @@ -519,7 +452,11 @@ namespace mongo { RecordStoreCompactAdaptor* adaptor, const CompactOptions* options, CompactStats* stats ) { - rocksdb::Status status = _db->CompactRange(_columnFamily.get(), NULL, NULL); + std::string beginString(_makePrefixedKey(_prefix, RecordId())); + std::string endString(_makePrefixedKey(_prefix, RecordId::max())); + rocksdb::Slice beginRange(beginString); + rocksdb::Slice endRange(endString); + rocksdb::Status status = _db->CompactRange(&beginRange, &endRange); if ( status.ok() ) return Status::OK(); else @@ -571,7 +508,7 @@ namespace mongo { result->appendIntOrLL("max", _cappedMaxDocs); result->appendIntOrLL("maxSize", _cappedMaxSize / scale); } - bool valid = _db->GetProperty(_columnFamily.get(), "rocksdb.stats", &statsString); + bool valid = _db->GetProperty("rocksdb.stats", &statsString); invariant( valid ); result->append( "stats", statsString ); } @@ -611,9 +548,9 @@ namespace mongo { auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); ru->setOplogReadTill(_cappedVisibilityManager->oplogStartHack()); - boost::scoped_ptr<rocksdb::Iterator> iter(ru->NewIterator(_columnFamily.get())); - int64_t locStorage; - iter->Seek(_makeKey(startingPosition, &locStorage)); + boost::scoped_ptr<rocksdb::Iterator> iter(ru->NewIterator(_prefix)); + int64_t storage; + iter->Seek(_makeKey(startingPosition, &storage)); if (!iter->Valid()) { iter->SeekToLast(); if (iter->Valid()) { @@ -678,6 +615,14 @@ namespace mongo { return rocksdb::Slice(reinterpret_cast<const char*>(storage), sizeof(*storage)); } + std::string RocksRecordStore::_makePrefixedKey(const std::string& prefix, const RecordId& loc) { + int64_t storage; + auto encodedLoc = _makeKey(loc, &storage); + std::string key(prefix); + key.append(encodedLoc.data(), encodedLoc.size()); + return key; + } + RecordId RocksRecordStore::_makeRecordId(const rocksdb::Slice& slice) { invariant(slice.size() == sizeof(int64_t)); int64_t repr = endian::bigToNative(*reinterpret_cast<const int64_t*>(slice.data())); @@ -687,20 +632,19 @@ namespace mongo { bool RocksRecordStore::findRecord( OperationContext* txn, const RecordId& loc, RecordData* out ) const { - RecordData rd = _getDataFor(_db, _columnFamily.get(), txn, loc); + RecordData rd = _getDataFor(_db, _prefix, txn, loc); if ( rd.data() == NULL ) return false; *out = rd; return true; } - RecordData RocksRecordStore::_getDataFor(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf, + RecordData 
RocksRecordStore::_getDataFor(rocksdb::DB* db, const std::string& prefix, OperationContext* txn, const RecordId& loc) { RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); std::string valueStorage; - int64_t locStorage; - auto status = ru->Get(cf, _makeKey(loc, &locStorage), &valueStorage); + auto status = ru->Get(_makePrefixedKey(prefix, loc), &valueStorage); if (!status.ok()) { if (status.IsNotFound()) { return RecordData(nullptr, 0); @@ -715,7 +659,6 @@ namespace mongo { return RecordData(data.moveFrom(), valueStorage.size()); } - // XXX make sure these work with rollbacks (I don't think they will) void RocksRecordStore::_changeNumRecords( OperationContext* txn, bool insert ) { RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn ); if ( insert ) { @@ -731,26 +674,20 @@ namespace mongo { ru->incrementCounter(_dataSizeKey, &_dataSize, amount); } - std::string RocksRecordStore::_getTransactionID(const RecordId& rid) const { - // TODO -- optimize in the future - return _ident + std::string(reinterpret_cast<const char*>(&rid), sizeof(rid)); - } - // -------- RocksRecordStore::Iterator::Iterator( - OperationContext* txn, rocksdb::DB* db, - boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily, + OperationContext* txn, rocksdb::DB* db, std::string prefix, boost::shared_ptr<CappedVisibilityManager> cappedVisibilityManager, const CollectionScanParams::Direction& dir, const RecordId& start) : _txn(txn), _db(db), - _cf(columnFamily), + _prefix(std::move(prefix)), _cappedVisibilityManager(cappedVisibilityManager), _dir(dir), _eof(true), _readUntilForOplog(RocksRecoveryUnit::getRocksRecoveryUnit(txn)->getOplogReadTill()), - _iterator(RocksRecoveryUnit::getRocksRecoveryUnit(txn)->NewIterator(_cf.get())) { + _iterator(RocksRecoveryUnit::getRocksRecoveryUnit(txn)->NewIterator(_prefix)) { _locate(start); } @@ -828,7 +765,7 @@ namespace mongo { } auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); - _iterator.reset(ru->NewIterator(_cf.get())); + _iterator.reset(ru->NewIterator(_prefix)); RecordId saved = _lastLoc; _locate(_lastLoc); @@ -862,7 +799,7 @@ namespace mongo { memcpy(data.get(), _iterator->value().data(), _iterator->value().size()); return RecordData(data.moveFrom(), _iterator->value().size()); } - return RocksRecordStore::_getDataFor(_db, _cf.get(), _txn, loc); + return RocksRecordStore::_getDataFor(_db, _prefix, _txn, loc); } void RocksRecordStore::Iterator::_locate(const RecordId& loc) { diff --git a/src/mongo/db/storage/rocks/rocks_record_store.h b/src/mongo/db/storage/rocks/rocks_record_store.h index 3cc1a607c1d..fdd2763a92f 100644 --- a/src/mongo/db/storage/rocks/rocks_record_store.h +++ b/src/mongo/db/storage/rocks/rocks_record_store.h @@ -44,7 +44,6 @@ #include "mongo/platform/atomic_word.h" namespace rocksdb { - class ColumnFamilyHandle; class DB; class Iterator; class Slice; @@ -79,8 +78,7 @@ namespace mongo { class RocksRecordStore : public RecordStore { public: - RocksRecordStore(const StringData& ns, const StringData& id, rocksdb::DB* db, - boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily, + RocksRecordStore(const StringData& ns, const StringData& id, rocksdb::DB* db, std::string prefix, bool isCapped = false, int64_t cappedMaxSize = -1, int64_t cappedMaxDocs = -1, CappedDocumentDeleteCallback* cappedDeleteCallback = NULL); @@ -193,8 +191,7 @@ namespace mongo { // shared_ptrs class Iterator : public RecordIterator { public: - Iterator(OperationContext* txn, rocksdb::DB* db, - boost::shared_ptr<rocksdb::ColumnFamilyHandle> 
columnFamily, + Iterator(OperationContext* txn, rocksdb::DB* db, std::string prefix, boost::shared_ptr<CappedVisibilityManager> cappedVisibilityManager, const CollectionScanParams::Direction& dir, const RecordId& start); @@ -214,7 +211,7 @@ namespace mongo { OperationContext* _txn; rocksdb::DB* _db; // not owned - boost::shared_ptr<rocksdb::ColumnFamilyHandle> _cf; + std::string _prefix; boost::shared_ptr<CappedVisibilityManager> _cappedVisibilityManager; CollectionScanParams::Direction _dir; bool _eof; @@ -232,7 +229,7 @@ namespace mongo { static RecordId _makeRecordId( const rocksdb::Slice& slice ); - static RecordData _getDataFor(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf, + static RecordData _getDataFor(rocksdb::DB* db, const std::string& prefix, OperationContext* txn, const RecordId& loc); RecordId _nextId(); @@ -241,14 +238,13 @@ namespace mongo { // The use of this function requires that the passed in storage outlives the returned Slice static rocksdb::Slice _makeKey(const RecordId& loc, int64_t* storage); + static std::string _makePrefixedKey(const std::string& prefix, const RecordId& loc); void _changeNumRecords(OperationContext* txn, bool insert); void _increaseDataSize(OperationContext* txn, int amount); - std::string _getTransactionID(const RecordId& rid) const; - rocksdb::DB* _db; // not owned - boost::shared_ptr<rocksdb::ColumnFamilyHandle> _columnFamily; + std::string _prefix; const bool _isCapped; const int64_t _cappedMaxSize; diff --git a/src/mongo/db/storage/rocks/rocks_record_store_test.cpp b/src/mongo/db/storage/rocks/rocks_record_store_test.cpp index 1a52b18a9b2..b6ebcc7dfc4 100644 --- a/src/mongo/db/storage/rocks/rocks_record_store_test.cpp +++ b/src/mongo/db/storage/rocks/rocks_record_store_test.cpp @@ -58,30 +58,23 @@ namespace mongo { RocksRecordStoreHarnessHelper() : _tempDir(_testNamespace) { boost::filesystem::remove_all(_tempDir.path()); rocksdb::DB* db; - std::vector<rocksdb::ColumnFamilyDescriptor> cfs; - cfs.emplace_back(); - cfs.emplace_back("record_store", rocksdb::ColumnFamilyOptions()); - rocksdb::DBOptions db_options; - db_options.create_if_missing = true; - db_options.create_missing_column_families = true; - std::vector<rocksdb::ColumnFamilyHandle*> handles; - auto s = rocksdb::DB::Open(db_options, _tempDir.path(), cfs, &handles, &db); + rocksdb::Options options; + options.create_if_missing = true; + auto s = rocksdb::DB::Open(options, _tempDir.path(), &db); ASSERT(s.ok()); _db.reset(db); - delete handles[0]; - _cf.reset(handles[1]); } virtual RecordStore* newNonCappedRecordStore() { return newNonCappedRecordStore("foo.bar"); } RecordStore* newNonCappedRecordStore(const std::string& ns) { - return new RocksRecordStore(ns, "1", _db.get(), _cf); + return new RocksRecordStore(ns, "1", _db.get(), "prefix"); } RecordStore* newCappedRecordStore(const std::string& ns, int64_t cappedMaxSize, int64_t cappedMaxDocs) { - return new RocksRecordStore(ns, "1", _db.get(), _cf, true, cappedMaxSize, + return new RocksRecordStore(ns, "1", _db.get(), "prefix", true, cappedMaxSize, cappedMaxDocs); } @@ -93,7 +86,6 @@ namespace mongo { string _testNamespace = "mongo-rocks-record-store-test"; unittest::TempDir _tempDir; boost::scoped_ptr<rocksdb::DB> _db; - boost::shared_ptr<rocksdb::ColumnFamilyHandle> _cf; RocksTransactionEngine _transactionEngine; }; diff --git a/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp b/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp index 05237f5941e..4217573c7e4 100644 --- a/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp +++ 
b/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp @@ -29,6 +29,7 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage #include "mongo/platform/basic.h" +#include "mongo/platform/endian.h" #include "mongo/db/storage/rocks/rocks_recovery_unit.h" @@ -47,6 +48,66 @@ #include "mongo/util/log.h" namespace mongo { + namespace { + class PrefixStrippingIterator : public rocksdb::Iterator { + public: + // baseIterator is consumed + PrefixStrippingIterator(std::string prefix, Iterator* baseIterator) + : _prefix(std::move(prefix)), + _prefixSlice(_prefix.data(), _prefix.size()), + _baseIterator(baseIterator) {} + + virtual bool Valid() const { + return _baseIterator->Valid() && _baseIterator->key().starts_with(_prefixSlice); + } + + virtual void SeekToFirst() { _baseIterator->Seek(_prefixSlice); } + virtual void SeekToLast() { + // next prefix lexicographically, assume same length + std::string nextPrefix(_prefix); + invariant(nextPrefix.size() > 0); + for (int i = nextPrefix.size() - 1; i >= 0; --i) { + nextPrefix[i]++; + // if it's == 0, that means we've overflowed, so need to keep adding + if (nextPrefix[i] != 0) { + break; + } + } + + _baseIterator->Seek(nextPrefix); + if (!_baseIterator->Valid()) { + _baseIterator->SeekToLast(); + } + if (_baseIterator->Valid() && !_baseIterator->key().starts_with(_prefixSlice)) { + _baseIterator->Prev(); + } + } + + virtual void Seek(const rocksdb::Slice& target) { + std::unique_ptr<char[]> buffer(new char[_prefix.size() + target.size()]); + memcpy(buffer.get(), _prefix.data(), _prefix.size()); + memcpy(buffer.get() + _prefix.size(), target.data(), target.size()); + _baseIterator->Seek(rocksdb::Slice(buffer.get(), _prefix.size() + target.size())); + } + + virtual void Next() { _baseIterator->Next(); } + virtual void Prev() { _baseIterator->Prev(); } + + virtual rocksdb::Slice key() const { + rocksdb::Slice strippedKey = _baseIterator->key(); + strippedKey.remove_prefix(_prefix.size()); + return strippedKey; + } + virtual rocksdb::Slice value() const { return _baseIterator->value(); } + virtual rocksdb::Status status() const { return _baseIterator->status(); } + + private: + std::string _prefix; + rocksdb::Slice _prefixSlice; + std::unique_ptr<Iterator> _baseIterator; + }; + + } // anonymous namespace RocksRecoveryUnit::RocksRecoveryUnit(RocksTransactionEngine* transactionEngine, rocksdb::DB* db, bool durable) @@ -134,10 +195,9 @@ namespace mongo { auto& counter = pair.second; counter._value->fetch_add(counter._delta, std::memory_order::memory_order_relaxed); long long newValue = counter._value->load(std::memory_order::memory_order_relaxed); - - // TODO: make the encoding platform indepdent. 
- const char* nr_ptr = reinterpret_cast<char*>(&newValue); - writeBatch()->Put(pair.first, rocksdb::Slice(nr_ptr, sizeof(long long))); + int64_t littleEndian = static_cast<int64_t>(endian::nativeToLittle(newValue)); + const char* nr_ptr = reinterpret_cast<const char*>(&littleEndian); + writeBatch()->Put(pair.first, rocksdb::Slice(nr_ptr, sizeof(littleEndian))); } if (_writeBatch->GetWriteBatch()->Count() != 0) { @@ -181,11 +241,9 @@ namespace mongo { return _snapshot; } - rocksdb::Status RocksRecoveryUnit::Get(rocksdb::ColumnFamilyHandle* columnFamily, - const rocksdb::Slice& key, std::string* value) { + rocksdb::Status RocksRecoveryUnit::Get(const rocksdb::Slice& key, std::string* value) { if (_writeBatch && _writeBatch->GetWriteBatch()->Count() > 0) { - boost::scoped_ptr<rocksdb::WBWIIterator> wb_iterator( - _writeBatch->NewIterator(columnFamily)); + boost::scoped_ptr<rocksdb::WBWIIterator> wb_iterator(_writeBatch->NewIterator()); wb_iterator->Seek(key); if (wb_iterator->Valid() && wb_iterator->Entry().key == key) { const auto& entry = wb_iterator->Entry(); @@ -199,19 +257,23 @@ } rocksdb::ReadOptions options; options.snapshot = snapshot(); - return _db->Get(options, columnFamily, key, value); + return _db->Get(options, key, value); } - rocksdb::Iterator* RocksRecoveryUnit::NewIterator(rocksdb::ColumnFamilyHandle* columnFamily) { - invariant(columnFamily != _db->DefaultColumnFamily()); - + rocksdb::Iterator* RocksRecoveryUnit::NewIterator(std::string prefix) { rocksdb::ReadOptions options; options.snapshot = snapshot(); - auto iterator = _db->NewIterator(options, columnFamily); + auto iterator = _db->NewIterator(options); if (_writeBatch && _writeBatch->GetWriteBatch()->Count() > 0) { - iterator = _writeBatch->NewIteratorWithBase(columnFamily, iterator); + iterator = _writeBatch->NewIteratorWithBase(iterator); } - return iterator; + return new PrefixStrippingIterator(std::move(prefix), iterator); + } + + rocksdb::Iterator* RocksRecoveryUnit::NewIteratorNoSnapshot(rocksdb::DB* db, + std::string prefix) { + auto iterator = db->NewIterator(rocksdb::ReadOptions()); + return new PrefixStrippingIterator(std::move(prefix), iterator); } void RocksRecoveryUnit::incrementCounter(const rocksdb::Slice& counterKey, @@ -238,6 +300,23 @@ } } + long long RocksRecoveryUnit::getCounterValue(rocksdb::DB* db, const rocksdb::Slice counterKey) { + std::string value; + auto s = db->Get(rocksdb::ReadOptions(), counterKey, &value); + if (s.IsNotFound()) { + return 0; + } else if (!s.ok()) { + log() << "Counter get failed " << s.ToString(); + invariant(!"Counter get failed"); + } + + int64_t ret; + invariant(sizeof(ret) == value.size()); + memcpy(&ret, value.data(), sizeof(ret)); + // we store counters in little endian + return static_cast<long long>(endian::littleToNative(ret)); + } + RocksRecoveryUnit* RocksRecoveryUnit::getRocksRecoveryUnit(OperationContext* opCtx) { return checked_cast<RocksRecoveryUnit*>(opCtx->recoveryUnit()); } diff --git a/src/mongo/db/storage/rocks/rocks_recovery_unit.h b/src/mongo/db/storage/rocks/rocks_recovery_unit.h index e1f95e1b0a7..70217a3d681 100644 --- a/src/mongo/db/storage/rocks/rocks_recovery_unit.h +++ b/src/mongo/db/storage/rocks/rocks_recovery_unit.h @@ -50,7 +50,6 @@ namespace rocksdb { class WriteBatchWithIndex; class Comparator; class Status; - class ColumnFamilyHandle; class Slice; class Iterator; } @@ -91,16 +90,19 @@ namespace mongo { RocksTransaction* transaction() { return &_transaction; } - rocksdb::Status 
Get(rocksdb::ColumnFamilyHandle* columnFamily, const rocksdb::Slice& key, - std::string* value); + rocksdb::Status Get(const rocksdb::Slice& key, std::string* value); - rocksdb::Iterator* NewIterator(rocksdb::ColumnFamilyHandle* columnFamily); + rocksdb::Iterator* NewIterator(std::string prefix); + + static rocksdb::Iterator* NewIteratorNoSnapshot(rocksdb::DB* db, std::string prefix); void incrementCounter(const rocksdb::Slice& counterKey, std::atomic<long long>* counter, long long delta); long long getDeltaCounter(const rocksdb::Slice& counterKey); + static long long getCounterValue(rocksdb::DB* db, const rocksdb::Slice counterKey); + void setOplogReadTill(const RecordId& loc); RecordId getOplogReadTill() const { return _oplogReadTill; } |
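Appendix sketch: RocksIndexBase::getSpaceUsedBytes() and PrefixStrippingIterator::SeekToLast() above both compute the smallest same-length string that sorts after every key carrying a given prefix, by incrementing the last byte and carrying left past 0xff bytes. A standalone version of that helper, assuming the prefix is not all 0xff bytes (the name nextPrefix is illustrative, not from the commit):

#include <cassert>
#include <string>

// Smallest same-length string that sorts after every key beginning with
// `prefix`: increment the last byte, carrying leftwards on overflow.
std::string nextPrefix(std::string prefix) {
    for (int i = static_cast<int>(prefix.size()) - 1; i >= 0; --i) {
        prefix[i]++;
        if (prefix[i] != 0) {  // == 0 means the byte wrapped; keep carrying
            break;
        }
    }
    return prefix;
}

int main() {
    assert(nextPrefix(std::string("\x00\x00\x00\x01", 4)) == std::string("\x00\x00\x00\x02", 4));
    // the carry propagates across trailing 0xff bytes
    assert(nextPrefix(std::string("\x00\x00\x01\xff", 4)) == std::string("\x00\x00\x02\x00", 4));
    return 0;
}

getSpaceUsedBytes() feeds this upper bound into the rocksdb::Range handed to GetApproximateSizes(); SeekToLast() seeks to it and, if it lands outside the prefix range, steps back with Prev().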
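Appendix sketch: the record-store counters (_numRecords, _dataSize) are now persisted as fixed 8-byte little-endian values -- written in RocksRecoveryUnit::_commit() via endian::nativeToLittle() and read back in getCounterValue() via endian::littleToNative(). A round-trip illustration of that on-disk format, using explicit shifts in place of mongo/platform/endian.h (helper names are illustrative, not from the commit):

#include <cassert>
#include <cstdint>
#include <string>

// Encode as the commit's write path does: 8 bytes, least significant first.
std::string encodeCounter(long long value) {
    uint64_t v = static_cast<uint64_t>(value);
    char buf[8];
    for (int i = 0; i < 8; ++i) {
        buf[i] = static_cast<char>(v >> (8 * i));  // byte 0 = least significant
    }
    return std::string(buf, sizeof(buf));
}

// Decode as getCounterValue() does: insist on the fixed width, then
// reassemble little-endian bytes.
long long decodeCounter(const std::string& value) {
    assert(value.size() == sizeof(uint64_t));
    uint64_t v = 0;
    for (int i = 0; i < 8; ++i) {
        v |= uint64_t(static_cast<unsigned char>(value[i])) << (8 * i);
    }
    return static_cast<long long>(v);
}

int main() {
    assert(decodeCounter(encodeCounter(12345)) == 12345);
    assert(decodeCounter(encodeCounter(-1)) == -1);  // sign survives the round trip
    return 0;
}

The fixed width is what lets getCounterValue() treat any other value size as corruption, and the explicit byte order keeps data files portable across machines of different endianness (the old code memcpy'd a native long long, as the removed TODO noted).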