-rw-r--r--  src/mongo/db/storage/rocks/rocks_engine.cpp            | 315
-rw-r--r--  src/mongo/db/storage/rocks/rocks_engine.h              |  37
-rw-r--r--  src/mongo/db/storage/rocks/rocks_index.cpp             | 126
-rw-r--r--  src/mongo/db/storage/rocks/rocks_index.h               |  18
-rw-r--r--  src/mongo/db/storage/rocks/rocks_index_test.cpp        |  19
-rw-r--r--  src/mongo/db/storage/rocks/rocks_init.cpp              |   5
-rw-r--r--  src/mongo/db/storage/rocks/rocks_record_store.cpp      | 187
-rw-r--r--  src/mongo/db/storage/rocks/rocks_record_store.h        |  16
-rw-r--r--  src/mongo/db/storage/rocks/rocks_record_store_test.cpp |  18
-rw-r--r--  src/mongo/db/storage/rocks/rocks_recovery_unit.cpp     | 109
-rw-r--r--  src/mongo/db/storage/rocks/rocks_recovery_unit.h       |  10
11 files changed, 379 insertions(+), 481 deletions(-)
diff --git a/src/mongo/db/storage/rocks/rocks_engine.cpp b/src/mongo/db/storage/rocks/rocks_engine.cpp
index 604798cb3ea..749895a6edc 100644
--- a/src/mongo/db/storage/rocks/rocks_engine.cpp
+++ b/src/mongo/db/storage/rocks/rocks_engine.cpp
@@ -36,6 +36,7 @@
#include <boost/filesystem/operations.hpp>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
+#include <boost/scoped_ptr.hpp>
#include <rocksdb/cache.h>
#include <rocksdb/comparator.h>
@@ -52,6 +53,7 @@
#include "mongo/db/storage/rocks/rocks_record_store.h"
#include "mongo/db/storage/rocks/rocks_recovery_unit.h"
#include "mongo/db/storage/rocks/rocks_index.h"
+#include "mongo/platform/endian.h"
#include "mongo/util/log.h"
#include "mongo/util/processinfo.h"
@@ -62,16 +64,29 @@
namespace mongo {
- using boost::shared_ptr;
+ namespace {
+ // we encode prefixes in big endian because we want to quickly jump to the max prefix
+ // (iter->SeekToLast())
+ bool extractPrefix(const rocksdb::Slice& slice, uint32_t* prefix) {
+ if (slice.size() < sizeof(uint32_t)) {
+ return false;
+ }
+ *prefix = endian::bigToNative(*reinterpret_cast<const uint32_t*>(slice.data()));
+ return true;
+ }
- const std::string RocksEngine::kOrderingPrefix("indexordering-");
- const std::string RocksEngine::kCollectionPrefix("collection-");
+ std::string encodePrefix(uint32_t prefix) {
+ uint32_t bigEndianPrefix = endian::nativeToBig(prefix);
+ return std::string(reinterpret_cast<const char*>(&bigEndianPrefix), sizeof(uint32_t));
+ }
+ } // anonymous namespace
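The big-endian choice is what makes iter->SeekToLast() land inside the largest prefix: RocksDB's default comparator is bytewise memcmp, and a big-endian encoding makes byte order agree with numeric order. A minimal standalone sketch of that property (plain shifts stand in for mongo's endian.h helpers):

    #include <cassert>
    #include <cstdint>
    #include <cstring>
    #include <string>

    // Mirrors encodePrefix() above: 4 big-endian bytes, most significant first.
    std::string encodeBigEndian(uint32_t prefix) {
        char buf[4];
        buf[0] = static_cast<char>(prefix >> 24);
        buf[1] = static_cast<char>(prefix >> 16);
        buf[2] = static_cast<char>(prefix >> 8);
        buf[3] = static_cast<char>(prefix);
        return std::string(buf, 4);
    }

    int main() {
        std::string a = encodeBigEndian(255);  // 00 00 00 ff
        std::string b = encodeBigEndian(256);  // 00 00 01 00
        // Bytewise order agrees with numeric order, so the last key in the DB
        // carries the maximum prefix. A little-endian encoding (ff 00 00 00 vs
        // 00 01 00 00) would sort 255 after 256 and break the SeekToLast() trick.
        assert(memcmp(a.data(), b.data(), 4) < 0);
        return 0;
    }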
- RocksEngine::RocksEngine(const std::string& path, bool durable)
- : _path(path),
- _durable(durable) {
+ // first four bytes are the default prefix 0
+    const std::string RocksEngine::kMetadataPrefix("\0\0\0\0metadata-", 13);
- { // create block cache
+ RocksEngine::RocksEngine(const std::string& path, bool durable)
+ : _path(path), _durable(durable) {
+ { // create block cache
uint64_t cacheSizeGB = 0;
ProcessInfo pi;
unsigned long long memSizeMB = pi.getMemSizeMB();
@@ -84,68 +99,43 @@ namespace mongo {
}
_block_cache = rocksdb::NewLRUCache(cacheSizeGB * 1024 * 1024 * 1024LL);
}
+ // open DB
+ rocksdb::DB* db;
+ auto s = rocksdb::DB::Open(_options(), path, &db);
+ ROCKS_STATUS_OK(s);
+ _db.reset(db);
- auto columnFamilyNames = _loadColumnFamilies(); // vector of column family names
- std::unordered_map<std::string, Ordering> orderings; // column family name -> Ordering
- std::set<std::string> collections; // set of collection names
-
- if (columnFamilyNames.empty()) { // new DB
- columnFamilyNames.push_back(rocksdb::kDefaultColumnFamilyName);
- } else { // existing DB
- // open DB in read-only mode to load metadata
- rocksdb::DB* dbReadOnly;
- auto s = rocksdb::DB::OpenForReadOnly(_dbOptions(), path, &dbReadOnly);
- ROCKS_STATUS_OK(s);
- auto itr = dbReadOnly->NewIterator(rocksdb::ReadOptions());
- orderings = _loadOrderingMetaData(itr);
- collections = _loadCollections(itr);
- delete itr;
- delete dbReadOnly;
- }
-
- std::vector<rocksdb::ColumnFamilyDescriptor> columnFamilies;
- std::set<std::string> toDropColumnFamily;
-
- for (const auto& cf : columnFamilyNames) {
- if (cf == rocksdb::kDefaultColumnFamilyName) {
- columnFamilies.emplace_back(cf, _defaultCFOptions());
- continue;
- }
- auto orderings_iter = orderings.find(cf);
- auto collections_iter = collections.find(cf);
- bool isIndex = orderings_iter != orderings.end();
- bool isCollection = collections_iter != collections.end();
- invariant(!isIndex || !isCollection);
- if (isIndex) {
- columnFamilies.emplace_back(cf, _indexOptions(orderings_iter->second));
- } else if (isCollection) {
- columnFamilies.emplace_back(cf, _collectionOptions());
- } else {
- // TODO support this from inside of rocksdb, by using
- // Options::drop_unopened_column_families.
- // This can happen because write and createColumnFamily are not atomic
- toDropColumnFamily.insert(cf);
- columnFamilies.emplace_back(cf, _collectionOptions());
- }
+ // open iterator
+ boost::scoped_ptr<rocksdb::Iterator> _iter(_db->NewIterator(rocksdb::ReadOptions()));
+
+ // find maxPrefix
+ _maxPrefix = 0;
+ _iter->SeekToLast();
+ if (_iter->Valid()) {
+ // otherwise the DB is empty, so we just keep it at 0
+ bool ok = extractPrefix(_iter->key(), &_maxPrefix);
+ // this is DB corruption here
+ invariant(ok);
}
- std::vector<rocksdb::ColumnFamilyHandle*> handles;
- rocksdb::DB* db;
- auto s = rocksdb::DB::Open(_dbOptions(), path, columnFamilies, &handles, &db);
- ROCKS_STATUS_OK(s);
- invariant(handles.size() == columnFamilies.size());
- for (size_t i = 0; i < handles.size(); ++i) {
- if (toDropColumnFamily.find(columnFamilies[i].name) != toDropColumnFamily.end()) {
- db->DropColumnFamily(handles[i]);
- delete handles[i];
- } else if (columnFamilyNames[i] == rocksdb::kDefaultColumnFamilyName) {
- // we will not be needing this
- delete handles[i];
- } else {
- _identColumnFamilyMap[columnFamilies[i].name].reset(handles[i]);
+ // load ident to prefix map
+ {
+ boost::mutex::scoped_lock lk(_identPrefixMapMutex);
+ for (_iter->Seek(kMetadataPrefix);
+ _iter->Valid() && _iter->key().starts_with(kMetadataPrefix); _iter->Next()) {
+ rocksdb::Slice ident(_iter->key());
+ ident.remove_prefix(kMetadataPrefix.size());
+ // this could throw DBException, which then means DB corruption. We just let it fly
+ // to the caller
+ BSONObj identConfig(_iter->value().data());
+ BSONElement element = identConfig.getField("prefix");
+ // TODO: SERVER-16979 Correctly handle errors returned by RocksDB
+ // This is DB corruption
+                invariant(!element.eoo() && element.isNumber());
+ uint32_t identPrefix = static_cast<uint32_t>(element.numberInt());
+ _identPrefixMap[StringData(ident.data(), ident.size())] = identPrefix;
}
}
- _db.reset(db);
}
RocksEngine::~RocksEngine() {}
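Note how kMetadataPrefix interacts with the prefix counter: metadata lives under the all-zero prefix, while _createIdentPrefix hands out prefixes starting at ++_maxPrefix, i.e. 1 and up, so ident data can never collide with the metadata range. A small standalone check of that layout:

    #include <cassert>
    #include <string>

    int main() {
        const std::string kMetadataPrefix("\0\0\0\0metadata-", 13);
        const std::string prefix0("\0\0\0\0", 4);    // reserved for metadata
        const std::string prefix1("\0\0\0\x01", 4);  // first ident gets prefix 1

        // Metadata keys start with the encoded prefix 0 ...
        assert(kMetadataPrefix.compare(0, 4, prefix0) == 0);
        // ... and every real ident key (prefix >= 1) sorts after all of them.
        assert(prefix1 > kMetadataPrefix);
        return 0;
    }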
@@ -154,197 +144,136 @@ namespace mongo {
return new RocksRecoveryUnit(&_transactionEngine, _db.get(), _durable);
}
- Status RocksEngine::createRecordStore(OperationContext* opCtx,
- const StringData& ns,
+ Status RocksEngine::createRecordStore(OperationContext* opCtx, const StringData& ns,
const StringData& ident,
const CollectionOptions& options) {
- if (_existsColumnFamily(ident)) {
- return Status::OK();
- }
- _db->Put(rocksdb::WriteOptions(), kCollectionPrefix + ident.toString(), rocksdb::Slice());
- return _createColumnFamily(_collectionOptions(), ident);
+ return _createIdentPrefix(ident);
}
RecordStore* RocksEngine::getRecordStore(OperationContext* opCtx, const StringData& ns,
const StringData& ident,
const CollectionOptions& options) {
- auto columnFamily = _getColumnFamily(ident);
if (options.capped) {
return new RocksRecordStore(
- ns, ident, _db.get(), columnFamily, true,
+ ns, ident, _db.get(), _getIdentPrefix(ident), true,
options.cappedSize ? options.cappedSize : 4096, // default size
options.cappedMaxDocs ? options.cappedMaxDocs : -1);
} else {
- return new RocksRecordStore(ns, ident, _db.get(), columnFamily);
+ return new RocksRecordStore(ns, ident, _db.get(), _getIdentPrefix(ident));
}
}
Status RocksEngine::createSortedDataInterface(OperationContext* opCtx, const StringData& ident,
const IndexDescriptor* desc) {
- if (_existsColumnFamily(ident)) {
- return Status::OK();
- }
- auto keyPattern = desc->keyPattern();
-
- _db->Put(rocksdb::WriteOptions(), kOrderingPrefix + ident.toString(),
- rocksdb::Slice(keyPattern.objdata(), keyPattern.objsize()));
- return _createColumnFamily(_indexOptions(Ordering::make(keyPattern)), ident);
+ return _createIdentPrefix(ident);
}
SortedDataInterface* RocksEngine::getSortedDataInterface(OperationContext* opCtx,
const StringData& ident,
const IndexDescriptor* desc) {
if (desc->unique()) {
- return new RocksUniqueIndex(_db.get(), _getColumnFamily(ident), ident.toString(),
+ return new RocksUniqueIndex(_db.get(), _getIdentPrefix(ident), ident.toString(),
Ordering::make(desc->keyPattern()));
} else {
- return new RocksStandardIndex(_db.get(), _getColumnFamily(ident), ident.toString(),
+ return new RocksStandardIndex(_db.get(), _getIdentPrefix(ident), ident.toString(),
Ordering::make(desc->keyPattern()));
}
}
Status RocksEngine::dropIdent(OperationContext* opCtx, const StringData& ident) {
+ // TODO optimize this using CompactionFilterV2
rocksdb::WriteBatch wb;
- // TODO is there a more efficient way?
- wb.Delete(kOrderingPrefix + ident.toString());
- wb.Delete(kCollectionPrefix + ident.toString());
+ wb.Delete(kMetadataPrefix + ident.toString());
+
+ std::string prefix = _getIdentPrefix(ident);
+ rocksdb::Slice prefixSlice(prefix.data(), prefix.size());
+
+ boost::scoped_ptr<rocksdb::Iterator> _iter(_db->NewIterator(rocksdb::ReadOptions()));
+ for (_iter->Seek(prefixSlice); _iter->Valid() && _iter->key().starts_with(prefixSlice);
+ _iter->Next()) {
+ ROCKS_STATUS_OK(_iter->status());
+ wb.Delete(_iter->key());
+ }
auto s = _db->Write(rocksdb::WriteOptions(), &wb);
if (!s.ok()) {
- return toMongoStatus(s);
+ return toMongoStatus(s);
}
- return _dropColumnFamily(ident);
- }
- std::vector<std::string> RocksEngine::getAllIdents( OperationContext* opCtx ) const {
- std::vector<std::string> indents;
- for (auto& entry : _identColumnFamilyMap) {
- indents.push_back(entry.first);
+ {
+ boost::mutex::scoped_lock lk(_identPrefixMapMutex);
+ _identPrefixMap.erase(ident);
}
- return indents;
- }
- // non public api
-
- bool RocksEngine::_existsColumnFamily(const StringData& ident) {
- boost::mutex::scoped_lock lk(_identColumnFamilyMapMutex);
- return _identColumnFamilyMap.find(ident) != _identColumnFamilyMap.end();
- }
-
- Status RocksEngine::_createColumnFamily(const rocksdb::ColumnFamilyOptions& options,
- const StringData& ident) {
- rocksdb::ColumnFamilyHandle* cf;
- auto s = _db->CreateColumnFamily(options, ident.toString(), &cf);
- if (!s.ok()) {
- return toMongoStatus(s);
- }
- boost::mutex::scoped_lock lk(_identColumnFamilyMapMutex);
- _identColumnFamilyMap[ident].reset(cf);
return Status::OK();
}
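The TODO above about CompactionFilterV2 refers to dropping an ident's keys lazily during compaction instead of writing one Delete per key. A hedged sketch of that idea using RocksDB's plain CompactionFilter interface; droppedPrefixes is a hypothetical registry, not something this patch maintains:

    #include <rocksdb/compaction_filter.h>
    #include <rocksdb/slice.h>
    #include <set>
    #include <string>

    // Hypothetical: prefixes whose idents were dropped. A real implementation
    // would share this with RocksEngine and persist it across restarts.
    static std::set<std::string> droppedPrefixes;

    class DroppedPrefixFilter : public rocksdb::CompactionFilter {
    public:
        // Returning true makes compaction discard the entry, so dropIdent()
        // would only need to record the prefix instead of scanning it.
        virtual bool Filter(int /*level*/, const rocksdb::Slice& key,
                            const rocksdb::Slice& /*existing_value*/,
                            std::string* /*new_value*/,
                            bool* /*value_changed*/) const {
            return key.size() >= 4 &&
                   droppedPrefixes.count(std::string(key.data(), 4)) > 0;
        }
        virtual const char* Name() const { return "DroppedPrefixFilter"; }
    };

Such a filter would be installed through rocksdb::Options::compaction_filter before opening the DB.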
- Status RocksEngine::_dropColumnFamily(const StringData& ident) {
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily;
- {
- boost::mutex::scoped_lock lk(_identColumnFamilyMapMutex);
- auto cf_iter = _identColumnFamilyMap.find(ident);
- if (cf_iter == _identColumnFamilyMap.end()) {
- return Status(ErrorCodes::InternalError, "Not found");
- }
- columnFamily = cf_iter->second;
- _identColumnFamilyMap.erase(cf_iter);
- }
- auto s = _db->DropColumnFamily(columnFamily.get());
- return toMongoStatus(s);
+ bool RocksEngine::hasIdent(OperationContext* opCtx, const StringData& ident) const {
+ boost::mutex::scoped_lock lk(_identPrefixMapMutex);
+ return _identPrefixMap.find(ident) != _identPrefixMap.end();
}
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> RocksEngine::_getColumnFamily(
- const StringData& ident) {
- {
- boost::mutex::scoped_lock lk(_identColumnFamilyMapMutex);
- auto cf_iter = _identColumnFamilyMap.find(ident);
- invariant(cf_iter != _identColumnFamilyMap.end());
- return cf_iter->second;
+ std::vector<std::string> RocksEngine::getAllIdents(OperationContext* opCtx) const {
+        std::vector<std::string> idents;
+        for (auto& entry : _identPrefixMap) {
+            idents.push_back(entry.first);
}
+        return idents;
}
- std::unordered_map<std::string, Ordering> RocksEngine::_loadOrderingMetaData(
- rocksdb::Iterator* itr) {
- std::unordered_map<std::string, Ordering> orderings;
- for (itr->Seek(kOrderingPrefix); itr->Valid(); itr->Next()) {
- rocksdb::Slice key(itr->key());
- if (!key.starts_with(kOrderingPrefix)) {
- break;
+ // non public api
+ Status RocksEngine::_createIdentPrefix(const StringData& ident) {
+ uint32_t prefix = 0;
+ {
+ boost::mutex::scoped_lock lk(_identPrefixMapMutex);
+ if (_identPrefixMap.find(ident) != _identPrefixMap.end()) {
+ // already exists
+ return Status::OK();
}
- key.remove_prefix(kOrderingPrefix.size());
- std::string value(itr->value().ToString());
- orderings.insert({key.ToString(), Ordering::make(BSONObj(value.c_str()))});
- }
- ROCKS_STATUS_OK(itr->status());
- return orderings;
- }
- std::set<std::string> RocksEngine::_loadCollections(rocksdb::Iterator* itr) {
- std::set<std::string> collections;
- for (itr->Seek(kCollectionPrefix); itr->Valid() ; itr->Next()) {
- rocksdb::Slice key(itr->key());
- if (!key.starts_with(kCollectionPrefix)) {
- break;
- }
- key.remove_prefix(kCollectionPrefix.size());
- collections.insert(key.ToString());
+ prefix = ++_maxPrefix;
+ _identPrefixMap[ident] = prefix;
}
- ROCKS_STATUS_OK(itr->status());
- return collections;
- }
- std::vector<std::string> RocksEngine::_loadColumnFamilies() {
- std::vector<std::string> names;
- if (boost::filesystem::exists(_path)) {
- rocksdb::Status s = rocksdb::DB::ListColumnFamilies(_dbOptions(), _path, &names);
+ BSONObjBuilder builder;
+ builder.append("prefix", static_cast<int32_t>(prefix));
+ BSONObj config = builder.obj();
- if (s.IsIOError()) {
- // DNE, this means the directory exists but is empty, which is fine
- // because it means no rocks database exists yet
- } else {
- ROCKS_STATUS_OK(s);
- }
- }
+ auto s = _db->Put(rocksdb::WriteOptions(), kMetadataPrefix + ident.toString(),
+ rocksdb::Slice(config.objdata(), config.objsize()));
- return names;
+ return toMongoStatus(s);
}
- rocksdb::Options RocksEngine::_dbOptions() {
- rocksdb::Options options(rocksdb::DBOptions(), _defaultCFOptions());
-
- options.max_background_compactions = 4;
- options.max_background_flushes = 4;
-
- // create the DB if it's not already present
- options.create_if_missing = true;
- options.create_missing_column_families = true;
- options.wal_dir = _path + "/journal";
- options.max_total_wal_size = 1 << 30; // 1GB
-
- return options;
+ std::string RocksEngine::_getIdentPrefix(const StringData& ident) {
+ boost::mutex::scoped_lock lk(_identPrefixMapMutex);
+ auto prefixIter = _identPrefixMap.find(ident);
+ invariant(prefixIter != _identPrefixMap.end());
+ return encodePrefix(prefixIter->second);
}
- rocksdb::ColumnFamilyOptions RocksEngine::_defaultCFOptions() {
- // TODO pass or set appropriate options for default CF.
- rocksdb::ColumnFamilyOptions options;
+ rocksdb::Options RocksEngine::_options() const {
+ rocksdb::Options options;
rocksdb::BlockBasedTableOptions table_options;
table_options.block_cache = _block_cache;
table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10));
+ table_options.format_version = 2;
options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
+
+ options.write_buffer_size = 128 * 1024 * 1024; // 128MB
options.max_write_buffer_number = 4;
- return options;
- }
+ options.max_background_compactions = 8;
+ options.max_background_flushes = 4;
+ options.target_file_size_base = 64 * 1024 * 1024; // 64MB
+ options.soft_rate_limit = 2;
+ options.hard_rate_limit = 3;
+ options.max_bytes_for_level_base = 512 * 1024 * 1024; // 512 MB
+ options.max_open_files = 20000;
- rocksdb::ColumnFamilyOptions RocksEngine::_collectionOptions() {
- return _defaultCFOptions();
- }
+ // create the DB if it's not already present
+ options.create_if_missing = true;
+ options.wal_dir = _path + "/journal";
- rocksdb::ColumnFamilyOptions RocksEngine::_indexOptions(const Ordering& order) {
- return _defaultCFOptions();
+ return options;
}
Status toMongoStatus( rocksdb::Status s ) {
diff --git a/src/mongo/db/storage/rocks/rocks_engine.h b/src/mongo/db/storage/rocks/rocks_engine.h
index 12ee7eaf355..4078c13414e 100644
--- a/src/mongo/db/storage/rocks/rocks_engine.h
+++ b/src/mongo/db/storage/rocks/rocks_engine.h
@@ -89,9 +89,8 @@ namespace mongo {
virtual Status dropIdent(OperationContext* opCtx, const StringData& ident) override;
- virtual bool hasIdent(OperationContext* opCtx, const StringData& ident) const {
- return _identColumnFamilyMap.find(ident) != _identColumnFamilyMap.end();;
- }
+ virtual bool hasIdent(OperationContext* opCtx, const StringData& ident) const override;
+
virtual std::vector<std::string> getAllIdents( OperationContext* opCtx ) const override;
virtual bool supportsDocLocking() const override {
@@ -123,22 +122,10 @@ namespace mongo {
const rocksdb::DB* getDB() const { return _db.get(); }
private:
- bool _existsColumnFamily(const StringData& ident);
- Status _createColumnFamily(const rocksdb::ColumnFamilyOptions& options,
- const StringData& ident);
- Status _dropColumnFamily(const StringData& ident);
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> _getColumnFamily(const StringData& ident);
-
- std::unordered_map<std::string, Ordering> _loadOrderingMetaData(rocksdb::Iterator* itr);
- std::set<std::string> _loadCollections(rocksdb::Iterator* itr);
- std::vector<std::string> _loadColumnFamilies();
-
- rocksdb::ColumnFamilyOptions _collectionOptions();
- rocksdb::ColumnFamilyOptions _indexOptions(const Ordering& order);
-
- rocksdb::Options _dbOptions();
+ Status _createIdentPrefix(const StringData& ident);
+ std::string _getIdentPrefix(const StringData& ident);
- rocksdb::ColumnFamilyOptions _defaultCFOptions();
+ rocksdb::Options _options() const;
std::string _path;
boost::scoped_ptr<rocksdb::DB> _db;
@@ -146,18 +133,18 @@ namespace mongo {
const bool _durable;
- // Default column family is owned by the rocksdb::DB instance.
- rocksdb::ColumnFamilyHandle* _defaultHandle;
+ // ident prefix map stores mapping from ident to a prefix (uint32_t)
+ mutable boost::mutex _identPrefixMapMutex;
+ typedef StringMap<uint32_t> IdentPrefixMap;
+ IdentPrefixMap _identPrefixMap;
- mutable boost::mutex _identColumnFamilyMapMutex;
- typedef StringMap<boost::shared_ptr<rocksdb::ColumnFamilyHandle> > IdentColumnFamilyMap;
- IdentColumnFamilyMap _identColumnFamilyMap;
+ // protected by _identPrefixMapMutex
+ uint32_t _maxPrefix;
// This is for concurrency control
RocksTransactionEngine _transactionEngine;
- static const std::string kOrderingPrefix;
- static const std::string kCollectionPrefix;
+ static const std::string kMetadataPrefix;
};
Status toMongoStatus( rocksdb::Status s );
diff --git a/src/mongo/db/storage/rocks/rocks_index.cpp b/src/mongo/db/storage/rocks/rocks_index.cpp
index 2e7c0187efa..34068b44e48 100644
--- a/src/mongo/db/storage/rocks/rocks_index.cpp
+++ b/src/mongo/db/storage/rocks/rocks_index.cpp
@@ -100,16 +100,11 @@ namespace mongo {
*/
class RocksCursorBase : public SortedDataInterface::Cursor {
public:
- RocksCursorBase(OperationContext* txn, rocksdb::DB* db,
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily,
+ RocksCursorBase(OperationContext* txn, rocksdb::DB* db, std::string prefix,
bool forward, Ordering order)
- : _db(db),
- _columnFamily(columnFamily),
- _forward(forward),
- _order(order),
- _isKeyCurrent(false) {
+ : _db(db), _prefix(prefix), _forward(forward), _order(order), _isKeyCurrent(false) {
auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
- _iterator.reset(ru->NewIterator(_columnFamily.get()));
+ _iterator.reset(ru->NewIterator(_prefix));
checkStatus();
}
@@ -197,13 +192,13 @@ namespace mongo {
if (!_savedEOF) {
loadKeyIfNeeded();
_savedRecordId = getRecordId();
- _iterator.reset();
}
+ _iterator.reset();
}
void restorePosition(OperationContext* txn) {
auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
- _iterator.reset(ru->NewIterator(_columnFamily.get()));
+ _iterator.reset(ru->NewIterator(_prefix));
if (!_savedEOF) {
_locate(_savedRecordId);
@@ -285,7 +280,7 @@ namespace mongo {
}
rocksdb::DB* _db; // not owned
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> _columnFamily;
+ std::string _prefix;
boost::scoped_ptr<rocksdb::Iterator> _iterator;
const bool _forward;
Ordering _order;
@@ -302,10 +297,9 @@ namespace mongo {
class RocksStandardCursor : public RocksCursorBase {
public:
- RocksStandardCursor(OperationContext* txn, rocksdb::DB* db,
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily,
+ RocksStandardCursor(OperationContext* txn, rocksdb::DB* db, std::string prefix,
bool forward, Ordering order)
- : RocksCursorBase(txn, db, columnFamily, forward, order), _isTypeBitsValid(false) {}
+ : RocksCursorBase(txn, db, prefix, forward, order), _isTypeBitsValid(false) {}
virtual void invalidateCache() {
RocksCursorBase::invalidateCache();
@@ -371,10 +365,9 @@ namespace mongo {
class RocksUniqueCursor : public RocksCursorBase {
public:
- RocksUniqueCursor(OperationContext* txn, rocksdb::DB* db,
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily,
+ RocksUniqueCursor(OperationContext* txn, rocksdb::DB* db, std::string prefix,
bool forward, Ordering order)
- : RocksCursorBase(txn, db, columnFamily, forward, order), _recordsIndex(0) {}
+ : RocksCursorBase(txn, db, prefix, forward, order), _recordsIndex(0) {}
virtual void invalidateCache() {
RocksCursorBase::invalidateCache();
@@ -504,10 +497,9 @@ namespace mongo {
/// RocksIndexBase
- RocksIndexBase::RocksIndexBase(rocksdb::DB* db,
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> cf,
- std::string ident, Ordering order)
- : _db(db), _columnFamily(cf), _ident(std::move(ident)), _order(order) {}
+ RocksIndexBase::RocksIndexBase(rocksdb::DB* db, std::string prefix, std::string ident,
+ Ordering order)
+ : _db(db), _prefix(prefix), _ident(std::move(ident)), _order(order) {}
SortedDataBuilderInterface* RocksIndexBase::getBulkBuilder(OperationContext* txn,
bool dupsAllowed) {
@@ -529,7 +521,7 @@ namespace mongo {
bool RocksIndexBase::isEmpty(OperationContext* txn) {
auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
- boost::scoped_ptr<rocksdb::Iterator> it(ru->NewIterator(_columnFamily.get()));
+ boost::scoped_ptr<rocksdb::Iterator> it(ru->NewIterator(_prefix));
it->SeekToFirst();
return !it->Valid();
@@ -541,32 +533,32 @@ namespace mongo {
}
long long RocksIndexBase::getSpaceUsedBytes(OperationContext* txn) const {
- // TODO provide GetLiveFilesMetadata() with column family
- std::vector<rocksdb::LiveFileMetaData> metadata;
- _db->GetLiveFilesMetaData(&metadata);
- uint64_t spaceUsedBytes = 0;
- for (const auto& m : metadata) {
- if (m.column_family_name == _ident) {
- spaceUsedBytes += m.size;
+ uint64_t storageSize;
+ std::string nextPrefix(_prefix);
+        // compute the next prefix lexicographically (assumes the same size)
+ for (int i = nextPrefix.size() - 1; i >= 0; --i) {
+ nextPrefix[i]++;
+ if (nextPrefix[i] != 0) {
+ break;
}
}
-
- uint64_t walSpaceUsed = 0;
- _db->GetIntProperty(_columnFamily.get(), "rocksdb.cur-size-all-mem-tables", &walSpaceUsed);
- return spaceUsedBytes + walSpaceUsed;
+ rocksdb::Range wholeRange(_prefix, nextPrefix);
+ _db->GetApproximateSizes(&wholeRange, 1, &storageSize);
+ return static_cast<long long>(storageSize);
}
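The increment-with-carry loop above computes the smallest same-length string greater than every key that starts with _prefix; the same trick appears again in PrefixStrippingIterator::SeekToLast() in rocks_recovery_unit.cpp. Factored out and exercised on the carry case (a standalone sketch, not a helper the patch defines):

    #include <cassert>
    #include <string>

    // Smallest same-length string greater than every string starting with
    // `prefix` (assumes the prefix is not all 0xff bytes).
    std::string nextLexicographic(std::string prefix) {
        for (int i = static_cast<int>(prefix.size()) - 1; i >= 0; --i) {
            prefix[i]++;
            if (prefix[i] != 0) {
                break;  // no wrap-around: done
            }
            // byte wrapped to 0: carry into the next, more significant byte
        }
        return prefix;
    }

    int main() {
        assert(nextLexicographic(std::string("\x00\x00\x00\x01", 4)) ==
               std::string("\x00\x00\x00\x02", 4));
        // carry case: 00 00 01 ff -> 00 00 02 00
        assert(nextLexicographic(std::string("\x00\x00\x01\xff", 4)) ==
               std::string("\x00\x00\x02\x00", 4));
        return 0;
    }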
- std::string RocksIndexBase::_getTransactionID(const KeyString& key) const {
- // TODO optimize in the future
- return _ident + std::string(key.getBuffer(), key.getSize());
+ std::string RocksIndexBase::_makePrefixedKey(const std::string& prefix,
+ const KeyString& encodedKey) {
+ std::string key(prefix);
+ key.append(encodedKey.getBuffer(), encodedKey.getSize());
+ return key;
}
/// RocksUniqueIndex
- RocksUniqueIndex::RocksUniqueIndex(rocksdb::DB* db,
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> cf,
- std::string ident, Ordering order)
- : RocksIndexBase(db, cf, ident, order) {}
+ RocksUniqueIndex::RocksUniqueIndex(rocksdb::DB* db, std::string prefix, std::string ident,
+ Ordering order)
+ : RocksIndexBase(db, prefix, ident, order) {}
Status RocksUniqueIndex::insert(OperationContext* txn, const BSONObj& key, const RecordId& loc,
bool dupsAllowed) {
@@ -575,17 +567,16 @@ namespace mongo {
return s;
}
- auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
-
KeyString encodedKey(key, _order);
- rocksdb::Slice keySlice(encodedKey.getBuffer(), encodedKey.getSize());
+ std::string prefixedKey(_makePrefixedKey(_prefix, encodedKey));
- if (!ru->transaction()->registerWrite(_getTransactionID(encodedKey))) {
+ auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
+ if (!ru->transaction()->registerWrite(prefixedKey)) {
throw WriteConflictException();
}
std::string currentValue;
- auto getStatus = ru->Get(_columnFamily.get(), keySlice, &currentValue);
+ auto getStatus = ru->Get(prefixedKey, &currentValue);
if (!getStatus.ok() && !getStatus.IsNotFound()) {
// This means that Get() returned an error
// TODO: SERVER-16979 Correctly handle errors returned by RocksDB
@@ -597,7 +588,7 @@ namespace mongo {
value.appendTypeBits(encodedKey.getTypeBits());
}
rocksdb::Slice valueSlice(value.getBuffer(), value.getSize());
- ru->writeBatch()->Put(_columnFamily.get(), keySlice, valueSlice);
+ ru->writeBatch()->Put(prefixedKey, valueSlice);
return Status::OK();
}
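When registerWrite() returns false the insert throws WriteConflictException, and it is the caller's responsibility to retry the whole unit of work. A hedged sketch of that caller-side contract; tryIndexInsert is a stand-in, not a function from this patch:

    #include <iostream>

    struct WriteConflictException {};  // stand-in for mongo's exception type

    // Pretends to conflict twice before succeeding.
    bool tryIndexInsert(int attempt) {
        if (attempt < 2) {
            throw WriteConflictException();
        }
        return true;
    }

    int main() {
        // The storage layer throws on a conflicting registerWrite(); the
        // caller restarts the operation until the write registers cleanly.
        for (int attempt = 0;; ++attempt) {
            try {
                tryIndexInsert(attempt);
                break;
            } catch (const WriteConflictException&) {
                std::cout << "write conflict, retrying\n";
            }
        }
        return 0;
    }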
@@ -637,29 +628,29 @@ namespace mongo {
}
rocksdb::Slice valueVectorSlice(valueVector.getBuffer(), valueVector.getSize());
- ru->writeBatch()->Put(_columnFamily.get(), keySlice, valueVectorSlice);
+ ru->writeBatch()->Put(prefixedKey, valueVectorSlice);
return Status::OK();
}
void RocksUniqueIndex::unindex(OperationContext* txn, const BSONObj& key, const RecordId& loc,
bool dupsAllowed) {
KeyString encodedKey(key, _order);
- rocksdb::Slice keySlice(encodedKey.getBuffer(), encodedKey.getSize());
- RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
+ std::string prefixedKey(_makePrefixedKey(_prefix, encodedKey));
+ auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
// We can't let two threads unindex the same key
- if (!ru->transaction()->registerWrite(_getTransactionID(encodedKey))) {
+ if (!ru->transaction()->registerWrite(prefixedKey)) {
throw WriteConflictException();
}
if (!dupsAllowed) {
- ru->writeBatch()->Delete(_columnFamily.get(), keySlice);
+ ru->writeBatch()->Delete(prefixedKey);
return;
}
// dups are allowed, so we have to deal with a vector of RecordIds.
std::string currentValue;
- auto getStatus = ru->Get(_columnFamily.get(), keySlice, &currentValue);
+ auto getStatus = ru->Get(prefixedKey, &currentValue);
if (!getStatus.ok() && !getStatus.IsNotFound()) {
// This means that Get() returned an error
// TODO: SERVER-16979 Correctly handle errors returned by RocksDB
@@ -681,7 +672,7 @@ namespace mongo {
if (records.empty() && !br.remaining()) {
// This is the common case: we are removing the only loc for this key.
// Remove the whole entry.
- ru->writeBatch()->Delete(_columnFamily.get(), keySlice);
+ ru->writeBatch()->Delete(prefixedKey);
return;
}
@@ -710,23 +701,23 @@ namespace mongo {
}
rocksdb::Slice newValueSlice(newValue.getBuffer(), newValue.getSize());
- ru->writeBatch()->Put(_columnFamily.get(), keySlice, newValueSlice);
+ ru->writeBatch()->Put(prefixedKey, newValueSlice);
}
SortedDataInterface::Cursor* RocksUniqueIndex::newCursor(OperationContext* txn,
int direction) const {
invariant( ( direction == 1 || direction == -1 ) && "invalid value for direction" );
- return new RocksUniqueCursor(txn, _db, _columnFamily, direction == 1, _order);
+ return new RocksUniqueCursor(txn, _db, _prefix, direction == 1, _order);
}
Status RocksUniqueIndex::dupKeyCheck(OperationContext* txn, const BSONObj& key,
const RecordId& loc) {
KeyString encodedKey(key, _order);
- rocksdb::Slice keySlice(encodedKey.getBuffer(), encodedKey.getSize());
+ std::string prefixedKey(_makePrefixedKey(_prefix, encodedKey));
auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
std::string value;
- auto getStatus = ru->Get(_columnFamily.get(), keySlice, &value);
+ auto getStatus = ru->Get(prefixedKey, &value);
if (!getStatus.ok() && !getStatus.IsNotFound()) {
// This means that Get() returned an error
// TODO: SERVER-16979 Correctly handle errors returned by RocksDB
@@ -750,10 +741,9 @@ namespace mongo {
}
/// RocksStandardIndex
- RocksStandardIndex::RocksStandardIndex(rocksdb::DB* db,
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> cf,
- std::string ident, Ordering order)
- : RocksIndexBase(db, cf, ident, order) {}
+ RocksStandardIndex::RocksStandardIndex(rocksdb::DB* db, std::string prefix, std::string ident,
+ Ordering order)
+ : RocksIndexBase(db, prefix, ident, order) {}
Status RocksStandardIndex::insert(OperationContext* txn, const BSONObj& key,
const RecordId& loc, bool dupsAllowed) {
@@ -766,9 +756,7 @@ namespace mongo {
// If we're inserting an index element, this means we already "locked" the RecordId of the
// document. No need to register write here
KeyString encodedKey(key, _order, loc);
- rocksdb::Slice keySlice(encodedKey.getBuffer(), encodedKey.getSize());
-
- auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
+ std::string prefixedKey(_makePrefixedKey(_prefix, encodedKey));
rocksdb::Slice value;
if (!encodedKey.getTypeBits().isAllZeros()) {
@@ -777,8 +765,8 @@ namespace mongo {
encodedKey.getTypeBits().getSize());
}
- ru->writeBatch()->Put(_columnFamily.get(),
- rocksdb::Slice(encodedKey.getBuffer(), encodedKey.getSize()), value);
+ auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
+ ru->writeBatch()->Put(prefixedKey, value);
return Status::OK();
}
@@ -790,15 +778,15 @@ namespace mongo {
// document. No need to register write here
KeyString encodedKey(key, _order, loc);
- rocksdb::Slice keySlice(encodedKey.getBuffer(), encodedKey.getSize());
+ std::string prefixedKey(_makePrefixedKey(_prefix, encodedKey));
auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
- ru->writeBatch()->Delete(_columnFamily.get(), keySlice);
+ ru->writeBatch()->Delete(prefixedKey);
}
SortedDataInterface::Cursor* RocksStandardIndex::newCursor(OperationContext* txn,
int direction) const {
invariant( ( direction == 1 || direction == -1 ) && "invalid value for direction" );
- return new RocksStandardCursor(txn, _db, _columnFamily, direction == 1, _order);
+ return new RocksStandardCursor(txn, _db, _prefix, direction == 1, _order);
}
diff --git a/src/mongo/db/storage/rocks/rocks_index.h b/src/mongo/db/storage/rocks/rocks_index.h
index 71ce3ddfd75..3c91185aac2 100644
--- a/src/mongo/db/storage/rocks/rocks_index.h
+++ b/src/mongo/db/storage/rocks/rocks_index.h
@@ -40,7 +40,6 @@
#pragma once
namespace rocksdb {
- class ColumnFamilyHandle;
class DB;
}
@@ -52,8 +51,7 @@ namespace mongo {
MONGO_DISALLOW_COPYING(RocksIndexBase);
public:
- RocksIndexBase(rocksdb::DB* db, boost::shared_ptr<rocksdb::ColumnFamilyHandle> cf,
- std::string ident, Ordering order);
+ RocksIndexBase(rocksdb::DB* db, std::string prefix, std::string ident, Ordering order);
virtual SortedDataBuilderInterface* getBulkBuilder(OperationContext* txn, bool dupsAllowed);
@@ -73,14 +71,12 @@ namespace mongo {
virtual long long getSpaceUsedBytes( OperationContext* txn ) const;
protected:
- std::string _getTransactionID(const KeyString& key) const;
+ static std::string _makePrefixedKey(const std::string& prefix, const KeyString& encodedKey);
rocksdb::DB* _db; // not owned
- // Each index is stored as a single column family, so this stores the handle to the
- // relevant column family
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> _columnFamily;
-
+ // Each key in the index is prefixed with _prefix
+ std::string _prefix;
std::string _ident;
// used to construct RocksCursors
@@ -89,8 +85,7 @@ namespace mongo {
class RocksUniqueIndex : public RocksIndexBase {
public:
- RocksUniqueIndex(rocksdb::DB* db, boost::shared_ptr<rocksdb::ColumnFamilyHandle> cf,
- std::string ident, Ordering order);
+ RocksUniqueIndex(rocksdb::DB* db, std::string prefix, std::string ident, Ordering order);
virtual Status insert(OperationContext* txn, const BSONObj& key, const RecordId& loc,
bool dupsAllowed);
@@ -103,8 +98,7 @@ namespace mongo {
class RocksStandardIndex : public RocksIndexBase {
public:
- RocksStandardIndex(rocksdb::DB* db, boost::shared_ptr<rocksdb::ColumnFamilyHandle> cf,
- std::string ident, Ordering order);
+ RocksStandardIndex(rocksdb::DB* db, std::string prefix, std::string ident, Ordering order);
virtual Status insert(OperationContext* txn, const BSONObj& key, const RecordId& loc,
bool dupsAllowed);
diff --git a/src/mongo/db/storage/rocks/rocks_index_test.cpp b/src/mongo/db/storage/rocks/rocks_index_test.cpp
index 5b1c10b6aef..43effa007b1 100644
--- a/src/mongo/db/storage/rocks/rocks_index_test.cpp
+++ b/src/mongo/db/storage/rocks/rocks_index_test.cpp
@@ -58,26 +58,18 @@ namespace mongo {
RocksIndexHarness() : _order(Ordering::make(BSONObj())), _tempDir(_testNamespace) {
boost::filesystem::remove_all(_tempDir.path());
rocksdb::DB* db;
- std::vector<rocksdb::ColumnFamilyDescriptor> cfs;
- cfs.emplace_back();
- cfs.emplace_back("index", rocksdb::ColumnFamilyOptions());
- rocksdb::DBOptions db_options;
- db_options.create_if_missing = true;
- db_options.create_missing_column_families = true;
- std::vector<rocksdb::ColumnFamilyHandle*> handles;
- auto s = rocksdb::DB::Open(db_options, _tempDir.path(), cfs, &handles, &db);
+ rocksdb::Options options;
+ options.create_if_missing = true;
+ auto s = rocksdb::DB::Open(options, _tempDir.path(), &db);
ASSERT(s.ok());
_db.reset(db);
- _cf.reset(handles[1]);
}
virtual SortedDataInterface* newSortedDataInterface(bool unique) {
if (unique) {
- return new RocksUniqueIndex(_db.get(), _cf, rocksdb::kDefaultColumnFamilyName,
- _order);
+ return new RocksUniqueIndex(_db.get(), "prefix", "ident", _order);
} else {
- return new RocksStandardIndex(_db.get(), _cf, rocksdb::kDefaultColumnFamilyName,
- _order);
+ return new RocksStandardIndex(_db.get(), "prefix", "ident", _order);
}
}
@@ -90,7 +82,6 @@ namespace mongo {
string _testNamespace = "mongo-rocks-sorted-data-test";
unittest::TempDir _tempDir;
scoped_ptr<rocksdb::DB> _db;
- shared_ptr<rocksdb::ColumnFamilyHandle> _cf;
RocksTransactionEngine _transactionEngine;
};
diff --git a/src/mongo/db/storage/rocks/rocks_init.cpp b/src/mongo/db/storage/rocks/rocks_init.cpp
index 9442d5c30d5..ebf9e8930b3 100644
--- a/src/mongo/db/storage/rocks/rocks_init.cpp
+++ b/src/mongo/db/storage/rocks/rocks_init.cpp
@@ -99,7 +99,10 @@ namespace mongo {
// Current disk format. We bump this number when we change the disk format. MongoDB will
// fail to start if the versions don't match. In that case a user needs to run mongodump
// and mongorestore.
- const int kRocksFormatVersion = 1;
+ // * Version 1 was the format with many column families -- one column family for each
+ // collection and index
+ // * Version 2 (current) keeps all collections and indexes in a single column family
+ const int kRocksFormatVersion = 2;
const std::string kRocksFormatVersionString = "rocksFormatVersion";
};
} // namespace
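A v1 data directory (one column family per collection and index) therefore cannot be opened by a v2 binary at all; the user has to dump with the old binary and restore with the new one. An illustrative sketch of the gate this constant feeds -- the real check lives elsewhere in rocks_init.cpp and these names are made up:

    #include <iostream>
    #include <string>

    const int kRocksFormatVersion = 2;

    // Illustrative startup gate: refuse to run on a mismatched on-disk format.
    std::string checkFormatVersion(int storedVersion) {
        if (storedVersion == kRocksFormatVersion) {
            return "ok";
        }
        return "rocksFormatVersion mismatch: found " + std::to_string(storedVersion) +
               ", expected " + std::to_string(kRocksFormatVersion) +
               "; run mongodump with the old binary and mongorestore with the new one";
    }

    int main() {
        std::cout << checkFormatVersion(1) << "\n";
        return 0;
    }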
diff --git a/src/mongo/db/storage/rocks/rocks_record_store.cpp b/src/mongo/db/storage/rocks/rocks_record_store.cpp
index a53658fab5a..0fc42f6d484 100644
--- a/src/mongo/db/storage/rocks/rocks_record_store.cpp
+++ b/src/mongo/db/storage/rocks/rocks_record_store.cpp
@@ -137,12 +137,12 @@ namespace mongo {
RocksRecordStore::RocksRecordStore(const StringData& ns, const StringData& id,
rocksdb::DB* db, // not owned here
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily,
- bool isCapped, int64_t cappedMaxSize, int64_t cappedMaxDocs,
+ std::string prefix, bool isCapped, int64_t cappedMaxSize,
+ int64_t cappedMaxDocs,
CappedDocumentDeleteCallback* cappedDeleteCallback)
: RecordStore(ns),
_db(db),
- _columnFamily(columnFamily),
+ _prefix(std::move(prefix)),
_isCapped(isCapped),
_cappedMaxSize(cappedMaxSize),
_cappedMaxDocs(cappedMaxDocs),
@@ -153,10 +153,8 @@ namespace mongo {
_cappedVisibilityManager((_isCapped || _isOplog) ? new CappedVisibilityManager()
: nullptr),
_ident(id.toString()),
- _dataSizeKey("datasize-" + id.toString()),
- _numRecordsKey("numrecords-" + id.toString()) {
- invariant( _db );
- invariant( _columnFamily );
+ _dataSizeKey(std::string("\0\0\0\0", 4) + "datasize-" + id.toString()),
+ _numRecordsKey(std::string("\0\0\0\0", 4) + "numrecords-" + id.toString()) {
if (_isCapped) {
invariant(_cappedMaxSize > 0);
@@ -169,7 +167,7 @@ namespace mongo {
// Get next id
boost::scoped_ptr<rocksdb::Iterator> iter(
- db->NewIterator(_readOptions(), columnFamily.get()));
+ RocksRecoveryUnit::NewIteratorNoSnapshot(_db, _prefix));
iter->SeekToLast();
if (iter->Valid()) {
rocksdb::Slice lastSlice = iter->key();
@@ -178,70 +176,43 @@ namespace mongo {
_cappedVisibilityManager->updateHighestSeen(lastId);
}
_nextIdNum.store(lastId.repr() + 1);
- }
- else {
+ } else {
// Need to start at 1 so we are always higher than RecordId::min()
- _nextIdNum.store( 1 );
+ _nextIdNum.store(1);
}
// load metadata
- std::string value;
- bool metadataPresent = true;
- // XXX not using a Snapshot here
- if (!_db->Get(_readOptions(), rocksdb::Slice(_numRecordsKey), &value).ok()) {
- _numRecords.store(0);
- metadataPresent = false;
- }
- else {
- long long numRecords = 0;
- memcpy( &numRecords, value.data(), sizeof(numRecords));
- _numRecords.store(numRecords);
- }
-
- // XXX not using a Snapshot here
- if (!_db->Get(_readOptions(), rocksdb::Slice(_dataSizeKey), &value).ok()) {
- _dataSize.store(0);
- invariant(!metadataPresent);
- }
- else {
- invariant(value.size() == sizeof(long long));
- long long ds;
- memcpy(&ds, value.data(), sizeof(long long));
- invariant(ds >= 0);
- _dataSize.store(ds);
- }
+ _numRecords.store(RocksRecoveryUnit::getCounterValue(_db, _numRecordsKey));
+ _dataSize.store(RocksRecoveryUnit::getCounterValue(_db, _dataSizeKey));
+ invariant(_dataSize.load() >= 0);
+ invariant(_numRecords.load() >= 0);
}
- int64_t RocksRecordStore::storageSize( OperationContext* txn,
- BSONObjBuilder* extraInfo,
- int infoLevel ) const {
- uint64_t storageSize;
- int64_t minLocStorage, maxLocStorage;
- rocksdb::Range wholeRange(_makeKey(RecordId(), &minLocStorage),
- _makeKey(RecordId::max(), &maxLocStorage));
- _db->GetApproximateSizes(_columnFamily.get(), &wholeRange, 1, &storageSize);
- return static_cast<int64_t>( storageSize );
+ int64_t RocksRecordStore::storageSize(OperationContext* txn, BSONObjBuilder* extraInfo,
+ int infoLevel) const {
+ // we're lying, but that's the best we can do for now
+ return _dataSize.load();
}
RecordData RocksRecordStore::dataFor(OperationContext* txn, const RecordId& loc) const {
- RecordData rd = _getDataFor(_db, _columnFamily.get(), txn, loc);
+ RecordData rd = _getDataFor(_db, _prefix, txn, loc);
massert(28605, "Didn't find RecordId in RocksRecordStore", (rd.data() != nullptr));
return rd;
}
void RocksRecordStore::deleteRecord( OperationContext* txn, const RecordId& dl ) {
- RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn );
- if (!ru->transaction()->registerWrite(_getTransactionID(dl))) {
+ std::string key(_makePrefixedKey(_prefix, dl));
+
+ RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
+ if (!ru->transaction()->registerWrite(key)) {
throw WriteConflictException();
}
std::string oldValue;
- int64_t locStorage;
- rocksdb::Slice key = _makeKey(dl, &locStorage);
- ru->Get(_columnFamily.get(), key, &oldValue);
+ ru->Get(key, &oldValue);
int oldLength = oldValue.size();
- ru->writeBatch()->Delete(_columnFamily.get(), key);
+ ru->writeBatch()->Delete(key);
_changeNumRecords(txn, false);
_increaseDataSize(txn, -oldLength);
@@ -304,7 +275,7 @@ namespace mongo {
try {
auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
- boost::scoped_ptr<rocksdb::Iterator> iter(ru->NewIterator(_columnFamily.get()));
+ boost::scoped_ptr<rocksdb::Iterator> iter(ru->NewIterator(_prefix));
iter->SeekToFirst();
while (cappedAndNeedDelete(dataSizeDelta, numRecordsDelta) && iter->Valid()) {
@@ -357,6 +328,7 @@ namespace mongo {
const char* data,
int len,
bool enforceQuota ) {
+
if ( _isCapped && len > _cappedMaxSize ) {
return StatusWith<RecordId>( ErrorCodes::BadValue,
"object to insert exceeds cappedMaxSize" );
@@ -379,15 +351,9 @@ namespace mongo {
loc = _nextId();
}
- // XXX it might be safe to remove this, since we just allocated new unique RecordId.
- // However, we need to check if any other transaction can start modifying this RecordId
- // before our transaction is committed
- if (!ru->transaction()->registerWrite(_getTransactionID(loc))) {
- throw WriteConflictException();
- }
- int64_t locStorage;
- ru->writeBatch()->Put(_columnFamily.get(), _makeKey(loc, &locStorage),
- rocksdb::Slice(data, len));
+ // No need to register the write here, since we just allocated a new RecordId so no other
+ // transaction can access this key before we commit
+ ru->writeBatch()->Put(_makePrefixedKey(_prefix, loc), rocksdb::Slice(data, len));
_changeNumRecords( txn, true );
_increaseDataSize( txn, len );
@@ -413,15 +379,15 @@ namespace mongo {
int len,
bool enforceQuota,
UpdateNotifier* notifier ) {
+ std::string key(_makePrefixedKey(_prefix, loc));
+
RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn );
- if (!ru->transaction()->registerWrite(_getTransactionID(loc))) {
+ if (!ru->transaction()->registerWrite(key)) {
throw WriteConflictException();
}
std::string old_value;
- int64_t locStorage;
- rocksdb::Slice key = _makeKey(loc, &locStorage);
- auto status = ru->Get(_columnFamily.get(), key, &old_value);
+ auto status = ru->Get(key, &old_value);
if ( !status.ok() ) {
return StatusWith<RecordId>( ErrorCodes::InternalError, status.ToString() );
@@ -429,7 +395,7 @@ namespace mongo {
int old_length = old_value.size();
- ru->writeBatch()->Put(_columnFamily.get(), key, rocksdb::Slice(data, len));
+ ru->writeBatch()->Put(key, rocksdb::Slice(data, len));
_increaseDataSize(txn, len - old_length);
@@ -439,7 +405,7 @@ namespace mongo {
}
bool RocksRecordStore::updateWithDamagesSupported() const {
- return true;
+ return false;
}
Status RocksRecordStore::updateWithDamages( OperationContext* txn,
@@ -447,39 +413,7 @@ namespace mongo {
const RecordData& oldRec,
const char* damageSource,
const mutablebson::DamageVector& damages ) {
- RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn );
- if (!ru->transaction()->registerWrite(_getTransactionID(loc))) {
- throw WriteConflictException();
- }
-
- int64_t locStorage;
- rocksdb::Slice key = _makeKey(loc, &locStorage);
-
- // get original value
- std::string value;
- rocksdb::Status status;
- status = ru->Get(_columnFamily.get(), key, &value);
-
- if ( !status.ok() ) {
- if ( status.IsNotFound() )
- return Status( ErrorCodes::InternalError, "doc not found for in-place update" );
-
- log() << "rocks Get failed, blowing up: " << status.ToString();
- invariant( false );
- }
-
- // apply changes to our copy
- for( size_t i = 0; i < damages.size(); i++ ) {
- mutablebson::DamageEvent event = damages[i];
- const char* sourcePtr = damageSource + event.sourceOffset;
-
- invariant( event.targetOffset + event.size < value.length() );
- value.replace( event.targetOffset, event.size, sourcePtr, event.size );
- }
-
- // write back
- ru->writeBatch()->Put(_columnFamily.get(), key, value);
-
+ invariant(false);
return Status::OK();
}
@@ -494,18 +428,17 @@ namespace mongo {
}
}
- return new Iterator(txn, _db, _columnFamily, _cappedVisibilityManager, dir, start);
+ return new Iterator(txn, _db, _prefix, _cappedVisibilityManager, dir, start);
}
std::vector<RecordIterator*> RocksRecordStore::getManyIterators(OperationContext* txn) const {
- return {new Iterator(txn, _db, _columnFamily, _cappedVisibilityManager,
+ return {new Iterator(txn, _db, _prefix, _cappedVisibilityManager,
CollectionScanParams::FORWARD, RecordId())};
}
Status RocksRecordStore::truncate( OperationContext* txn ) {
// XXX once we have readable WriteBatch, also delete outstanding writes to
// this collection in the WriteBatch
- //AFB add Clear(ColumnFamilyHandle*)
boost::scoped_ptr<RecordIterator> iter( getIterator( txn ) );
while( !iter->isEOF() ) {
RecordId loc = iter->getNext();
@@ -519,7 +452,11 @@ namespace mongo {
RecordStoreCompactAdaptor* adaptor,
const CompactOptions* options,
CompactStats* stats ) {
- rocksdb::Status status = _db->CompactRange(_columnFamily.get(), NULL, NULL);
+ std::string beginString(_makePrefixedKey(_prefix, RecordId()));
+ std::string endString(_makePrefixedKey(_prefix, RecordId::max()));
+ rocksdb::Slice beginRange(beginString);
+ rocksdb::Slice endRange(endString);
+ rocksdb::Status status = _db->CompactRange(&beginRange, &endRange);
if ( status.ok() )
return Status::OK();
else
@@ -571,7 +508,7 @@ namespace mongo {
result->appendIntOrLL("max", _cappedMaxDocs);
result->appendIntOrLL("maxSize", _cappedMaxSize / scale);
}
- bool valid = _db->GetProperty(_columnFamily.get(), "rocksdb.stats", &statsString);
+ bool valid = _db->GetProperty("rocksdb.stats", &statsString);
invariant( valid );
result->append( "stats", statsString );
}
@@ -611,9 +548,9 @@ namespace mongo {
auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
ru->setOplogReadTill(_cappedVisibilityManager->oplogStartHack());
- boost::scoped_ptr<rocksdb::Iterator> iter(ru->NewIterator(_columnFamily.get()));
- int64_t locStorage;
- iter->Seek(_makeKey(startingPosition, &locStorage));
+ boost::scoped_ptr<rocksdb::Iterator> iter(ru->NewIterator(_prefix));
+ int64_t storage;
+ iter->Seek(_makeKey(startingPosition, &storage));
if (!iter->Valid()) {
iter->SeekToLast();
if (iter->Valid()) {
@@ -678,6 +615,14 @@ namespace mongo {
return rocksdb::Slice(reinterpret_cast<const char*>(storage), sizeof(*storage));
}
+ std::string RocksRecordStore::_makePrefixedKey(const std::string& prefix, const RecordId& loc) {
+ int64_t storage;
+ auto encodedLoc = _makeKey(loc, &storage);
+ std::string key(prefix);
+ key.append(encodedLoc.data(), encodedLoc.size());
+ return key;
+ }
+
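RecordIds get the same big-endian treatment as prefixes: _makeKey stores the 64-bit repr most-significant-byte first so that memcmp order equals RecordId order, which is what makes Seek() and SeekToLast() over a record store's prefix range meaningful. A standalone round-trip sketch (shifts in place of mongo/platform/endian.h):

    #include <cassert>
    #include <cstdint>
    #include <string>

    std::string encodeRecordId(int64_t repr) {
        char buf[8];
        for (int i = 0; i < 8; ++i) {
            buf[i] = static_cast<char>(repr >> (56 - 8 * i));  // MSB first
        }
        return std::string(buf, 8);
    }

    int main() {
        // Bytewise comparison agrees with numeric comparison (for the positive
        // ids used here, which start at 1), so iterating a record store's
        // prefix range visits records in RecordId order.
        assert(encodeRecordId(1) < encodeRecordId(2));
        assert(encodeRecordId(255) < encodeRecordId(256));
        return 0;
    }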
RecordId RocksRecordStore::_makeRecordId(const rocksdb::Slice& slice) {
invariant(slice.size() == sizeof(int64_t));
int64_t repr = endian::bigToNative(*reinterpret_cast<const int64_t*>(slice.data()));
@@ -687,20 +632,19 @@ namespace mongo {
bool RocksRecordStore::findRecord( OperationContext* txn,
const RecordId& loc, RecordData* out ) const {
- RecordData rd = _getDataFor(_db, _columnFamily.get(), txn, loc);
+ RecordData rd = _getDataFor(_db, _prefix, txn, loc);
if ( rd.data() == NULL )
return false;
*out = rd;
return true;
}
- RecordData RocksRecordStore::_getDataFor(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf,
+ RecordData RocksRecordStore::_getDataFor(rocksdb::DB* db, const std::string& prefix,
OperationContext* txn, const RecordId& loc) {
RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
std::string valueStorage;
- int64_t locStorage;
- auto status = ru->Get(cf, _makeKey(loc, &locStorage), &valueStorage);
+ auto status = ru->Get(_makePrefixedKey(prefix, loc), &valueStorage);
if (!status.ok()) {
if (status.IsNotFound()) {
return RecordData(nullptr, 0);
@@ -715,7 +659,6 @@ namespace mongo {
return RecordData(data.moveFrom(), valueStorage.size());
}
- // XXX make sure these work with rollbacks (I don't think they will)
void RocksRecordStore::_changeNumRecords( OperationContext* txn, bool insert ) {
RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn );
if ( insert ) {
@@ -731,26 +674,20 @@ namespace mongo {
ru->incrementCounter(_dataSizeKey, &_dataSize, amount);
}
- std::string RocksRecordStore::_getTransactionID(const RecordId& rid) const {
- // TODO -- optimize in the future
- return _ident + std::string(reinterpret_cast<const char*>(&rid), sizeof(rid));
- }
-
// --------
RocksRecordStore::Iterator::Iterator(
- OperationContext* txn, rocksdb::DB* db,
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily,
+ OperationContext* txn, rocksdb::DB* db, std::string prefix,
boost::shared_ptr<CappedVisibilityManager> cappedVisibilityManager,
const CollectionScanParams::Direction& dir, const RecordId& start)
: _txn(txn),
_db(db),
- _cf(columnFamily),
+ _prefix(std::move(prefix)),
_cappedVisibilityManager(cappedVisibilityManager),
_dir(dir),
_eof(true),
_readUntilForOplog(RocksRecoveryUnit::getRocksRecoveryUnit(txn)->getOplogReadTill()),
- _iterator(RocksRecoveryUnit::getRocksRecoveryUnit(txn)->NewIterator(_cf.get())) {
+ _iterator(RocksRecoveryUnit::getRocksRecoveryUnit(txn)->NewIterator(_prefix)) {
_locate(start);
}
@@ -828,7 +765,7 @@ namespace mongo {
}
auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
- _iterator.reset(ru->NewIterator(_cf.get()));
+ _iterator.reset(ru->NewIterator(_prefix));
RecordId saved = _lastLoc;
_locate(_lastLoc);
@@ -862,7 +799,7 @@ namespace mongo {
memcpy(data.get(), _iterator->value().data(), _iterator->value().size());
return RecordData(data.moveFrom(), _iterator->value().size());
}
- return RocksRecordStore::_getDataFor(_db, _cf.get(), _txn, loc);
+ return RocksRecordStore::_getDataFor(_db, _prefix, _txn, loc);
}
void RocksRecordStore::Iterator::_locate(const RecordId& loc) {
diff --git a/src/mongo/db/storage/rocks/rocks_record_store.h b/src/mongo/db/storage/rocks/rocks_record_store.h
index 3cc1a607c1d..fdd2763a92f 100644
--- a/src/mongo/db/storage/rocks/rocks_record_store.h
+++ b/src/mongo/db/storage/rocks/rocks_record_store.h
@@ -44,7 +44,6 @@
#include "mongo/platform/atomic_word.h"
namespace rocksdb {
- class ColumnFamilyHandle;
class DB;
class Iterator;
class Slice;
@@ -79,8 +78,7 @@ namespace mongo {
class RocksRecordStore : public RecordStore {
public:
- RocksRecordStore(const StringData& ns, const StringData& id, rocksdb::DB* db,
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily,
+ RocksRecordStore(const StringData& ns, const StringData& id, rocksdb::DB* db, std::string prefix,
bool isCapped = false, int64_t cappedMaxSize = -1,
int64_t cappedMaxDocs = -1,
CappedDocumentDeleteCallback* cappedDeleteCallback = NULL);
@@ -193,8 +191,7 @@ namespace mongo {
// shared_ptrs
class Iterator : public RecordIterator {
public:
- Iterator(OperationContext* txn, rocksdb::DB* db,
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> columnFamily,
+ Iterator(OperationContext* txn, rocksdb::DB* db, std::string prefix,
boost::shared_ptr<CappedVisibilityManager> cappedVisibilityManager,
const CollectionScanParams::Direction& dir, const RecordId& start);
@@ -214,7 +211,7 @@ namespace mongo {
OperationContext* _txn;
rocksdb::DB* _db; // not owned
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> _cf;
+ std::string _prefix;
boost::shared_ptr<CappedVisibilityManager> _cappedVisibilityManager;
CollectionScanParams::Direction _dir;
bool _eof;
@@ -232,7 +229,7 @@ namespace mongo {
static RecordId _makeRecordId( const rocksdb::Slice& slice );
- static RecordData _getDataFor(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf,
+ static RecordData _getDataFor(rocksdb::DB* db, const std::string& prefix,
OperationContext* txn, const RecordId& loc);
RecordId _nextId();
@@ -241,14 +238,13 @@ namespace mongo {
// The use of this function requires that the passed in storage outlives the returned Slice
static rocksdb::Slice _makeKey(const RecordId& loc, int64_t* storage);
+ static std::string _makePrefixedKey(const std::string& prefix, const RecordId& loc);
void _changeNumRecords(OperationContext* txn, bool insert);
void _increaseDataSize(OperationContext* txn, int amount);
- std::string _getTransactionID(const RecordId& rid) const;
-
rocksdb::DB* _db; // not owned
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> _columnFamily;
+ std::string _prefix;
const bool _isCapped;
const int64_t _cappedMaxSize;
diff --git a/src/mongo/db/storage/rocks/rocks_record_store_test.cpp b/src/mongo/db/storage/rocks/rocks_record_store_test.cpp
index 1a52b18a9b2..b6ebcc7dfc4 100644
--- a/src/mongo/db/storage/rocks/rocks_record_store_test.cpp
+++ b/src/mongo/db/storage/rocks/rocks_record_store_test.cpp
@@ -58,30 +58,23 @@ namespace mongo {
RocksRecordStoreHarnessHelper() : _tempDir(_testNamespace) {
boost::filesystem::remove_all(_tempDir.path());
rocksdb::DB* db;
- std::vector<rocksdb::ColumnFamilyDescriptor> cfs;
- cfs.emplace_back();
- cfs.emplace_back("record_store", rocksdb::ColumnFamilyOptions());
- rocksdb::DBOptions db_options;
- db_options.create_if_missing = true;
- db_options.create_missing_column_families = true;
- std::vector<rocksdb::ColumnFamilyHandle*> handles;
- auto s = rocksdb::DB::Open(db_options, _tempDir.path(), cfs, &handles, &db);
+ rocksdb::Options options;
+ options.create_if_missing = true;
+ auto s = rocksdb::DB::Open(options, _tempDir.path(), &db);
ASSERT(s.ok());
_db.reset(db);
- delete handles[0];
- _cf.reset(handles[1]);
}
virtual RecordStore* newNonCappedRecordStore() {
return newNonCappedRecordStore("foo.bar");
}
RecordStore* newNonCappedRecordStore(const std::string& ns) {
- return new RocksRecordStore(ns, "1", _db.get(), _cf);
+ return new RocksRecordStore(ns, "1", _db.get(), "prefix");
}
RecordStore* newCappedRecordStore(const std::string& ns, int64_t cappedMaxSize,
int64_t cappedMaxDocs) {
- return new RocksRecordStore(ns, "1", _db.get(), _cf, true, cappedMaxSize,
+ return new RocksRecordStore(ns, "1", _db.get(), "prefix", true, cappedMaxSize,
cappedMaxDocs);
}
@@ -93,7 +86,6 @@ namespace mongo {
string _testNamespace = "mongo-rocks-record-store-test";
unittest::TempDir _tempDir;
boost::scoped_ptr<rocksdb::DB> _db;
- boost::shared_ptr<rocksdb::ColumnFamilyHandle> _cf;
RocksTransactionEngine _transactionEngine;
};
diff --git a/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp b/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp
index 05237f5941e..4217573c7e4 100644
--- a/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp
+++ b/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp
@@ -29,6 +29,7 @@
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
#include "mongo/platform/basic.h"
+#include "mongo/platform/endian.h"
#include "mongo/db/storage/rocks/rocks_recovery_unit.h"
@@ -47,6 +48,66 @@
#include "mongo/util/log.h"
namespace mongo {
+ namespace {
+ class PrefixStrippingIterator : public rocksdb::Iterator {
+ public:
+ // baseIterator is consumed
+ PrefixStrippingIterator(std::string prefix, Iterator* baseIterator)
+ : _prefix(std::move(prefix)),
+ _prefixSlice(_prefix.data(), _prefix.size()),
+ _baseIterator(baseIterator) {}
+
+ virtual bool Valid() const {
+ return _baseIterator->Valid() && _baseIterator->key().starts_with(_prefixSlice);
+ }
+
+ virtual void SeekToFirst() { _baseIterator->Seek(_prefixSlice); }
+ virtual void SeekToLast() {
+ // next prefix lexicographically, assume same length
+ std::string nextPrefix(_prefix);
+ invariant(nextPrefix.size() > 0);
+                for (int i = static_cast<int>(nextPrefix.size()) - 1; i >= 0; --i) {
+                    nextPrefix[i]++;
+                    // if it wrapped to 0 we overflowed this byte, so keep carrying
+ if (nextPrefix[i] != 0) {
+ break;
+ }
+ }
+
+ _baseIterator->Seek(nextPrefix);
+ if (!_baseIterator->Valid()) {
+ _baseIterator->SeekToLast();
+ }
+ if (_baseIterator->Valid() && !_baseIterator->key().starts_with(_prefixSlice)) {
+ _baseIterator->Prev();
+ }
+ }
+
+ virtual void Seek(const rocksdb::Slice& target) {
+ std::unique_ptr<char[]> buffer(new char[_prefix.size() + target.size()]);
+ memcpy(buffer.get(), _prefix.data(), _prefix.size());
+ memcpy(buffer.get() + _prefix.size(), target.data(), target.size());
+ _baseIterator->Seek(rocksdb::Slice(buffer.get(), _prefix.size() + target.size()));
+ }
+
+ virtual void Next() { _baseIterator->Next(); }
+ virtual void Prev() { _baseIterator->Prev(); }
+
+ virtual rocksdb::Slice key() const {
+ rocksdb::Slice strippedKey = _baseIterator->key();
+ strippedKey.remove_prefix(_prefix.size());
+ return strippedKey;
+ }
+ virtual rocksdb::Slice value() const { return _baseIterator->value(); }
+ virtual rocksdb::Status status() const { return _baseIterator->status(); }
+
+ private:
+ std::string _prefix;
+ rocksdb::Slice _prefixSlice;
+ std::unique_ptr<Iterator> _baseIterator;
+ };
+
+ } // anonymous namespace
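A short usage sketch of the wrapper above, assuming the class is visible: the caller seeks and reads unprefixed keys while the underlying iterator works on prefixed ones (throwaway DB path, error handling elided):

    #include <rocksdb/db.h>
    #include <iostream>
    #include <memory>
    #include <string>

    int main() {
        rocksdb::DB* db;
        rocksdb::Options options;
        options.create_if_missing = true;
        rocksdb::DB::Open(options, "/tmp/prefix-demo", &db);

        const std::string prefix("\x00\x00\x00\x01", 4);
        db->Put(rocksdb::WriteOptions(), prefix + "apple", "1");
        db->Put(rocksdb::WriteOptions(), prefix + "pear", "2");
        db->Put(rocksdb::WriteOptions(),
                std::string("\x00\x00\x00\x02", 4) + "zzz", "3");  // other ident

        std::unique_ptr<rocksdb::Iterator> it(new PrefixStrippingIterator(
            prefix, db->NewIterator(rocksdb::ReadOptions())));
        for (it->SeekToFirst(); it->Valid(); it->Next()) {
            // Prints "apple" then "pear"; the \x02-prefixed key is invisible
            // and the prefix bytes are already stripped from key().
            std::cout << it->key().ToString() << "\n";
        }
        delete db;
        return 0;
    }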
RocksRecoveryUnit::RocksRecoveryUnit(RocksTransactionEngine* transactionEngine, rocksdb::DB* db,
bool durable)
@@ -134,10 +195,9 @@ namespace mongo {
auto& counter = pair.second;
counter._value->fetch_add(counter._delta, std::memory_order::memory_order_relaxed);
long long newValue = counter._value->load(std::memory_order::memory_order_relaxed);
-
- // TODO: make the encoding platform indepdent.
- const char* nr_ptr = reinterpret_cast<char*>(&newValue);
- writeBatch()->Put(pair.first, rocksdb::Slice(nr_ptr, sizeof(long long)));
+            int64_t littleEndian = static_cast<int64_t>(endian::nativeToLittle(newValue));
+ const char* nr_ptr = reinterpret_cast<const char*>(&littleEndian);
+ writeBatch()->Put(pair.first, rocksdb::Slice(nr_ptr, sizeof(littleEndian)));
}
if (_writeBatch->GetWriteBatch()->Count() != 0) {
@@ -181,11 +241,9 @@ namespace mongo {
return _snapshot;
}
- rocksdb::Status RocksRecoveryUnit::Get(rocksdb::ColumnFamilyHandle* columnFamily,
- const rocksdb::Slice& key, std::string* value) {
+ rocksdb::Status RocksRecoveryUnit::Get(const rocksdb::Slice& key, std::string* value) {
if (_writeBatch && _writeBatch->GetWriteBatch()->Count() > 0) {
- boost::scoped_ptr<rocksdb::WBWIIterator> wb_iterator(
- _writeBatch->NewIterator(columnFamily));
+ boost::scoped_ptr<rocksdb::WBWIIterator> wb_iterator(_writeBatch->NewIterator());
wb_iterator->Seek(key);
if (wb_iterator->Valid() && wb_iterator->Entry().key == key) {
const auto& entry = wb_iterator->Entry();
@@ -199,19 +257,23 @@ namespace mongo {
}
rocksdb::ReadOptions options;
options.snapshot = snapshot();
- return _db->Get(options, columnFamily, key, value);
+ return _db->Get(options, key, value);
}
- rocksdb::Iterator* RocksRecoveryUnit::NewIterator(rocksdb::ColumnFamilyHandle* columnFamily) {
- invariant(columnFamily != _db->DefaultColumnFamily());
-
+ rocksdb::Iterator* RocksRecoveryUnit::NewIterator(std::string prefix) {
rocksdb::ReadOptions options;
options.snapshot = snapshot();
- auto iterator = _db->NewIterator(options, columnFamily);
+ auto iterator = _db->NewIterator(options);
if (_writeBatch && _writeBatch->GetWriteBatch()->Count() > 0) {
- iterator = _writeBatch->NewIteratorWithBase(columnFamily, iterator);
+ iterator = _writeBatch->NewIteratorWithBase(iterator);
}
- return iterator;
+ return new PrefixStrippingIterator(std::move(prefix), iterator);
+ }
+
+ rocksdb::Iterator* RocksRecoveryUnit::NewIteratorNoSnapshot(rocksdb::DB* db,
+ std::string prefix) {
+ auto iterator = db->NewIterator(rocksdb::ReadOptions());
+ return new PrefixStrippingIterator(std::move(prefix), iterator);
}
void RocksRecoveryUnit::incrementCounter(const rocksdb::Slice& counterKey,
@@ -238,6 +300,23 @@ namespace mongo {
}
}
+ long long RocksRecoveryUnit::getCounterValue(rocksdb::DB* db, const rocksdb::Slice counterKey) {
+ std::string value;
+ auto s = db->Get(rocksdb::ReadOptions(), counterKey, &value);
+ if (s.IsNotFound()) {
+ return 0;
+ } else if (!s.ok()) {
+ log() << "Counter get failed " << s.ToString();
+ invariant(!"Counter get failed");
+ }
+
+ int64_t ret;
+ invariant(sizeof(ret) == value.size());
+ memcpy(&ret, value.data(), sizeof(ret));
+ // we store counters in little endian
+ return static_cast<long long>(endian::littleToNative(ret));
+ }
+
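Counters thus live as raw 8-byte little-endian integers under the default all-zero prefix, rewritten on every commit and read back once at startup. A standalone round-trip of that encoding (shifts in place of endian.h; non-negative values only, which is all the invariants above allow):

    #include <cassert>
    #include <cstdint>
    #include <string>

    std::string encodeCounter(int64_t v) {
        char buf[8];
        for (int i = 0; i < 8; ++i) {
            buf[i] = static_cast<char>(v >> (8 * i));  // LSB first
        }
        return std::string(buf, 8);
    }

    int64_t decodeCounter(const std::string& value) {
        assert(value.size() == sizeof(int64_t));  // mirrors the invariant above
        int64_t v = 0;
        for (int i = 7; i >= 0; --i) {
            v = (v << 8) | static_cast<unsigned char>(value[i]);
        }
        return v;
    }

    int main() {
        assert(decodeCounter(encodeCounter(0)) == 0);
        assert(decodeCounter(encodeCounter(123456789)) == 123456789);
        return 0;
    }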
RocksRecoveryUnit* RocksRecoveryUnit::getRocksRecoveryUnit(OperationContext* opCtx) {
return checked_cast<RocksRecoveryUnit*>(opCtx->recoveryUnit());
}
diff --git a/src/mongo/db/storage/rocks/rocks_recovery_unit.h b/src/mongo/db/storage/rocks/rocks_recovery_unit.h
index e1f95e1b0a7..70217a3d681 100644
--- a/src/mongo/db/storage/rocks/rocks_recovery_unit.h
+++ b/src/mongo/db/storage/rocks/rocks_recovery_unit.h
@@ -50,7 +50,6 @@ namespace rocksdb {
class WriteBatchWithIndex;
class Comparator;
class Status;
- class ColumnFamilyHandle;
class Slice;
class Iterator;
}
@@ -91,16 +90,19 @@ namespace mongo {
RocksTransaction* transaction() { return &_transaction; }
- rocksdb::Status Get(rocksdb::ColumnFamilyHandle* columnFamily, const rocksdb::Slice& key,
- std::string* value);
+ rocksdb::Status Get(const rocksdb::Slice& key, std::string* value);
- rocksdb::Iterator* NewIterator(rocksdb::ColumnFamilyHandle* columnFamily);
+ rocksdb::Iterator* NewIterator(std::string prefix);
+
+ static rocksdb::Iterator* NewIteratorNoSnapshot(rocksdb::DB* db, std::string prefix);
void incrementCounter(const rocksdb::Slice& counterKey,
std::atomic<long long>* counter, long long delta);
long long getDeltaCounter(const rocksdb::Slice& counterKey);
+ static long long getCounterValue(rocksdb::DB* db, const rocksdb::Slice counterKey);
+
void setOplogReadTill(const RecordId& loc);
RecordId getOplogReadTill() const { return _oplogReadTill; }