author    Igor Canadi <icanadi@fb.com>    2015-03-23 12:35:20 -0700
committer Ramon Fernandez <ramon.fernandez@mongodb.com>    2015-03-23 17:35:59 -0400
commit    12e24d4e910150d2523a226ba5a62370dc974d44 (patch)
tree      1f22d9f9b12fce1d3dde260c4706f4eb92845f4d
parent    1b7ee0b6d045c5f99036507915f29684cee59b29 (diff)
download  mongo-12e24d4e910150d2523a226ba5a62370dc974d44.tar.gz
SERVER-17706 Sync mongo-rocks with our recent changes
We've been developing these changes in a different repository. This patch contains a bunch of improvements and fixes:

* Implement updateStatsAfterRepair()
* Return the correct ident size in getIdentSize()
* Fast drop operation in Mongo+Rocks
* Abort the RocksTransaction when we do _releaseSnapshot()
* SERVER-16979 Correctly handle errors returned by RocksDB
* Implement StandardBulkBuilder and UniqueBulkBuilder
* Add a new option to RocksDB -- rocksdbMaxWriteMBPerSec
* Make oplog cleanup more efficient (also changes the disk format)
* Some configuration changes

Signed-off-by: Ramon Fernandez <ramon.fernandez@mongodb.com>
-rw-r--r--  src/mongo/db/storage/rocks/KNOWN_ISSUES.md | 1
-rw-r--r--  src/mongo/db/storage/rocks/SConscript | 2
-rw-r--r--  src/mongo/db/storage/rocks/rocks_engine.cpp | 248
-rw-r--r--  src/mongo/db/storage/rocks/rocks_engine.h | 26
-rw-r--r--  src/mongo/db/storage/rocks/rocks_engine_test.cpp | 3
-rw-r--r--  src/mongo/db/storage/rocks/rocks_global_options.cpp | 16
-rw-r--r--  src/mongo/db/storage/rocks/rocks_global_options.h | 3
-rw-r--r--  src/mongo/db/storage/rocks/rocks_index.cpp | 469
-rw-r--r--  src/mongo/db/storage/rocks/rocks_index.h | 15
-rw-r--r--  src/mongo/db/storage/rocks/rocks_index_test.cpp | 9
-rw-r--r--  src/mongo/db/storage/rocks/rocks_init.cpp | 17
-rw-r--r--  src/mongo/db/storage/rocks/rocks_options_init.cpp | 5
-rw-r--r--  src/mongo/db/storage/rocks/rocks_parameters.cpp | 73
-rw-r--r--  src/mongo/db/storage/rocks/rocks_parameters.h | 50
-rw-r--r--  src/mongo/db/storage/rocks/rocks_record_store.cpp | 244
-rw-r--r--  src/mongo/db/storage/rocks/rocks_record_store.h | 28
-rw-r--r--  src/mongo/db/storage/rocks/rocks_record_store_mock.cpp | 2
-rw-r--r--  src/mongo/db/storage/rocks/rocks_record_store_mongod.cpp | 17
-rw-r--r--  src/mongo/db/storage/rocks/rocks_record_store_test.cpp | 8
-rw-r--r--  src/mongo/db/storage/rocks/rocks_recovery_unit.cpp | 35
-rw-r--r--  src/mongo/db/storage/rocks/rocks_recovery_unit.h | 7
-rw-r--r--  src/mongo/db/storage/rocks/rocks_server_status.cpp | 14
-rw-r--r--  src/mongo/db/storage/rocks/rocks_transaction.cpp | 13
-rw-r--r--  src/mongo/db/storage/rocks/rocks_transaction.h | 11
-rw-r--r--  src/mongo/db/storage/rocks/rocks_util.cpp | 49
-rw-r--r--  src/mongo/db/storage/rocks/rocks_util.h | 23
26 files changed, 975 insertions(+), 413 deletions(-)
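The new rocksdbMaxWriteMBPerSec option is built on RocksDB's generic rate limiter, which the rocks_engine.cpp hunks below wire into rocksdb::Options and later retune at runtime. A minimal standalone sketch of that pattern, using only the public RocksDB API (the path and values are illustrative, not taken from the patch):

    #include <memory>
    #include <rocksdb/db.h>
    #include <rocksdb/options.h>
    #include <rocksdb/rate_limiter.h>

    int main() {
        // 1024 MB/s is the option's default; the engine keeps the limiter in a
        // shared_ptr because rocksdb::Options holds a reference to it as well
        std::shared_ptr<rocksdb::RateLimiter> limiter(
            rocksdb::NewGenericRateLimiter(1024LL * 1024 * 1024));

        rocksdb::Options options;
        options.create_if_missing = true;
        options.rate_limiter = limiter;  // throttles flush and compaction writes

        rocksdb::DB* db;
        rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/rate_limited_db", &db);
        if (!s.ok()) {
            return 1;
        }

        // the runtime tuning added by this patch boils down to this one call
        limiter->SetBytesPerSecond(256LL * 1024 * 1024);  // now 256 MB/s

        delete db;
        return 0;
    }

Throttling background writes this way trades peak write throughput for steadier read latency during compactions, which is the trade-off the option's help text describes.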
diff --git a/src/mongo/db/storage/rocks/KNOWN_ISSUES.md b/src/mongo/db/storage/rocks/KNOWN_ISSUES.md
new file mode 100644
index 00000000000..d82b8dcb48d
--- /dev/null
+++ b/src/mongo/db/storage/rocks/KNOWN_ISSUES.md
@@ -0,0 +1 @@
+* Don't create indexes on oplog. We will not properly unindex documents that we remove from the beginning of the oplog.
diff --git a/src/mongo/db/storage/rocks/SConscript b/src/mongo/db/storage/rocks/SConscript
index 8c43d94c303..2ba0f3149d3 100644
--- a/src/mongo/db/storage/rocks/SConscript
+++ b/src/mongo/db/storage/rocks/SConscript
@@ -12,6 +12,7 @@ if has_option("rocksdb"):
'rocks_recovery_unit.cpp',
'rocks_index.cpp',
'rocks_transaction.cpp',
+ 'rocks_util.cpp',
],
LIBDEPS= [
'$BUILD_DIR/mongo/bson',
@@ -36,6 +37,7 @@ if has_option("rocksdb"):
source= [
'rocks_init.cpp',
'rocks_options_init.cpp',
+ 'rocks_parameters.cpp',
'rocks_record_store_mongod.cpp',
'rocks_server_status.cpp',
],
diff --git a/src/mongo/db/storage/rocks/rocks_engine.cpp b/src/mongo/db/storage/rocks/rocks_engine.cpp
index 2636f67396b..e16447b99f0 100644
--- a/src/mongo/db/storage/rocks/rocks_engine.cpp
+++ b/src/mongo/db/storage/rocks/rocks_engine.cpp
@@ -31,7 +31,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/storage/rocks/rocks_engine.h"
+#include "rocks_engine.h"
#include <boost/filesystem/operations.hpp>
#include <boost/make_shared.hpp>
@@ -39,10 +39,12 @@
#include <boost/scoped_ptr.hpp>
#include <rocksdb/cache.h>
+#include <rocksdb/compaction_filter.h>
#include <rocksdb/comparator.h>
#include <rocksdb/db.h>
#include <rocksdb/slice.h>
#include <rocksdb/options.h>
+#include <rocksdb/rate_limiter.h>
#include <rocksdb/table.h>
#include <rocksdb/utilities/convenience.h>
#include <rocksdb/filter_policy.h>
@@ -50,19 +52,20 @@
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/index/index_descriptor.h"
+#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/storage/rocks/rocks_global_options.h"
-#include "mongo/db/storage/rocks/rocks_record_store.h"
-#include "mongo/db/storage/rocks/rocks_recovery_unit.h"
-#include "mongo/db/storage/rocks/rocks_index.h"
+#include "mongo/db/server_parameters.h"
#include "mongo/platform/endian.h"
#include "mongo/util/log.h"
#include "mongo/util/processinfo.h"
-#define ROCKS_TRACE log()
+#include "rocks_global_options.h"
+#include "rocks_record_store.h"
+#include "rocks_recovery_unit.h"
+#include "rocks_index.h"
+#include "rocks_util.h"
-#define ROCKS_STATUS_OK( s ) if ( !( s ).ok() ) { error() << "rocks error: " << ( s ).ToString(); \
- invariant( false ); }
+#define ROCKS_TRACE log()
namespace mongo {
@@ -81,10 +84,70 @@ namespace mongo {
uint32_t bigEndianPrefix = endian::nativeToBig(prefix);
return std::string(reinterpret_cast<const char*>(&bigEndianPrefix), sizeof(uint32_t));
}
+
+ class PrefixDeletingCompactionFilter : public rocksdb::CompactionFilter {
+ public:
+ explicit PrefixDeletingCompactionFilter(std::unordered_set<uint32_t> droppedPrefixes)
+ : _droppedPrefixes(std::move(droppedPrefixes)),
+ _prefixCache(0),
+ _droppedCache(false) {}
+
+ // filter is not called from multiple threads simultaneously
+ virtual bool Filter(int level, const rocksdb::Slice& key,
+ const rocksdb::Slice& existing_value, std::string* new_value,
+ bool* value_changed) const {
+ uint32_t prefix = 0;
+ if (!extractPrefix(key, &prefix)) {
+ // this means there is a key in the database that's shorter than 4 bytes. this
+ // should never happen and this is a corruption. however, it's not compaction
+ // filter's job to report corruption, so we just silently continue
+ return false;
+ }
+ if (prefix == _prefixCache) {
+ return _droppedCache;
+ }
+ _prefixCache = prefix;
+ _droppedCache = _droppedPrefixes.find(prefix) != _droppedPrefixes.end();
+ return _droppedCache;
+ }
+
+ virtual const char* Name() const { return "PrefixDeletingCompactionFilter"; }
+
+ private:
+ std::unordered_set<uint32_t> _droppedPrefixes;
+ mutable uint32_t _prefixCache;
+ mutable bool _droppedCache;
+ };
+
+ class PrefixDeletingCompactionFilterFactory : public rocksdb::CompactionFilterFactory {
+ public:
+ explicit PrefixDeletingCompactionFilterFactory(const RocksEngine* engine) : _engine(engine) {}
+
+ virtual std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter(
+ const rocksdb::CompactionFilter::Context& context) override {
+ auto droppedPrefixes = _engine->getDroppedPrefixes();
+ if (droppedPrefixes.size() == 0) {
+ // no compaction filter needed
+ return std::unique_ptr<rocksdb::CompactionFilter>(nullptr);
+ } else {
+ return std::unique_ptr<rocksdb::CompactionFilter>(
+ new PrefixDeletingCompactionFilter(std::move(droppedPrefixes)));
+ }
+ }
+
+ virtual const char* Name() const override {
+ return "PrefixDeletingCompactionFilterFactory";
+ }
+
+ private:
+ const RocksEngine* _engine;
+ };
+
} // anonymous namespace
// first four bytes are the default prefix 0
const std::string RocksEngine::kMetadataPrefix("\0\0\0\0metadata-", 12);
+ const std::string RocksEngine::kDroppedPrefix("\0\0\0\0droppedprefix-", 18);
RocksEngine::RocksEngine(const std::string& path, bool durable)
: _path(path), _durable(durable) {
@@ -101,23 +164,26 @@ namespace mongo {
cacheSizeGB = 1;
}
}
- _block_cache = rocksdb::NewLRUCache(cacheSizeGB * 1024 * 1024 * 1024LL);
+ _block_cache = rocksdb::NewLRUCache(cacheSizeGB * 1024 * 1024 * 1024LL, 10);
}
+ _maxWriteMBPerSec = rocksGlobalOptions.maxWriteMBPerSec;
+ _rateLimiter.reset(
+ rocksdb::NewGenericRateLimiter(static_cast<int64_t>(_maxWriteMBPerSec) * 1024 * 1024));
// open DB
rocksdb::DB* db;
auto s = rocksdb::DB::Open(_options(), path, &db);
- ROCKS_STATUS_OK(s);
+ invariantRocksOK(s);
_db.reset(db);
// open iterator
- boost::scoped_ptr<rocksdb::Iterator> _iter(_db->NewIterator(rocksdb::ReadOptions()));
+ boost::scoped_ptr<rocksdb::Iterator> iter(_db->NewIterator(rocksdb::ReadOptions()));
// find maxPrefix
_maxPrefix = 0;
- _iter->SeekToLast();
- if (_iter->Valid()) {
+ iter->SeekToLast();
+ if (iter->Valid()) {
// otherwise the DB is empty, so we just keep it at 0
- bool ok = extractPrefix(_iter->key(), &_maxPrefix);
+ bool ok = extractPrefix(iter->key(), &_maxPrefix);
// this is DB corruption here
invariant(ok);
}
@@ -125,21 +191,59 @@ namespace mongo {
// load ident to prefix map
{
boost::mutex::scoped_lock lk(_identPrefixMapMutex);
- for (_iter->Seek(kMetadataPrefix);
- _iter->Valid() && _iter->key().starts_with(kMetadataPrefix); _iter->Next()) {
- rocksdb::Slice ident(_iter->key());
+ for (iter->Seek(kMetadataPrefix);
+ iter->Valid() && iter->key().starts_with(kMetadataPrefix); iter->Next()) {
+ invariantRocksOK(iter->status());
+ rocksdb::Slice ident(iter->key());
ident.remove_prefix(kMetadataPrefix.size());
// this could throw DBException, which then means DB corruption. We just let it fly
// to the caller
- BSONObj identConfig(_iter->value().data());
+ BSONObj identConfig(iter->value().data());
BSONElement element = identConfig.getField("prefix");
- // TODO: SERVER-16979 Correctly handle errors returned by RocksDB
- // This is DB corruption
- invariant(!element.eoo() || !element.isNumber());
+
+ if (element.eoo() || !element.isNumber()) {
+ log() << "Mongo metadata in RocksDB database is corrupted.";
+ invariant(false);
+ }
+
uint32_t identPrefix = static_cast<uint32_t>(element.numberInt());
_identPrefixMap[StringData(ident.data(), ident.size())] = identPrefix;
}
}
+
+ // load dropped prefixes
+ {
+ rocksdb::WriteBatch wb;
+ // we will use this iter to check if prefixes are still alive
+ boost::scoped_ptr<rocksdb::Iterator> prefixIter(
+ _db->NewIterator(rocksdb::ReadOptions()));
+ for (iter->Seek(kDroppedPrefix);
+ iter->Valid() && iter->key().starts_with(kDroppedPrefix); iter->Next()) {
+ invariantRocksOK(iter->status());
+ rocksdb::Slice prefix(iter->key());
+ prefix.remove_prefix(kDroppedPrefix.size());
+ prefixIter->Seek(prefix);
+ invariantRocksOK(prefixIter->status());
+ if (prefixIter->Valid() && prefixIter->key().starts_with(prefix)) {
+ // prefix is still alive, let's instruct the compaction filter to clear it up
+ uint32_t int_prefix;
+ bool ok = extractPrefix(prefix, &int_prefix);
+ invariant(ok);
+ {
+ boost::mutex::scoped_lock lk(_droppedPrefixesMutex);
+ _droppedPrefixes.insert(int_prefix);
+ }
+ } else {
+ // prefix is no longer alive. let's remove the prefix from our dropped prefixes
+ // list
+ wb.Delete(iter->key());
+ }
+ }
+ if (wb.Count() > 0) {
+ auto s = _db->Write(rocksdb::WriteOptions(), &wb);
+ invariantRocksOK(s);
+ }
+ }
}
RocksEngine::~RocksEngine() {}
@@ -151,12 +255,24 @@ namespace mongo {
Status RocksEngine::createRecordStore(OperationContext* opCtx, const StringData& ns,
const StringData& ident,
const CollectionOptions& options) {
- return _createIdentPrefix(ident);
+ auto s = _createIdentPrefix(ident);
+ if (NamespaceString::oplog(ns)) {
+ _oplogIdent = ident.toString();
+ // oplog needs two prefixes, so we also reserve the next one
+ {
+ boost::mutex::scoped_lock lk(_identPrefixMapMutex);
+ ++_maxPrefix;
+ }
+ }
+ return s;
}
RecordStore* RocksEngine::getRecordStore(OperationContext* opCtx, const StringData& ns,
const StringData& ident,
const CollectionOptions& options) {
+ if (NamespaceString::oplog(ns)) {
+ _oplogIdent = ident.toString();
+ }
if (options.capped) {
return new RocksRecordStore(
ns, ident, _db.get(), _getIdentPrefix(ident), true,
@@ -184,30 +300,51 @@ namespace mongo {
}
}
+ // cannot be rolled back
Status RocksEngine::dropIdent(OperationContext* opCtx, const StringData& ident) {
- // TODO optimize this using CompactionFilterV2
rocksdb::WriteBatch wb;
wb.Delete(kMetadataPrefix + ident.toString());
- std::string prefix = _getIdentPrefix(ident);
- rocksdb::Slice prefixSlice(prefix.data(), prefix.size());
+ // calculate which prefixes we need to drop
+ std::vector<std::string> prefixesToDrop;
+ prefixesToDrop.push_back(_getIdentPrefix(ident));
+ if (_oplogIdent == ident.toString()) {
+ // if we're dropping oplog, we also need to drop keys from RocksOplogKeyTracker (they
+ // are stored at prefix+1)
+ prefixesToDrop.push_back(rocksGetNextPrefix(prefixesToDrop[0]));
+ }
- boost::scoped_ptr<rocksdb::Iterator> _iter(_db->NewIterator(rocksdb::ReadOptions()));
- for (_iter->Seek(prefixSlice); _iter->Valid() && _iter->key().starts_with(prefixSlice);
- _iter->Next()) {
- ROCKS_STATUS_OK(_iter->status());
- wb.Delete(_iter->key());
+ // We record the fact that we're deleting this prefix. That way we ensure that the prefix is
+ // always deleted
+ for (const auto& prefix : prefixesToDrop) {
+ wb.Put(kDroppedPrefix + prefix, "");
}
- auto s = _db->Write(rocksdb::WriteOptions(), &wb);
+
+ // we need to make sure this is on disk before starting to delete data in compactions
+ rocksdb::WriteOptions syncOptions;
+ syncOptions.sync = true;
+ auto s = _db->Write(syncOptions, &wb);
if (!s.ok()) {
- return toMongoStatus(s);
+ return rocksToMongoStatus(s);
}
+ // remove from map
{
boost::mutex::scoped_lock lk(_identPrefixMapMutex);
_identPrefixMap.erase(ident);
}
+ // instruct compaction filter to start deleting
+ {
+ boost::mutex::scoped_lock lk(_droppedPrefixesMutex);
+ for (const auto& prefix : prefixesToDrop) {
+ uint32_t int_prefix;
+ bool ok = extractPrefix(prefix, &int_prefix);
+ invariant(ok);
+ _droppedPrefixes.insert(int_prefix);
+ }
+ }
+
return Status::OK();
}
@@ -224,6 +361,27 @@ namespace mongo {
return indents;
}
+ int64_t RocksEngine::getIdentSize(OperationContext* opCtx, const StringData& ident) {
+ uint64_t storageSize;
+ std::string prefix = _getIdentPrefix(ident);
+ std::string nextPrefix = std::move(rocksGetNextPrefix(prefix));
+ rocksdb::Range wholeRange(prefix, nextPrefix);
+ _db->GetApproximateSizes(&wholeRange, 1, &storageSize);
+ return std::max(static_cast<int64_t>(storageSize), static_cast<int64_t>(1));
+ }
+
+ void RocksEngine::setMaxWriteMBPerSec(int maxWriteMBPerSec) {
+ _maxWriteMBPerSec = maxWriteMBPerSec;
+ _rateLimiter->SetBytesPerSecond(static_cast<int64_t>(_maxWriteMBPerSec) * 1024 * 1024);
+ }
+
+ std::unordered_set<uint32_t> RocksEngine::getDroppedPrefixes() const {
+ boost::mutex::scoped_lock lk(_droppedPrefixesMutex);
+ // this will copy the set. that way compaction filter has its own copy and doesn't need to
+ // worry about thread safety
+ return _droppedPrefixes;
+ }
+
// non public api
Status RocksEngine::_createIdentPrefix(const StringData& ident) {
uint32_t prefix = 0;
@@ -245,7 +403,7 @@ namespace mongo {
auto s = _db->Put(rocksdb::WriteOptions(), kMetadataPrefix + ident.toString(),
rocksdb::Slice(config.objdata(), config.objsize()));
- return toMongoStatus(s);
+ return rocksToMongoStatus(s);
}
std::string RocksEngine::_getIdentPrefix(const StringData& ident) {
@@ -258,21 +416,26 @@ namespace mongo {
rocksdb::Options RocksEngine::_options() const {
// default options
rocksdb::Options options;
+ options.rate_limiter = _rateLimiter;
rocksdb::BlockBasedTableOptions table_options;
table_options.block_cache = _block_cache;
- table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10));
+ table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false));
+ table_options.block_size = 16 * 1024; // 16KB
table_options.format_version = 2;
options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
- options.write_buffer_size = 128 * 1024 * 1024; // 128MB
+ options.write_buffer_size = 64 * 1024 * 1024; // 64MB
options.max_write_buffer_number = 4;
- options.max_background_compactions = 8;
- options.max_background_flushes = 4;
+ options.max_background_compactions = 4;
+ options.max_background_flushes = 2;
options.target_file_size_base = 64 * 1024 * 1024; // 64MB
- options.soft_rate_limit = 2;
+ options.soft_rate_limit = 2.5;
options.hard_rate_limit = 3;
options.max_bytes_for_level_base = 512 * 1024 * 1024; // 512 MB
- options.max_open_files = 20000;
+ // This means there is no limit on open files. Make sure to always set ulimit so that it can
+ // keep all RocksDB files opened.
+ options.max_open_files = -1;
+ options.compaction_filter_factory.reset(new PrefixDeletingCompactionFilterFactory(this));
if (rocksGlobalOptions.compression == "snappy") {
options.compression = rocksdb::kSnappyCompression;
@@ -296,11 +459,4 @@ namespace mongo {
return options;
}
-
- Status toMongoStatus( rocksdb::Status s ) {
- if ( s.ok() )
- return Status::OK();
- else
- return Status( ErrorCodes::InternalError, s.ToString() );
- }
}
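The hunks above lean on two helpers from the new rocks_util.{h,cpp}, whose bodies are not shown on this page: extractPrefix(), which reads the 4-byte big-endian prefix off a key, and rocksGetNextPrefix(), which produces the exclusive upper bound used by GetApproximateSizes() and the oplog key tracker. A plausible sketch, inferred purely from the call sites above (the actual file may differ):

    #include <cstdint>
    #include <cstring>
    #include <string>

    #include <rocksdb/slice.h>

    #include "mongo/platform/endian.h"

    namespace mongo {

        // Reads the 4-byte big-endian prefix off a key; false means the key is
        // shorter than 4 bytes, which the callers above treat as corruption.
        bool extractPrefix(const rocksdb::Slice& slice, uint32_t* prefix) {
            if (slice.size() < sizeof(uint32_t)) {
                return false;
            }
            std::memcpy(prefix, slice.data(), sizeof(uint32_t));
            *prefix = endian::bigToNative(*prefix);
            return true;
        }

        // Returns the smallest string that compares greater than every key
        // starting with `prefix` -- increment the last byte, carrying past 0xFF.
        std::string rocksGetNextPrefix(const std::string& prefix) {
            std::string next(prefix);
            for (int i = static_cast<int>(next.size()) - 1; i >= 0; --i) {
                uint8_t byte = static_cast<uint8_t>(next[i]);
                if (byte != 0xFF) {
                    next[i] = static_cast<char>(byte + 1);
                    return next;
                }
                next[i] = 0;  // 0xFF wraps to 0x00, keep carrying left
            }
            return next;  // all-0xFF input; cannot happen for a counter-based prefix
        }

    }  // namespace mongo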
diff --git a/src/mongo/db/storage/rocks/rocks_engine.h b/src/mongo/db/storage/rocks/rocks_engine.h
index 89a6b9804a6..f1a4d2f58da 100644
--- a/src/mongo/db/storage/rocks/rocks_engine.h
+++ b/src/mongo/db/storage/rocks/rocks_engine.h
@@ -33,6 +33,7 @@
#include <map>
#include <string>
#include <memory>
+#include <unordered_set>
#include <boost/optional.hpp>
#include <boost/scoped_ptr.hpp>
@@ -40,14 +41,16 @@
#include <boost/thread/mutex.hpp>
#include <rocksdb/cache.h>
+#include <rocksdb/rate_limiter.h>
#include <rocksdb/status.h>
#include "mongo/base/disallow_copying.h"
#include "mongo/bson/ordering.h"
#include "mongo/db/storage/kv/kv_engine.h"
-#include "mongo/db/storage/rocks/rocks_transaction.h"
#include "mongo/util/string_map.h"
+#include "rocks_transaction.h"
+
namespace rocksdb {
class ColumnFamilyHandle;
struct ColumnFamilyDescriptor;
@@ -103,11 +106,7 @@ namespace mongo {
virtual bool isDurable() const override { return _durable; }
- virtual int64_t getIdentSize(OperationContext* opCtx,
- const StringData& ident) {
- // TODO: return correct size.
- return 1;
- }
+ virtual int64_t getIdentSize(OperationContext* opCtx, const StringData& ident);
virtual Status repairIdent(OperationContext* opCtx,
const StringData& ident) {
@@ -129,6 +128,12 @@ namespace mongo {
rocksdb::DB* getDB() { return _db.get(); }
const rocksdb::DB* getDB() const { return _db.get(); }
size_t getBlockCacheUsage() const { return _block_cache->GetUsage(); }
+ std::unordered_set<uint32_t> getDroppedPrefixes() const;
+
+ RocksTransactionEngine* getTransactionEngine() { return &_transactionEngine; }
+
+ int getMaxWriteMBPerSec() const { return _maxWriteMBPerSec; }
+ void setMaxWriteMBPerSec(int maxWriteMBPerSec);
private:
Status _createIdentPrefix(const StringData& ident);
@@ -139,6 +144,8 @@ namespace mongo {
std::string _path;
boost::scoped_ptr<rocksdb::DB> _db;
std::shared_ptr<rocksdb::Cache> _block_cache;
+ int _maxWriteMBPerSec;
+ std::shared_ptr<rocksdb::RateLimiter> _rateLimiter;
const bool _durable;
@@ -146,15 +153,20 @@ namespace mongo {
mutable boost::mutex _identPrefixMapMutex;
typedef StringMap<uint32_t> IdentPrefixMap;
IdentPrefixMap _identPrefixMap;
+ std::string _oplogIdent;
// protected by _identPrefixMapMutex
uint32_t _maxPrefix;
+ // set of all prefixes that are deleted. we delete them in the background thread
+ mutable boost::mutex _droppedPrefixesMutex;
+ std::unordered_set<uint32_t> _droppedPrefixes;
+
// This is for concurrency control
RocksTransactionEngine _transactionEngine;
static const std::string kMetadataPrefix;
+ static const std::string kDroppedPrefix;
};
- Status toMongoStatus( rocksdb::Status s );
}
diff --git a/src/mongo/db/storage/rocks/rocks_engine_test.cpp b/src/mongo/db/storage/rocks/rocks_engine_test.cpp
index 31f5322b9ab..9c8befc5890 100644
--- a/src/mongo/db/storage/rocks/rocks_engine_test.cpp
+++ b/src/mongo/db/storage/rocks/rocks_engine_test.cpp
@@ -38,9 +38,10 @@
#include "mongo/db/storage/kv/kv_engine.h"
#include "mongo/db/storage/kv/kv_engine_test_harness.h"
-#include "mongo/db/storage/rocks/rocks_engine.h"
#include "mongo/unittest/temp_dir.h"
+#include "rocks_engine.h"
+
namespace mongo {
class RocksEngineHarnessHelper : public KVHarnessHelper {
public:
diff --git a/src/mongo/db/storage/rocks/rocks_global_options.cpp b/src/mongo/db/storage/rocks/rocks_global_options.cpp
index fa84cc17f40..264b17805f7 100644
--- a/src/mongo/db/storage/rocks/rocks_global_options.cpp
+++ b/src/mongo/db/storage/rocks/rocks_global_options.cpp
@@ -31,10 +31,11 @@
#include "mongo/platform/basic.h"
#include "mongo/base/status.h"
-#include "mongo/db/storage/rocks/rocks_global_options.h"
#include "mongo/util/log.h"
#include "mongo/util/options_parser/constraints.h"
+#include "rocks_global_options.h"
+
namespace mongo {
RocksGlobalOptions rocksGlobalOptions;
@@ -52,6 +53,14 @@ namespace mongo {
"[none|snappy|zlib]")
.format("(:?none)|(:?snappy)|(:?zlib)", "(none/snappy/zlib)")
.setDefault(moe::Value(std::string("snappy")));
+ rocksOptions
+ .addOptionChaining(
+ "storage.rocksdb.maxWriteMBPerSec", "rocksdbMaxWriteMBPerSec", moe::Int,
+ "Maximum speed that RocksDB will write to storage. Reducing this can "
+ "help reduce read latency spikes during compactions. However, reducing this "
+ "below a certain point might slow down writes. Defaults to 1GB/sec")
+ .validRange(1, 1024)
+ .setDefault(moe::Value(1024));
rocksOptions.addOptionChaining("storage.rocksdb.configString", "rocksdbConfigString",
moe::String,
"RocksDB storage engine custom "
@@ -71,6 +80,11 @@ namespace mongo {
params["storage.rocksdb.compression"].as<std::string>();
log() << "Compression: " << rocksGlobalOptions.compression;
}
+ if (params.count("storage.rocksdb.maxWriteMBPerSec")) {
+ rocksGlobalOptions.maxWriteMBPerSec =
+ params["storage.rocksdb.maxWriteMBPerSec"].as<int>();
+ log() << "MaxWriteMBPerSec: " << rocksGlobalOptions.maxWriteMBPerSec;
+ }
if (params.count("storage.rocksdb.configString")) {
rocksGlobalOptions.configString =
params["storage.rocksdb.configString"].as<std::string>();
diff --git a/src/mongo/db/storage/rocks/rocks_global_options.h b/src/mongo/db/storage/rocks/rocks_global_options.h
index 9a9e1b28d1f..aa8d103df33 100644
--- a/src/mongo/db/storage/rocks/rocks_global_options.h
+++ b/src/mongo/db/storage/rocks/rocks_global_options.h
@@ -37,12 +37,13 @@ namespace mongo {
class RocksGlobalOptions {
public:
- RocksGlobalOptions() : cacheSizeGB(0) {}
+ RocksGlobalOptions() : cacheSizeGB(0), maxWriteMBPerSec(1024), compression("snappy") {}
Status add(moe::OptionSection* options);
Status store(const moe::Environment& params, const std::vector<std::string>& args);
size_t cacheSizeGB;
+ int maxWriteMBPerSec;
std::string compression;
std::string configString;
diff --git a/src/mongo/db/storage/rocks/rocks_index.cpp b/src/mongo/db/storage/rocks/rocks_index.cpp
index a9b40edbf9d..e08b6395c9c 100644
--- a/src/mongo/db/storage/rocks/rocks_index.cpp
+++ b/src/mongo/db/storage/rocks/rocks_index.cpp
@@ -30,7 +30,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/storage/rocks/rocks_index.h"
+#include "rocks_index.h"
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
@@ -47,13 +47,14 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/storage/index_entry_comparison.h"
-#include "mongo/db/storage/rocks/rocks_engine.h"
-#include "mongo/db/storage/rocks/rocks_record_store.h"
-#include "mongo/db/storage/rocks/rocks_recovery_unit.h"
-#include "mongo/db/storage/rocks/rocks_util.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
+#include "rocks_engine.h"
+#include "rocks_record_store.h"
+#include "rocks_recovery_unit.h"
+#include "rocks_util.h"
+
namespace mongo {
using boost::scoped_ptr;
@@ -103,10 +104,15 @@ namespace mongo {
public:
RocksCursorBase(OperationContext* txn, rocksdb::DB* db, std::string prefix,
bool forward, Ordering order)
- : _db(db), _prefix(prefix), _forward(forward), _order(order), _isKeyCurrent(false) {
+ : _db(db),
+ _prefix(prefix),
+ _forward(forward),
+ _order(order),
+ _locateCacheValid(false) {
auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
_iterator.reset(ru->NewIterator(_prefix));
- checkStatus();
+ _currentSequenceNumber = ru->snapshot()->GetSequenceNumber();
+ invariantRocksOK(_iterator->status());
}
int getDirection() const { return _forward ? 1 : -1; }
@@ -131,16 +137,41 @@ namespace mongo {
// even if keys are equal, record IDs might be different (for unique indexes, since
// in non-unique indexes RecordID is already encoded in the key)
- return getRecordId() == other.getRecordId();
+ return _loc == other._loc;
+ }
+
+ virtual void advance() {
+ // Advance on a cursor at the end is a no-op
+ if (isEOF()) {
+ return;
+ }
+ advanceCursor();
+ updatePosition();
}
bool locate(const BSONObj& key, const RecordId& loc) {
const BSONObj finalKey = stripFieldNames(key);
- fillKey(finalKey, loc);
- bool result = _locate(loc);
+
+ if (_locateCacheValid == true && finalKey == _locateCacheKey &&
+ loc == _locateCacheRecordId) {
+ // exact same call to locate()
+ return _locateCacheResult;
+ }
+
+ fillQuery(finalKey, loc, &_query);
+ bool result = _locate(_query, loc);
+ updatePosition();
// An explicit search at the start of the range should always return false
if (loc == RecordId::min() || loc == RecordId::max()) {
- return false;
+ result = false;
+ }
+
+ {
+ // memoization
+ _locateCacheKey = finalKey.getOwned();
+ _locateCacheRecordId = loc;
+ _locateCacheResult = result;
+ _locateCacheValid = true;
}
return result;
}
@@ -160,8 +191,9 @@ namespace mongo {
keyEndInclusive,
getDirection() );
- fillKey(key, RecordId());
- _locate(RecordId());
+ fillQuery(key, RecordId(), &_query);
+ _locate(_query, RecordId());
+ updatePosition();
}
/**
@@ -178,65 +210,69 @@ namespace mongo {
}
BSONObj getKey() const {
- if (_isKeyCurrent && !_keyBson.isEmpty()) {
- return _keyBson;
+ if (isEOF()) {
+ return BSONObj();
+ }
+
+ if (!_keyBsonCache.isEmpty()) {
+ return _keyBsonCache;
}
- loadKeyIfNeeded();
- _keyBson =
- KeyString::toBson(_key.getBuffer(), _key.getSize(), _order, getTypeBits());
- return _keyBson;
+
+ _keyBsonCache =
+ KeyString::toBson(_key.getBuffer(), _key.getSize(), _order, _typeBits);
+
+ return _keyBsonCache;
}
+ RecordId getRecordId() const { return _loc; }
+
void savePosition() {
_savedEOF = isEOF();
-
- if (!_savedEOF) {
- loadKeyIfNeeded();
- _savedRecordId = getRecordId();
- }
- _iterator.reset();
}
void restorePosition(OperationContext* txn) {
auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
- _iterator.reset(ru->NewIterator(_prefix));
-
- if (!_savedEOF) {
- _locate(_savedRecordId);
+ if (_currentSequenceNumber != ru->snapshot()->GetSequenceNumber()) {
+ _iterator.reset(ru->NewIterator(_prefix));
+ invariantRocksOK(_iterator->status());
+ _currentSequenceNumber = ru->snapshot()->GetSequenceNumber();
+
+ if (!_savedEOF) {
+ _locate(_key, _loc);
+ updatePosition();
+ }
}
}
protected:
// Uses _key for the key. Implemented by unique and standard index
- virtual bool _locate(RecordId loc) = 0;
+ virtual bool _locate(const KeyString& query, RecordId loc) = 0;
+
+ virtual void fillQuery(const BSONObj& key, RecordId loc, KeyString* query) const = 0;
- // Must invalidateCache()
- virtual void fillKey(const BSONObj& key, RecordId loc) = 0;
+ // Called after _key has been filled in. Must not throw WriteConflictException.
+ virtual void updateLocAndTypeBits() = 0;
- virtual const KeyString::TypeBits& getTypeBits() const = 0;
void advanceCursor() {
- invalidateCache();
if (_forward) {
_iterator->Next();
} else {
_iterator->Prev();
}
- checkStatus();
+ invariantRocksOK(_iterator->status());
}
- // Seeks to _key. Returns true on exact match.
- bool seekCursor() {
- invalidateCache();
- const rocksdb::Slice keySlice(_key.getBuffer(), _key.getSize());
+ // Seeks to query. Returns true on exact match.
+ bool seekCursor(const KeyString& query) {
+ const rocksdb::Slice keySlice(query.getBuffer(), query.getSize());
_iterator->Seek(keySlice);
- checkStatus();
if (!_iterator->Valid()) {
if (!_forward) {
// this will give lower bound behavior for backwards
_iterator->SeekToLast();
- checkStatus();
}
+ invariantRocksOK(_iterator->status());
return false;
}
@@ -252,32 +288,26 @@ namespace mongo {
// were
// searching for.
_iterator->Prev();
+ invariantRocksOK(_iterator->status());
}
return false;
}
- void loadKeyIfNeeded() const {
- if (_isKeyCurrent) {
+ void updatePosition() {
+ if (isEOF()) {
+ _loc = RecordId();
return;
}
auto key = _iterator->key();
_key.resetFromBuffer(key.data(), key.size());
- _isKeyCurrent = true;
- }
+ _keyBsonCache = BSONObj(); // Invalidate cached BSONObj.
- virtual void invalidateCache() {
- _isKeyCurrent = false;
- _keyBson = BSONObj();
- }
+ _locateCacheValid = false; // Invalidate locate cache
+ _locateCacheKey = BSONObj(); // Invalidate locate cache
- void checkStatus() {
- if ( !_iterator->status().ok() ) {
- log() << _iterator->status().ToString();
- // TODO: SERVER-16979 Correctly handle errors returned by RocksDB
- invariant( false );
- }
+ updateLocAndTypeBits();
}
rocksdb::DB* _db; // not owned
@@ -289,26 +319,30 @@ namespace mongo {
// These are for storing savePosition/restorePosition state
bool _savedEOF;
RecordId _savedRecordId;
+ rocksdb::SequenceNumber _currentSequenceNumber;
+
+ KeyString _key;
+ KeyString::TypeBits _typeBits;
+ RecordId _loc;
+ mutable BSONObj _keyBsonCache; // if isEmpty, cache invalid and must be loaded from
+ // _key.
- // These are all lazily loaded caches.
- mutable BSONObj _keyBson; // if isEmpty, it is invalid and must be loaded from _key.
- mutable bool _isKeyCurrent; // true if _key matches where the cursor is pointing
- mutable KeyString _key;
+ KeyString _query;
+
+ // These are for caching repeated calls to locate()
+ bool _locateCacheValid;
+ BSONObj _locateCacheKey;
+ RecordId _locateCacheRecordId;
+ bool _locateCacheResult;
};
class RocksStandardCursor : public RocksCursorBase {
public:
RocksStandardCursor(OperationContext* txn, rocksdb::DB* db, std::string prefix,
bool forward, Ordering order)
- : RocksCursorBase(txn, db, prefix, forward, order), _isTypeBitsValid(false) {}
-
- virtual void invalidateCache() {
- RocksCursorBase::invalidateCache();
- _loc = RecordId();
- _isTypeBitsValid = false;
- }
+ : RocksCursorBase(txn, db, prefix, forward, order) {}
- virtual void fillKey(const BSONObj& key, RecordId loc) {
+ virtual void fillQuery(const BSONObj& key, RecordId loc, KeyString* query) const {
// Null cursors should start at the zero key to maintain search ordering in the
// collator.
// Reverse cursors should start on the last matching key.
@@ -316,185 +350,180 @@ namespace mongo {
loc = _forward ? RecordId::min() : RecordId::max();
}
- _key.resetToKey(key, _order, loc);
- invalidateCache();
- }
- virtual bool _locate(RecordId loc) {
- // loc already encoded in _key
- return seekCursor();
+ query->resetToKey(key, _order, loc);
}
- virtual RecordId getRecordId() const {
- if (isEOF()) {
- return RecordId();
- }
-
- if (_loc.isNull()) {
- loadKeyIfNeeded();
- _loc = KeyString::decodeRecordIdAtEnd(_key.getBuffer(), _key.getSize());
- }
-
- dassert(!_loc.isNull());
- return _loc;
- }
-
- virtual void advance() {
- // Advance on a cursor at the end is a no-op
- if (isEOF()) {
- return;
- }
- advanceCursor();
+ virtual bool _locate(const KeyString& query, RecordId loc) {
+ // loc already encoded in _key
+ return seekCursor(query);
}
- virtual const KeyString::TypeBits& getTypeBits() const {
- if (!_isTypeBitsValid) {
- auto value = _iterator->value();
- BufReader br(value.data(), value.size());
- _typeBits.resetFromBuffer(&br);
- _isTypeBitsValid = true;
- }
-
- return _typeBits;
+ virtual void updateLocAndTypeBits() {
+ _loc = KeyString::decodeRecordIdAtEnd(_key.getBuffer(), _key.getSize());
+ auto value = _iterator->value();
+ BufReader br(value.data(), value.size());
+ _typeBits.resetFromBuffer(&br);
}
-
- private:
- mutable RecordId _loc;
-
- mutable bool _isTypeBitsValid;
- mutable KeyString::TypeBits _typeBits;
};
class RocksUniqueCursor : public RocksCursorBase {
public:
RocksUniqueCursor(OperationContext* txn, rocksdb::DB* db, std::string prefix,
bool forward, Ordering order)
- : RocksCursorBase(txn, db, prefix, forward, order), _recordsIndex(0) {}
+ : RocksCursorBase(txn, db, prefix, forward, order) {}
- virtual void invalidateCache() {
- RocksCursorBase::invalidateCache();
- _records.clear();
+ virtual void fillQuery(const BSONObj& key, RecordId loc, KeyString* query) const {
+ query->resetToKey(key, _order); // loc doesn't go in _query for unique indexes
}
- virtual void fillKey(const BSONObj& key, RecordId loc) {
- invalidateCache();
- _key.resetToKey(key, _order); // loc doesn't go in _key for unique indexes
- }
-
- virtual bool _locate(RecordId loc) {
- if (!seekCursor()) {
+ virtual bool _locate(const KeyString& query, RecordId loc) {
+ if (!seekCursor(query)) {
// If didn't seek to exact key, start at beginning of wherever we ended up.
return false;
}
dassert(!isEOF());
- if (loc.isNull()) {
- // Null loc means means start and beginning or end of array as needed.
- // so nothing to do
- return true;
- }
+ // If we get here we need to look at the actual RecordId for this key and make sure
+ // we are supposed to see it.
- // If we get here we need to make sure we are positioned at the correct point of the
- // _records vector.
- if (_forward) {
- while (getRecordId() < loc) {
- _recordsIndex++;
- if (_recordsIndex == _records.size()) {
- // This means we exhausted the scan and didn't find a record in range.
- advanceCursor();
- return false;
- }
- }
- } else {
- while (getRecordId() > loc) {
- _recordsIndex++;
- if (_recordsIndex == _records.size()) {
- advanceCursor();
- return false;
- }
- }
+ auto value = _iterator->value();
+ BufReader br(value.data(), value.size());
+ RecordId locInIndex = KeyString::decodeRecordId(&br);
+
+ if ((_forward && (locInIndex < loc)) || (!_forward && (locInIndex > loc))) {
+ advanceCursor();
}
return true;
}
- virtual RecordId getRecordId() const {
- if (isEOF()) {
- return RecordId();
- }
+ void updateLocAndTypeBits() {
+ // We assume that cursors can only ever see unique indexes in their "pristine"
+ // state,
+ // where no duplicates are possible. The cases where dups are allowed should hold
+ // sufficient locks to ensure that no cursor ever sees them.
- loadValueIfNeeded();
- dassert(!_records[_recordsIndex].first.isNull());
- return _records[_recordsIndex].first;
- }
-
- virtual void advance() {
- // Advance on a cursor at the end is a no-op
- if (isEOF()) {
- return;
- }
+ auto value = _iterator->value();
+ BufReader br(value.data(), value.size());
+ _loc = KeyString::decodeRecordId(&br);
+ _typeBits.resetFromBuffer(&br);
- // We may just be advancing within the RecordIds for this key.
- loadValueIfNeeded();
- _recordsIndex++;
- if (_recordsIndex == _records.size()) {
- advanceCursor();
+ if (!br.atEof()) {
+ severe() << "Unique index cursor seeing multiple records for key " << getKey();
+ fassertFailed(28609);
}
}
+ };
- virtual const KeyString::TypeBits& getTypeBits() const {
- invariant(!isEOF());
- loadValueIfNeeded();
- return _records[_recordsIndex].second;
- }
+ } // namespace
- private:
- void loadValueIfNeeded() const {
- if (!_records.empty()) {
- return;
- }
+ /**
+ * Bulk builds a non-unique index.
+ */
+ class RocksIndexBase::StandardBulkBuilder : public SortedDataBuilderInterface {
+ public:
+ StandardBulkBuilder(RocksStandardIndex* index, OperationContext* txn) : _index(index), _txn(txn) {}
- _recordsIndex = 0;
- auto value = _iterator->value();
- BufReader br(value.data(), value.size());
- while (br.remaining()) {
- RecordId loc = KeyString::decodeRecordId(&br);
- _records.push_back(std::make_pair(loc, KeyString::TypeBits::fromBuffer(&br)));
- }
- invariant(!_records.empty());
+ Status addKey(const BSONObj& key, const RecordId& loc) {
+ return _index->insert(_txn, key, loc, true);
+ }
- if (!_forward) {
- std::reverse(_records.begin(), _records.end());
+ void commit(bool mayInterrupt) {
+ WriteUnitOfWork uow(_txn);
+ uow.commit();
+ }
+
+ private:
+ RocksStandardIndex* _index;
+ OperationContext* _txn;
+ };
+
+ /**
+ * Bulk builds a unique index.
+ *
+ * In order to support unique indexes in dupsAllowed mode this class only does an actual insert
+ * after it sees a key after the one we are trying to insert. This allows us to gather up all
+ * duplicate locs and insert them all together. This is necessary since bulk cursors can only
+ * append data.
+ */
+ class RocksIndexBase::UniqueBulkBuilder : public SortedDataBuilderInterface {
+ public:
+ UniqueBulkBuilder(std::string prefix, Ordering ordering, OperationContext* txn, bool dupsAllowed)
+ : _prefix(std::move(prefix)), _ordering(ordering), _txn(txn), _dupsAllowed(dupsAllowed) {}
+
+ Status addKey(const BSONObj& newKey, const RecordId& loc) {
+ Status s = checkKeySize(newKey);
+ if (!s.isOK()) {
+ return s;
+ }
+
+ const int cmp = newKey.woCompare(_key, _ordering);
+ if (cmp != 0) {
+ if (!_key.isEmpty()) { // _key.isEmpty() is only true on the first call to addKey().
+ invariant(cmp > 0); // newKey must be > the last key
+ // We are done with dups of the last key so we can insert it now.
+ doInsert();
}
+ invariant(_records.empty());
}
+ else {
+ // Dup found!
+ if (!_dupsAllowed) {
+ return Status(ErrorCodes::DuplicateKey, dupKeyError(newKey));
+ }
- mutable size_t _recordsIndex;
- mutable std::vector<std::pair<RecordId, KeyString::TypeBits> > _records;
- };
-
- // TODO optimize and create two implementations -- one for unique and one for standard index
- class RocksIndexBulkBuilder : public SortedDataBuilderInterface {
- public:
- RocksIndexBulkBuilder(RocksIndexBase* index, OperationContext* txn, bool dupsAllowed)
- : _index(index), _txn(txn), _dupsAllowed(dupsAllowed) {
- invariant(index->isEmpty(txn));
+ // If we get here, we are in the weird mode where dups are allowed on a unique
+ // index, so add ourselves to the list of duplicate locs. This also replaces the
+ // _key which is correct since any dups seen later are likely to be newer.
}
- Status addKey(const BSONObj& key, const RecordId& loc) {
- return _index->insert(_txn, key, loc, _dupsAllowed);
+ _key = newKey.getOwned();
+ _keyString.resetToKey(_key, _ordering);
+ _records.push_back(std::make_pair(loc, _keyString.getTypeBits()));
+
+ return Status::OK();
+ }
+
+ void commit(bool mayInterrupt) {
+ WriteUnitOfWork uow(_txn);
+ if (!_records.empty()) {
+ // This handles inserting the last unique key.
+ doInsert();
}
+ uow.commit();
+ }
- void commit(bool mayInterrupt) {
- WriteUnitOfWork uow(_txn);
- uow.commit();
+ private:
+ void doInsert() {
+ invariant(!_records.empty());
+
+ KeyString value;
+ for (size_t i = 0; i < _records.size(); i++) {
+ value.appendRecordId(_records[i].first);
+ // When there is only one record, we can omit AllZeros TypeBits. Otherwise they need
+ // to be included.
+ if (!(_records[i].second.isAllZeros() && _records.size() == 1)) {
+ value.appendTypeBits(_records[i].second);
+ }
}
- private:
- RocksIndexBase* _index;
- OperationContext* _txn;
- bool _dupsAllowed;
- };
+ std::string prefixedKey(RocksIndexBase::_makePrefixedKey(_prefix, _keyString));
+ rocksdb::Slice valueSlice(value.getBuffer(), value.getSize());
- } // namespace
+ auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(_txn);
+ ru->writeBatch()->Put(prefixedKey, valueSlice);
+
+ _records.clear();
+ }
+
+ std::string _prefix;
+ Ordering _ordering;
+ OperationContext* _txn;
+ const bool _dupsAllowed;
+ BSONObj _key;
+ KeyString _keyString;
+ std::vector<std::pair<RecordId, KeyString::TypeBits>> _records;
+ };
/// RocksIndexBase
@@ -502,11 +531,6 @@ namespace mongo {
Ordering order)
: _db(db), _prefix(prefix), _ident(std::move(ident)), _order(order) {}
- SortedDataBuilderInterface* RocksIndexBase::getBulkBuilder(OperationContext* txn,
- bool dupsAllowed) {
- return new RocksIndexBulkBuilder(this, txn, dupsAllowed);
- }
-
void RocksIndexBase::fullValidate(OperationContext* txn, bool full, long long* numKeysOut,
BSONObjBuilder* output) const {
if (numKeysOut) {
@@ -538,7 +562,9 @@ namespace mongo {
std::string nextPrefix = std::move(rocksGetNextPrefix(_prefix));
rocksdb::Range wholeRange(_prefix, nextPrefix);
_db->GetApproximateSizes(&wholeRange, 1, &storageSize);
- return static_cast<long long>(storageSize);
+ // There might be some bytes in the WAL that we don't count here. Some
+ // tests depend on the fact that non-empty indexes have non-zero sizes
+ return static_cast<long long>(std::max(storageSize, static_cast<uint64_t>(1)));
}
std::string RocksIndexBase::_makePrefixedKey(const std::string& prefix,
@@ -572,9 +598,7 @@ namespace mongo {
std::string currentValue;
auto getStatus = ru->Get(prefixedKey, &currentValue);
if (!getStatus.ok() && !getStatus.IsNotFound()) {
- // This means that Get() returned an error
- // TODO: SERVER-16979 Correctly handle errors returned by RocksDB
- invariant(false);
+ return rocksToMongoStatus(getStatus);
} else if (getStatus.IsNotFound()) {
// nothing here. just insert the value
KeyString value(loc);
@@ -645,14 +669,11 @@ namespace mongo {
// dups are allowed, so we have to deal with a vector of RecordIds.
std::string currentValue;
auto getStatus = ru->Get(prefixedKey, &currentValue);
- if (!getStatus.ok() && !getStatus.IsNotFound()) {
- // This means that Get() returned an error
- // TODO: SERVER-16979 Correctly handle errors returned by RocksDB
- invariant(false);
- } else if (getStatus.IsNotFound()) {
+ if (getStatus.IsNotFound()) {
// nothing here. just return
return;
}
+ invariantRocksOK(getStatus);
bool foundLoc = false;
std::vector<std::pair<RecordId, KeyString::TypeBits>> records;
@@ -713,9 +734,7 @@ namespace mongo {
std::string value;
auto getStatus = ru->Get(prefixedKey, &value);
if (!getStatus.ok() && !getStatus.IsNotFound()) {
- // This means that Get() returned an error
- // TODO: SERVER-16979 Correctly handle errors returned by RocksDB
- invariant(false);
+ return rocksToMongoStatus(getStatus);
} else if (getStatus.IsNotFound()) {
// not found, not duplicate key
return Status::OK();
@@ -734,6 +753,11 @@ namespace mongo {
return Status(ErrorCodes::DuplicateKey, dupKeyError(key));
}
+ SortedDataBuilderInterface* RocksUniqueIndex::getBulkBuilder(OperationContext* txn,
+ bool dupsAllowed) {
+ return new RocksIndexBase::UniqueBulkBuilder(_prefix, _order, txn, dupsAllowed);
+ }
+
/// RocksStandardIndex
RocksStandardIndex::RocksStandardIndex(rocksdb::DB* db, std::string prefix, std::string ident,
Ordering order)
@@ -783,5 +807,10 @@ namespace mongo {
return new RocksStandardCursor(txn, _db, _prefix, direction == 1, _order);
}
+ SortedDataBuilderInterface* RocksStandardIndex::getBulkBuilder(OperationContext* txn,
+ bool dupsAllowed) {
+ invariant(dupsAllowed);
+ return new RocksIndexBase::StandardBulkBuilder(this, txn);
+ }
} // namespace mongo
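For reference, the value format the unique index writes (a RecordId followed by its TypeBits, repeated once per duplicate, with AllZeros TypeBits omitted when there is exactly one record) can be read back with the same pattern the cursor and the removed loadValueIfNeeded() use. A sketch of a hypothetical helper, assuming mongo's internal BufReader and KeyString APIs exactly as they appear in the hunks above:

    // Hypothetical helper, not part of the patch: unpacks every
    // (RecordId, TypeBits) pair stored in one unique-index value.
    std::vector<std::pair<RecordId, KeyString::TypeBits>> decodeUniqueIndexValue(
            const rocksdb::Slice& value) {
        std::vector<std::pair<RecordId, KeyString::TypeBits>> records;
        BufReader br(value.data(), value.size());
        while (br.remaining()) {
            RecordId loc = KeyString::decodeRecordId(&br);
            // fromBuffer() on an exhausted buffer yields AllZeros TypeBits, which
            // is how the single-record case with omitted TypeBits decodes correctly
            records.push_back(std::make_pair(loc, KeyString::TypeBits::fromBuffer(&br)));
        }
        return records;
    }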
diff --git a/src/mongo/db/storage/rocks/rocks_index.h b/src/mongo/db/storage/rocks/rocks_index.h
index 3c91185aac2..b551a7987e0 100644
--- a/src/mongo/db/storage/rocks/rocks_index.h
+++ b/src/mongo/db/storage/rocks/rocks_index.h
@@ -53,14 +53,15 @@ namespace mongo {
public:
RocksIndexBase(rocksdb::DB* db, std::string prefix, std::string ident, Ordering order);
- virtual SortedDataBuilderInterface* getBulkBuilder(OperationContext* txn, bool dupsAllowed);
+ virtual SortedDataBuilderInterface* getBulkBuilder(OperationContext* txn,
+ bool dupsAllowed) = 0;
virtual void fullValidate(OperationContext* txn, bool full, long long* numKeysOut,
BSONObjBuilder* output) const;
virtual bool appendCustomStats(OperationContext* txn, BSONObjBuilder* output,
double scale) const {
- // TODO
+ // nothing to say here, really
return false;
}
@@ -81,6 +82,10 @@ namespace mongo {
// used to construct RocksCursors
const Ordering _order;
+
+ class StandardBulkBuilder;
+ class UniqueBulkBuilder;
+ friend class UniqueBulkBuilder;
};
class RocksUniqueIndex : public RocksIndexBase {
@@ -94,6 +99,9 @@ namespace mongo {
virtual SortedDataInterface::Cursor* newCursor(OperationContext* txn, int direction) const;
virtual Status dupKeyCheck(OperationContext* txn, const BSONObj& key, const RecordId& loc);
+
+ virtual SortedDataBuilderInterface* getBulkBuilder(OperationContext* txn,
+ bool dupsAllowed) override;
};
class RocksStandardIndex : public RocksIndexBase {
@@ -110,6 +118,9 @@ namespace mongo {
// dupKeyCheck shouldn't be called for non-unique indexes
invariant(false);
}
+
+ virtual SortedDataBuilderInterface* getBulkBuilder(OperationContext* txn,
+ bool dupsAllowed) override;
};
} // namespace mongo
diff --git a/src/mongo/db/storage/rocks/rocks_index_test.cpp b/src/mongo/db/storage/rocks/rocks_index_test.cpp
index 43effa007b1..b6f865fbaf1 100644
--- a/src/mongo/db/storage/rocks/rocks_index_test.cpp
+++ b/src/mongo/db/storage/rocks/rocks_index_test.cpp
@@ -40,13 +40,14 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/storage/sorted_data_interface_test_harness.h"
-#include "mongo/db/storage/rocks/rocks_engine.h"
-#include "mongo/db/storage/rocks/rocks_index.h"
-#include "mongo/db/storage/rocks/rocks_recovery_unit.h"
-#include "mongo/db/storage/rocks/rocks_transaction.h"
#include "mongo/unittest/temp_dir.h"
#include "mongo/unittest/unittest.h"
+#include "rocks_engine.h"
+#include "rocks_index.h"
+#include "rocks_recovery_unit.h"
+#include "rocks_transaction.h"
+
namespace mongo {
using boost::scoped_ptr;
diff --git a/src/mongo/db/storage/rocks/rocks_init.cpp b/src/mongo/db/storage/rocks/rocks_init.cpp
index 1f30a87f526..18d8b9d08c4 100644
--- a/src/mongo/db/storage/rocks/rocks_init.cpp
+++ b/src/mongo/db/storage/rocks/rocks_init.cpp
@@ -29,9 +29,6 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/storage/rocks/rocks_engine.h"
-#include "mongo/db/storage/rocks/rocks_server_status.h"
-
#include "mongo/base/init.h"
#include "mongo/db/global_environment_experiment.h"
#include "mongo/db/storage_options.h"
@@ -39,10 +36,15 @@
#include "mongo/db/storage/storage_engine_metadata.h"
#include "mongo/util/mongoutils/str.h"
+#include "rocks_engine.h"
+#include "rocks_server_status.h"
+#include "rocks_parameters.h"
+
namespace mongo {
- const std::string kRocksDBEngineName = "RocksDB";
+ const std::string kRocksDBEngineName = "rocksdb";
namespace {
+
class RocksFactory : public StorageEngine::Factory {
public:
virtual ~RocksFactory(){}
@@ -56,6 +58,7 @@ namespace mongo {
auto engine = new RocksEngine(params.dbpath + "/db", params.dur);
// Intentionally leaked.
auto leaked __attribute__((unused)) = new RocksServerStatusSection(engine);
+ auto leaked2 __attribute__((unused)) = new RocksRateLimiterServerParameter(engine);
return new KVStorageEngine(engine, options);
}
@@ -106,7 +109,11 @@ namespace mongo {
// and mongorestore.
// * Version 1 was the format with many column families -- one column family for each
// collection and index
- // * Version 2 (current) keeps all collections and indexes in a single column family
+ // * Version 2 keeps all collections and indexes in a single column family
+ // * Version 3 (current) reserves two prefixes for oplog. one prefix keeps the oplog
+ // documents and another only keeps keys. That way, we can cleanup the oplog without
+ // reading full documents
+ // oplog cleanup
const int kRocksFormatVersion = 2;
const std::string kRocksFormatVersionString = "rocksFormatVersion";
};
diff --git a/src/mongo/db/storage/rocks/rocks_options_init.cpp b/src/mongo/db/storage/rocks/rocks_options_init.cpp
index 366d4b3e374..4b5ce697d1b 100644
--- a/src/mongo/db/storage/rocks/rocks_options_init.cpp
+++ b/src/mongo/db/storage/rocks/rocks_options_init.cpp
@@ -26,12 +26,13 @@
* it in the license file.
*/
-#include "mongo/util/options_parser/startup_option_init.h"
#include <iostream>
+#include "mongo/util/options_parser/startup_option_init.h"
#include "mongo/util/options_parser/startup_options.h"
-#include "mongo/db/storage/rocks/rocks_global_options.h"
+
+#include "rocks_global_options.h"
namespace mongo {
diff --git a/src/mongo/db/storage/rocks/rocks_parameters.cpp b/src/mongo/db/storage/rocks/rocks_parameters.cpp
new file mode 100644
index 00000000000..4181d8aab2f
--- /dev/null
+++ b/src/mongo/db/storage/rocks/rocks_parameters.cpp
@@ -0,0 +1,73 @@
+/**
+* Copyright (C) 2014 MongoDB Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects for
+* all of the code used other than as permitted herein. If you modify file(s)
+* with this exception, you may extend this exception to your version of the
+* file(s), but you are not obligated to do so. If you do not wish to do so,
+* delete this exception statement from your version. If you delete this
+* exception statement from all source files in the program, then also delete
+* it in the license file.
+*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
+
+#include "mongo/platform/basic.h"
+
+#include "rocks_parameters.h"
+
+#include "mongo/logger/parse_log_component_settings.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+
+ RocksRateLimiterServerParameter::RocksRateLimiterServerParameter(RocksEngine* engine)
+ : ServerParameter(ServerParameterSet::getGlobal(), "rocksdbRuntimeConfigMaxWriteMBPerSec",
+ false, true),
+ _engine(engine) {}
+
+ void RocksRateLimiterServerParameter::append(OperationContext* txn, BSONObjBuilder& b,
+ const std::string& name) {
+ b.append(name, _engine->getMaxWriteMBPerSec());
+ }
+
+ Status RocksRateLimiterServerParameter::set(const BSONElement& newValueElement) {
+ if (!newValueElement.isNumber()) {
+ return Status(ErrorCodes::BadValue, str::stream() << name() << " has to be a number");
+ }
+ return _set(newValueElement.numberInt());
+ }
+
+ Status RocksRateLimiterServerParameter::setFromString(const std::string& str) {
+ int num = 0;
+ Status status = parseNumberFromString(str, &num);
+ if (!status.isOK()) return status;
+ return _set(num);
+ }
+
+ Status RocksRateLimiterServerParameter::_set(int newNum) {
+ if (newNum <= 0) {
+ return Status(ErrorCodes::BadValue, str::stream() << name() << " has to be > 0");
+ }
+ log() << "RocksDB: changing rate limiter to " << newNum << "MB/s";
+ _engine->setMaxWriteMBPerSec(newNum);
+
+ return Status::OK();
+ }
+}
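Since the parameter registers itself as runtime-settable (the `false, true` pair in the ServerParameter constructor), it should be adjustable on a live mongod through the standard setParameter admin command, e.g. `db.adminCommand({setParameter: 1, rocksdbRuntimeConfigMaxWriteMBPerSec: 100})` from the shell, which ends up in RocksRateLimiterServerParameter::set() above.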
diff --git a/src/mongo/db/storage/rocks/rocks_parameters.h b/src/mongo/db/storage/rocks/rocks_parameters.h
new file mode 100644
index 00000000000..b447a786386
--- /dev/null
+++ b/src/mongo/db/storage/rocks/rocks_parameters.h
@@ -0,0 +1,50 @@
+/**
+* Copyright (C) 2014 MongoDB Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects for
+* all of the code used other than as permitted herein. If you modify file(s)
+* with this exception, you may extend this exception to your version of the
+* file(s), but you are not obligated to do so. If you do not wish to do so,
+* delete this exception statement from your version. If you delete this
+* exception statement from all source files in the program, then also delete
+* it in the license file.
+*/
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/db/server_parameters.h"
+
+#include "rocks_engine.h"
+
+namespace mongo {
+
+ class RocksRateLimiterServerParameter : public ServerParameter {
+ MONGO_DISALLOW_COPYING(RocksRateLimiterServerParameter);
+
+ public:
+ RocksRateLimiterServerParameter(RocksEngine* engine);
+ virtual void append(OperationContext* txn, BSONObjBuilder& b, const std::string& name);
+ virtual Status set(const BSONElement& newValueElement);
+ virtual Status setFromString(const std::string& str);
+
+ private:
+ Status _set(int newNum);
+ RocksEngine* _engine;
+ };
+
+}
diff --git a/src/mongo/db/storage/rocks/rocks_record_store.cpp b/src/mongo/db/storage/rocks/rocks_record_store.cpp
index 443970b1226..810b7b08f79 100644
--- a/src/mongo/db/storage/rocks/rocks_record_store.cpp
+++ b/src/mongo/db/storage/rocks/rocks_record_store.cpp
@@ -31,7 +31,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/storage/rocks/rocks_record_store.h"
+#include "rocks_record_store.h"
#include <boost/scoped_array.hpp>
#include <boost/shared_ptr.hpp>
@@ -49,14 +49,17 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/storage/rocks/rocks_engine.h"
-#include "mongo/db/storage/rocks/rocks_recovery_unit.h"
#include "mongo/db/storage/oplog_hack.h"
#include "mongo/platform/endian.h"
#include "mongo/util/background.h"
#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
#include "mongo/util/timer.h"
+#include "rocks_engine.h"
+#include "rocks_recovery_unit.h"
+#include "rocks_util.h"
+
namespace mongo {
using boost::shared_ptr;
@@ -88,8 +91,7 @@ namespace mongo {
void CappedVisibilityManager::_addUncommittedRecord_inlock(OperationContext* txn,
const RecordId& record) {
- // todo: make this a dassert at some point
- invariant(_uncommittedRecords.empty() || _uncommittedRecords.back() < record);
+ dassert(_uncommittedRecords.empty() || _uncommittedRecords.back() < record);
_uncommittedRecords.push_back(record);
txn->recoveryUnit()->registerChange(new CappedInsertChange(this, record));
_oplog_highestSeen = record;
@@ -137,6 +139,36 @@ namespace mongo {
}
}
+ // this object keeps track of keys in the oplog. The format is this:
+ // <prefix>RecordId --> dataSize (little endian, 32 bits)
+ // <prefix> is oplog_prefix+1 (reserved by rocks_engine.cpp)
+ // That way we can cheaply delete old records in the oplog without actually reading the
+ // oplog collection.
+ // All of the locking is done somewhere else -- we write exactly the same data as oplog, so we
+ // assume oplog already locked the relevant keys
+ class RocksOplogKeyTracker {
+ public:
+ RocksOplogKeyTracker(std::string prefix) : _prefix(std::move(prefix)) {}
+ void insertKey(RocksRecoveryUnit* ru, const RecordId& loc, int len) {
+ uint32_t lenLittleEndian = endian::nativeToLittle(static_cast<uint32_t>(len));
+ ru->writeBatch()->Put(RocksRecordStore::_makePrefixedKey(_prefix, loc),
+ rocksdb::Slice(reinterpret_cast<const char*>(&lenLittleEndian),
+ sizeof(lenLittleEndian)));
+ }
+ void deleteKey(RocksRecoveryUnit* ru, const RecordId& loc) {
+ ru->writeBatch()->Delete(RocksRecordStore::_makePrefixedKey(_prefix, loc));
+ }
+ rocksdb::Iterator* newIterator(RocksRecoveryUnit* ru) { return ru->NewIterator(_prefix); }
+ int decodeSize(const rocksdb::Slice& value) {
+ uint32_t size =
+ endian::littleToNative(*reinterpret_cast<const uint32_t*>(value.data()));
+ return static_cast<int>(size);
+ }
+
+ private:
+ std::string _prefix;
+ };
+
RocksRecordStore::RocksRecordStore(const StringData& ns, const StringData& id,
rocksdb::DB* db, // not owned here
std::string prefix, bool isCapped, int64_t cappedMaxSize,
@@ -152,7 +184,9 @@ namespace mongo {
_cappedDeleteCallback(cappedDeleteCallback),
_cappedDeleteCheckCount(0),
_isOplog(NamespaceString::oplog(ns)),
- _oplogCounter(0),
+ _oplogKeyTracker(
+ _isOplog ? new RocksOplogKeyTracker(std::move(rocksGetNextPrefix(_prefix))) : nullptr),
+ _oplogNextToDelete(0),
_cappedVisibilityManager((_isCapped || _isOplog) ? new CappedVisibilityManager()
: nullptr),
_ident(id.toString()),
@@ -199,12 +233,15 @@ namespace mongo {
boost::timed_mutex::scoped_lock lk(_cappedDeleterMutex);
_shuttingDown = true;
}
+ delete _oplogKeyTracker;
}
int64_t RocksRecordStore::storageSize(OperationContext* txn, BSONObjBuilder* extraInfo,
int infoLevel) const {
- // we're lying, but that's the best we can do for now
- return _dataSize.load();
+ // We need to make this a multiple of 256 to keep
+ // jstests/concurrency/fsm_workloads/convert_to_capped_collection.js happy
+ return static_cast<int64_t>(
+ std::max(_dataSize.load() & (~255), static_cast<long long>(256)));
}
RecordData RocksRecordStore::dataFor(OperationContext* txn, const RecordId& loc) const {
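
The bit trick above clears the low eight bits (rounding down to a multiple of 256) and clamps the result at 256. A tiny sketch of the same arithmetic (reportedStorageSize is a hypothetical stand-in, not the engine's method):

    #include <algorithm>
    #include <cassert>

    // Round down to a multiple of 256, but never report less than 256.
    long long reportedStorageSize(long long dataSize) {
        return std::max(dataSize & ~255LL, 256LL);
    }

    int main() {
        assert(reportedStorageSize(0) == 256);      // empty store still reports 256
        assert(reportedStorageSize(300) == 256);    // 300 rounds down to 256
        assert(reportedStorageSize(1024) == 1024);  // already a multiple of 256
        return 0;
    }
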
@@ -222,15 +259,25 @@ namespace mongo {
}
std::string oldValue;
- ru->Get(key, &oldValue);
+ auto status = ru->Get(key, &oldValue);
+ invariantRocksOK(status);
int oldLength = oldValue.size();
ru->writeBatch()->Delete(key);
+ if (_isOplog) {
+ _oplogKeyTracker->deleteKey(ru, dl);
+ }
_changeNumRecords(txn, -1);
_increaseDataSize(txn, -oldLength);
}
+ long long RocksRecordStore::dataSize(OperationContext* txn) const {
+ RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
+ return _dataSize.load(std::memory_order::memory_order_relaxed) +
+ ru->getDeltaCounter(_dataSizeKey);
+ }
+
long long RocksRecordStore::numRecords(OperationContext* txn) const {
RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn );
return _numRecords.load(std::memory_order::memory_order_relaxed) +
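
dataSize() and numRecords() above combine a shared atomic base value with a per-recovery-unit delta that is folded in only at commit, so uncommitted changes are visible to their own transaction but not to others. A minimal sketch of that pattern (simplified stand-in types, not the engine's RocksRecoveryUnit):

    #include <atomic>
    #include <cassert>

    std::atomic<long long> committedDataSize{0};  // shared, committed value

    struct RecoveryUnitSketch {
        long long delta = 0;  // this transaction's uncommitted change

        void increment(long long by) { delta += by; }

        // Readers inside the transaction see committed + their own pending delta.
        long long read() const {
            return committedDataSize.load(std::memory_order_relaxed) + delta;
        }

        // At commit, fold the delta into the shared counter.
        void commit() {
            committedDataSize.fetch_add(delta, std::memory_order_relaxed);
            delta = 0;
        }
    };

    int main() {
        RecoveryUnitSketch ru;
        ru.increment(100);
        assert(ru.read() == 100);               // visible to this transaction
        assert(committedDataSize.load() == 0);  // invisible to everyone else
        ru.commit();
        assert(committedDataSize.load() == 100);
        return 0;
    }
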
@@ -327,19 +374,37 @@ namespace mongo {
if (_cappedMaxDocs != -1 && numRecords > _cappedMaxDocs) {
docsOverCap = numRecords - _cappedMaxDocs;
}
+ BSONObj emptyBson;
try {
WriteUnitOfWork wuow(txn);
auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
- boost::scoped_ptr<rocksdb::Iterator> iter(ru->NewIterator(_prefix));
- iter->SeekToFirst();
+ boost::scoped_ptr<rocksdb::Iterator> iter;
+ if (_isOplog) {
+ // we're using _oplogKeyTracker to find which keys to delete -- this is much faster
+ // because we don't need to read any values. We would theoretically need the values to
+ // pass the documents to the cappedDeleteCallback, but the callback only uses the
+ // documents to remove them from indexes. The oplog doesn't have indexes, so there is
+ // no need to reconstruct the documents to pass them to the callback
+ iter.reset(_oplogKeyTracker->newIterator(ru));
+ int64_t storage;
+ iter->Seek(RocksRecordStore::_makeKey(_oplogNextToDelete, &storage));
+ } else {
+ iter.reset(ru->NewIterator(_prefix));
+ iter->SeekToFirst();
+ }
RecordId newestOld;
while ((sizeSaved < sizeOverCap || docsRemoved < docsOverCap) &&
(docsRemoved < 20000) && iter->Valid()) {
- rocksdb::Slice slice = iter->key();
- newestOld = _makeRecordId(slice);
+ newestOld = _makeRecordId(iter->key());
+
+ if (_cappedVisibilityManager->isCappedHidden(newestOld)) {
+ // this means we have an older record that hasn't been committed yet; let's
+ // wait until it gets committed before deleting
+ break;
+ }
// don't go past the record we just inserted
if (newestOld >= justInserted) {
@@ -356,9 +421,17 @@ namespace mongo {
break;
}
- auto oldValue = iter->value();
+ rocksdb::Slice oldValue;
++docsRemoved;
- sizeSaved += oldValue.size();
+ if (_isOplog) {
+ // trick the callback by giving it an empty bson document
+ oldValue = rocksdb::Slice(emptyBson.objdata(), emptyBson.objsize());
+ // we keep the data size in the value
+ sizeSaved += _oplogKeyTracker->decodeSize(iter->value());
+ } else {
+ oldValue = iter->value();
+ sizeSaved += oldValue.size();
+ }
if (_cappedDeleteCallback) {
uassertStatusOK(_cappedDeleteCallback->aboutToDeleteCapped(
@@ -367,14 +440,34 @@ namespace mongo {
}
ru->writeBatch()->Delete(key);
+ if (_isOplog) {
+ _oplogKeyTracker->deleteKey(ru, newestOld);
+ }
+
iter->Next();
}
+ if (!iter->status().ok()) {
+ log() << "RocksDB iterator failure when trying to delete capped, ignoring: "
+ << iter->status().ToString();
+ }
+
if (docsRemoved > 0) {
_changeNumRecords(txn, -docsRemoved);
_increaseDataSize(txn, -sizeSaved);
wuow.commit();
}
+
+ if (_isOplog && iter->Valid()) {
+ auto oldestAliveRecordId = _makeRecordId(iter->key());
+ // we check if there's an outstanding transaction that is older than
+ // oldestAliveRecordId. If there is, we must not skip deleting that record the next
+ // time we clean up the oplog. If there isn't, we know for certain that this is the
+ // record we'll start our deletions from next time
+ if (!_cappedVisibilityManager->isCappedHidden(oldestAliveRecordId)) {
+ _oplogNextToDelete = oldestAliveRecordId;
+ }
+ }
}
catch ( const WriteConflictException& wce ) {
delete txn->releaseRecoveryUnit();
@@ -391,7 +484,6 @@ namespace mongo {
delete txn->releaseRecoveryUnit();
txn->setRecoveryUnit( realRecoveryUnit );
return docsRemoved;
-
}
StatusWith<RecordId> RocksRecordStore::insertRecord( OperationContext* txn,
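
Taken together, the oplog fast path of cappedDeleteAsNeeded() is: seek the tracker to the remembered resume point, delete key by key until enough space is reclaimed or an uncommitted (still hidden) record is reached, then remember where to resume. A condensed sketch of that control flow, with a std::map standing in for the tracker's RocksDB iterator:

    #include <cstdint>
    #include <map>

    using RecordId = int64_t;

    struct OplogTrimState {
        std::map<RecordId, uint32_t> tracker;  // RecordId -> stored document size
        RecordId nextToDelete = 0;

        // Returns bytes reclaimed; stops at the first record still hidden
        // (i.e. written by a transaction that has not committed yet).
        long long trim(long long bytesOverCap, bool (*isHidden)(RecordId)) {
            long long saved = 0;
            auto it = tracker.lower_bound(nextToDelete);  // seek, instead of SeekToFirst()
            while (it != tracker.end() && saved < bytesOverCap) {
                if (isHidden(it->first)) {
                    break;  // an uncommitted record: wait for it to commit
                }
                saved += it->second;  // size comes from the tracker value, no oplog read
                it = tracker.erase(it);
            }
            if (it != tracker.end() && !isHidden(it->first)) {
                nextToDelete = it->first;  // safe resume point for the next pass
            }
            return saved;
        }
    };

    static bool nothingHidden(RecordId) { return false; }

    int main() {
        OplogTrimState s;
        s.tracker = {{1, 100}, {2, 100}, {3, 100}};
        long long saved = s.trim(150, nothingHidden);
        return saved == 200 ? 0 : 1;  // deletes records 1 and 2, resumes at 3
    }
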
@@ -424,6 +516,9 @@ namespace mongo {
// No need to register the write here, since we just allocated a new RecordId so no other
// transaction can access this key before we commit
ru->writeBatch()->Put(_makePrefixedKey(_prefix, loc), rocksdb::Slice(data, len));
+ if (_isOplog) {
+ _oplogKeyTracker->insertKey(ru, loc, len);
+ }
_changeNumRecords( txn, 1 );
_increaseDataSize( txn, len );
@@ -458,14 +553,14 @@ namespace mongo {
std::string old_value;
auto status = ru->Get(key, &old_value);
-
- if ( !status.ok() ) {
- return StatusWith<RecordId>( ErrorCodes::InternalError, status.ToString() );
- }
+ invariantRocksOK(status);
int old_length = old_value.size();
ru->writeBatch()->Put(key, rocksdb::Slice(data, len));
+ if (_isOplog) {
+ _oplogKeyTracker->insertKey(ru, loc, len);
+ }
_increaseDataSize(txn, len - old_length);
@@ -507,8 +602,6 @@ namespace mongo {
}
Status RocksRecordStore::truncate( OperationContext* txn ) {
- // XXX once we have readable WriteBatch, also delete outstanding writes to
- // this collection in the WriteBatch
boost::scoped_ptr<RecordIterator> iter( getIterator( txn ) );
while( !iter->isEOF() ) {
RecordId loc = iter->getNext();
@@ -526,11 +619,7 @@ namespace mongo {
std::string endString(_makePrefixedKey(_prefix, RecordId::max()));
rocksdb::Slice beginRange(beginString);
rocksdb::Slice endRange(endString);
- rocksdb::Status status = _db->CompactRange(&beginRange, &endRange);
- if ( status.ok() )
- return Status::OK();
- else
- return Status( ErrorCodes::InternalError, status.ToString() );
+ return rocksToMongoStatus(_db->CompactRange(&beginRange, &endRange));
}
Status RocksRecordStore::validate( OperationContext* txn,
@@ -539,32 +628,47 @@ namespace mongo {
ValidateAdaptor* adaptor,
ValidateResults* results,
BSONObjBuilder* output ) {
- // TODO validate that _numRecords and _dataSize are correct in scanData mode
- if ( scanData ) {
- bool invalidObject = false;
- size_t numRecords = 0;
- boost::scoped_ptr<RecordIterator> iter( getIterator( txn ) );
- while( !iter->isEOF() ) {
- numRecords++;
+ long long nrecords = 0;
+ long long dataSizeTotal = 0;
+ if (scanData) {
+ boost::scoped_ptr<RecordIterator> iter(getIterator(txn));
+ results->valid = true;
+ while (!iter->isEOF()) {
+ ++nrecords;
if (full) {
- RecordData data = dataFor(txn, iter->curr());
size_t dataSize;
- const Status status = adaptor->validate(data, &dataSize);
+ RecordId loc = iter->curr();
+ RecordData data = dataFor(txn, loc);
+ Status status = adaptor->validate(data, &dataSize);
if (!status.isOK()) {
results->valid = false;
- if (invalidObject) {
- results->errors.push_back("invalid object detected (see logs)");
- }
- invalidObject = true;
- log() << "Invalid object detected in " << _ns << ": " << status.reason();
+ results->errors.push_back(str::stream() << loc << " is corrupted");
}
+ dataSizeTotal += static_cast<long long>(dataSize);
}
iter->getNext();
}
- output->appendNumber("nrecords", numRecords);
- }
- else
+
+ if (full && results->valid) {
+ long long storedNumRecords = numRecords(txn);
+ long long storedDataSize = dataSize(txn);
+
+ if (nrecords != storedNumRecords || dataSizeTotal != storedDataSize) {
+ warning() << _ident << ": Existing record and data size counters ("
+ << storedNumRecords << " records " << storedDataSize << " bytes) "
+ << "are inconsistent with full validation results (" << nrecords
+ << " records " << dataSizeTotal << " bytes). "
+ << "Updating counters with new values.";
+ if (nrecords != storedNumRecords) {
+ _changeNumRecords(txn, nrecords - storedNumRecords);
+ }
+ if (dataSizeTotal != storedDataSize) {
+ _increaseDataSize(txn, dataSizeTotal - storedDataSize);
+ }
+ }
+ }
+ output->appendNumber("nrecords", nrecords);
+ } else {
output->appendNumber("nrecords", numRecords(txn));
+ }
return Status::OK();
}
@@ -590,6 +694,18 @@ namespace mongo {
return record.getStatus();
}
+ void RocksRecordStore::updateStatsAfterRepair(OperationContext* txn, long long numRecords,
+ long long dataSize) {
+ _numRecords.store(numRecords);
+ _dataSize.store(dataSize);
+ rocksdb::WriteBatch wb;
+ int64_t storage;
+ wb.Put(_numRecordsKey, RocksRecoveryUnit::encodeCounter(numRecords, &storage));
+ wb.Put(_dataSizeKey, RocksRecoveryUnit::encodeCounter(dataSize, &storage));
+ auto s = _db->Write(rocksdb::WriteOptions(), &wb);
+ invariantRocksOK(s);
+ }
+
/**
* Return the RecordId of an oplog entry as close to startingPosition as possible without
* being higher. If there are no entries <= startingPosition, return RecordId().
@@ -604,7 +720,10 @@ namespace mongo {
auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
ru->setOplogReadTill(_cappedVisibilityManager->oplogStartHack());
- boost::scoped_ptr<rocksdb::Iterator> iter(ru->NewIterator(_prefix));
+ // we use _oplogKeyTracker, which contains exactly the same keys as the oplog. The
+ // difference is that its values are much smaller, so reading is faster. In this case
+ // we only need the keys (we never touch the values), so this works nicely
+ boost::scoped_ptr<rocksdb::Iterator> iter(_oplogKeyTracker->newIterator(ru));
int64_t storage;
iter->Seek(_makeKey(startingPosition, &storage));
if (!iter->Valid()) {
@@ -613,6 +732,7 @@ namespace mongo {
// startingPosition is bigger than everything else
return _makeRecordId(iter->key());
} else {
+ invariantRocksOK(iter->status());
// record store is empty
return RecordId();
}
@@ -631,6 +751,7 @@ namespace mongo {
}
if (!iter->Valid()) {
+ invariantRocksOK(iter->status());
// there are no entries <= startingPosition
return RecordId();
}
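
The surrounding logic is the classic "greatest key <= target" dance on a forward iterator: Seek() lands on the first key >= target, so either take it (exact match), step back once, or fall back to the last key when the seek runs off the end. A sketch over std::map, whose lower_bound exposes the same primitive (stand-in types, not engine code):

    #include <cassert>
    #include <cstdint>
    #include <iterator>
    #include <map>

    using RecordId = int64_t;
    const RecordId kNullRecordId = -1;  // stand-in for RecordId()

    // Greatest key <= target, or kNullRecordId if no such key exists.
    RecordId oplogStartHack(const std::map<RecordId, int>& keys, RecordId target) {
        auto it = keys.lower_bound(target);  // first key >= target (like Seek)
        if (it == keys.end()) {
            if (keys.empty()) return kNullRecordId;  // record store is empty
            return keys.rbegin()->first;             // target > everything (like SeekToLast)
        }
        if (it->first == target) return target;        // exact match
        if (it == keys.begin()) return kNullRecordId;  // no entries <= target
        return std::prev(it)->first;                   // step back once (like Prev)
    }

    int main() {
        std::map<RecordId, int> keys{{10, 0}, {20, 0}, {30, 0}};
        assert(oplogStartHack(keys, 20) == 20);
        assert(oplogStartHack(keys, 25) == 20);
        assert(oplogStartHack(keys, 5) == kNullRecordId);
        assert(oplogStartHack(keys, 99) == 30);
        return 0;
    }
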
@@ -653,14 +774,6 @@ namespace mongo {
wuow.commit();
}
- rocksdb::ReadOptions RocksRecordStore::_readOptions(OperationContext* opCtx) {
- rocksdb::ReadOptions options;
- if ( opCtx ) {
- options.snapshot = RocksRecoveryUnit::getRocksRecoveryUnit( opCtx )->snapshot();
- }
- return options;
- }
-
RecordId RocksRecordStore::_nextId() {
invariant(!_isOplog);
return RecordId(_nextIdNum.fetchAndAdd(1));
@@ -701,14 +814,10 @@ namespace mongo {
std::string valueStorage;
auto status = ru->Get(_makePrefixedKey(prefix, loc), &valueStorage);
- if (!status.ok()) {
- if (status.IsNotFound()) {
- return RecordData(nullptr, 0);
- } else {
- log() << "rocks Get failed, blowing up: " << status.ToString();
- invariant(false);
- }
+ if (status.IsNotFound()) {
+ return RecordData(nullptr, 0);
}
+ invariantRocksOK(status);
SharedBuffer data = SharedBuffer::allocate(valueStorage.size());
memcpy(data.get(), valueStorage.data(), valueStorage.size());
@@ -743,12 +852,6 @@ namespace mongo {
_locate(start);
}
- void RocksRecordStore::Iterator::_checkStatus() {
- if ( !_iterator->status().ok() )
- log() << "Rocks Iterator Error: " << _iterator->status().ToString();
- invariant( _iterator->status().ok() );
- }
-
bool RocksRecordStore::Iterator::isEOF() {
return _eof;
}
@@ -773,6 +876,8 @@ namespace mongo {
else
_iterator->Prev();
+ invariantRocksOK(_iterator->status());
+
if (_iterator->Valid()) {
_curr = _decodeCurr();
if (_cappedVisibilityManager.get()) { // isCapped?
@@ -795,7 +900,6 @@ namespace mongo {
// we leave _curr as it is on purpose
}
- _checkStatus();
_lastLoc = toReturn;
return toReturn;
}
@@ -861,7 +965,7 @@ namespace mongo {
int64_t locStorage;
_iterator->Seek(RocksRecordStore::_makeKey(loc, &locStorage));
}
- _checkStatus();
+ invariantRocksOK(_iterator->status());
} else { // backward iterator
if (loc.isNull()) {
_iterator->SeekToLast();
@@ -869,14 +973,14 @@ namespace mongo {
// lower bound on reverse iterator
int64_t locStorage;
_iterator->Seek(RocksRecordStore::_makeKey(loc, &locStorage));
- _checkStatus();
+ invariantRocksOK(_iterator->status());
if (!_iterator->Valid()) {
_iterator->SeekToLast();
} else if (_decodeCurr() != loc) {
_iterator->Prev();
}
}
- _checkStatus();
+ invariantRocksOK(_iterator->status());
}
_eof = !_iterator->Valid();
if (_eof) {
diff --git a/src/mongo/db/storage/rocks/rocks_record_store.h b/src/mongo/db/storage/rocks/rocks_record_store.h
index d8f6bcf6cf4..1257ffe9c27 100644
--- a/src/mongo/db/storage/rocks/rocks_record_store.h
+++ b/src/mongo/db/storage/rocks/rocks_record_store.h
@@ -75,6 +75,7 @@ namespace mongo {
};
class RocksRecoveryUnit;
+ class RocksOplogKeyTracker;
class RocksRecordStore : public RecordStore {
public:
@@ -88,7 +89,7 @@ namespace mongo {
// name of the RecordStore implementation
virtual const char* name() const { return "rocks"; }
- virtual long long dataSize(OperationContext* txn) const { return _dataSize.load(); }
+ virtual long long dataSize(OperationContext* txn) const;
virtual long long numRecords( OperationContext* txn ) const;
@@ -167,11 +168,8 @@ namespace mongo {
virtual Status oplogDiskLocRegister(OperationContext* txn, const OpTime& opTime);
- virtual void updateStatsAfterRepair(OperationContext* txn,
- long long numRecords,
- long long dataSize) {
- // TODO
- }
+ virtual void updateStatsAfterRepair(OperationContext* txn, long long numRecords,
+ long long dataSize);
void setCappedDeleteCallback(CappedDocumentDeleteCallback* cb) {
_cappedDeleteCallback = cb;
@@ -187,6 +185,8 @@ namespace mongo {
static rocksdb::Comparator* newRocksCollectionComparator();
private:
+ // we just need to expose _makePrefixedKey to RocksOplogKeyTracker
+ friend class RocksOplogKeyTracker;
// NOTE: RecordIterator might outlive the RecordStore. That's why we use all those
// shared_ptrs
class Iterator : public RecordIterator {
@@ -207,7 +207,6 @@ namespace mongo {
void _locate(const RecordId& loc);
RecordId _decodeCurr() const;
bool _forward() const;
- void _checkStatus();
OperationContext* _txn;
rocksdb::DB* _db; // not owned
@@ -221,12 +220,6 @@ namespace mongo {
boost::scoped_ptr<rocksdb::Iterator> _iterator;
};
- /**
- * Returns a new ReadOptions struct, containing the snapshot held in opCtx, if opCtx is not
- * null
- */
- static rocksdb::ReadOptions _readOptions(OperationContext* opCtx = NULL);
-
static RecordId _makeRecordId( const rocksdb::Slice& slice );
static RecordData _getDataFor(rocksdb::DB* db, const std::string& prefix,
@@ -254,7 +247,14 @@ namespace mongo {
int _cappedDeleteCheckCount; // see comment in ::cappedDeleteAsNeeded
const bool _isOplog;
- int _oplogCounter;
+ // nullptr iff _isOplog == false
+ RocksOplogKeyTracker* _oplogKeyTracker;
+ // SeekToFirst() on the oplog is an expensive operation because a bunch of keys at the
+ // start have been deleted. To reduce the overhead, we remember the next key to delete
+ // and seek directly to it. This would not work correctly if somebody inserted a key
+ // before _oplogNextToDelete. However, we prevent that from happening by using
+ // _cappedVisibilityManager and checking isCappedHidden() during deletions
+ RecordId _oplogNextToDelete;
boost::shared_ptr<CappedVisibilityManager> _cappedVisibilityManager;
diff --git a/src/mongo/db/storage/rocks/rocks_record_store_mock.cpp b/src/mongo/db/storage/rocks/rocks_record_store_mock.cpp
index d60bf6cb738..f897a5a944e 100644
--- a/src/mongo/db/storage/rocks/rocks_record_store_mock.cpp
+++ b/src/mongo/db/storage/rocks/rocks_record_store_mock.cpp
@@ -29,7 +29,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/storage/rocks/rocks_engine.h"
+#include "rocks_engine.h"
namespace mongo {
diff --git a/src/mongo/db/storage/rocks/rocks_record_store_mongod.cpp b/src/mongo/db/storage/rocks/rocks_record_store_mongod.cpp
index 91d86698705..0554496a95e 100644
--- a/src/mongo/db/storage/rocks/rocks_record_store_mongod.cpp
+++ b/src/mongo/db/storage/rocks/rocks_record_store_mongod.cpp
@@ -42,13 +42,14 @@
#include "mongo/db/global_environment_experiment.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context_impl.h"
-#include "mongo/db/storage/rocks/rocks_engine.h"
-#include "mongo/db/storage/rocks/rocks_record_store.h"
-#include "mongo/db/storage/rocks/rocks_recovery_unit.h"
#include "mongo/util/background.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
+#include "rocks_engine.h"
+#include "rocks_record_store.h"
+#include "rocks_recovery_unit.h"
+
namespace mongo {
namespace {
@@ -59,7 +60,7 @@ namespace mongo {
class RocksRecordStoreThread : public BackgroundJob {
public:
RocksRecordStoreThread(const NamespaceString& ns)
- : _ns(ns) {
+ : BackgroundJob(true /* deleteSelf */), _ns(ns) {
_name = std::string("RocksRecordStoreThread for ") + _ns.toString();
}
@@ -123,11 +124,9 @@ namespace mongo {
// If we removed 0 documents, sleep a bit in case we're on a laptop
// or something to be nice.
sleepmillis(1000);
- }
- else if(removed < 1000) {
- // 1000 is the batch size, so we didn't even do a full batch,
- // which is the most efficient.
- sleepmillis(10);
+ } else {
+ // wake up every 100ms
+ sleepmillis(100);
}
}
diff --git a/src/mongo/db/storage/rocks/rocks_record_store_test.cpp b/src/mongo/db/storage/rocks/rocks_record_store_test.cpp
index b6ebcc7dfc4..d4ed147e94d 100644
--- a/src/mongo/db/storage/rocks/rocks_record_store_test.cpp
+++ b/src/mongo/db/storage/rocks/rocks_record_store_test.cpp
@@ -41,12 +41,13 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/storage/record_store_test_harness.h"
-#include "mongo/db/storage/rocks/rocks_record_store.h"
-#include "mongo/db/storage/rocks/rocks_recovery_unit.h"
-#include "mongo/db/storage/rocks/rocks_transaction.h"
#include "mongo/unittest/unittest.h"
#include "mongo/unittest/temp_dir.h"
+#include "rocks_record_store.h"
+#include "rocks_recovery_unit.h"
+#include "rocks_transaction.h"
+
namespace mongo {
using boost::scoped_ptr;
@@ -203,7 +204,6 @@ namespace mongo {
return res;
}
- // TODO remove from here once mongo made the test generic
TEST(RocksRecordStoreTest, OplogHack) {
RocksRecordStoreHarnessHelper harnessHelper;
scoped_ptr<RecordStore> rs(harnessHelper.newNonCappedRecordStore("local.oplog.foo"));
diff --git a/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp b/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp
index 5ca11018209..2ec2f592df1 100644
--- a/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp
+++ b/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp
@@ -31,7 +31,7 @@
#include "mongo/platform/basic.h"
#include "mongo/platform/endian.h"
-#include "mongo/db/storage/rocks/rocks_recovery_unit.h"
+#include "rocks_recovery_unit.h"
#include <rocksdb/comparator.h>
#include <rocksdb/db.h>
@@ -44,10 +44,11 @@
#include "mongo/base/checked_cast.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/storage/rocks/rocks_transaction.h"
-#include "mongo/db/storage/rocks/rocks_util.h"
#include "mongo/util/log.h"
+#include "rocks_transaction.h"
+#include "rocks_util.h"
+
namespace mongo {
namespace {
class PrefixStrippingIterator : public rocksdb::Iterator {
@@ -149,7 +150,12 @@ namespace mongo {
}
bool RocksRecoveryUnit::awaitCommit() {
- // TODO
+ // Not sure what we should do here. awaitCommit() is called when the WriteConcern is
+ // FSYNC or JOURNAL. In our case, we apply JOURNAL semantics to every transaction,
+ // regardless of the requested WriteConcern. However, if the WriteConcern is FSYNC we
+ // should probably call Write() with the sync option set. For now we do nothing. In the
+ // future we should figure out which of the two WriteConcerns this is (FSYNC or
+ // JOURNAL) and, if it's FSYNC, do something special.
return true;
}
@@ -179,6 +185,7 @@ namespace mongo {
void RocksRecoveryUnit::_releaseSnapshot() {
if (_snapshot) {
+ _transaction.abort();
_db->ReleaseSnapshot(_snapshot);
_snapshot = nullptr;
}
@@ -191,9 +198,8 @@ namespace mongo {
auto& counter = pair.second;
counter._value->fetch_add(counter._delta, std::memory_order::memory_order_relaxed);
long long newValue = counter._value->load(std::memory_order::memory_order_relaxed);
- int64_t littleEndian = static_cast<int64_t>(endian::littleToNative(newValue));
- const char* nr_ptr = reinterpret_cast<const char*>(&littleEndian);
- writeBatch()->Put(pair.first, rocksdb::Slice(nr_ptr, sizeof(littleEndian)));
+ int64_t storage;
+ writeBatch()->Put(pair.first, encodeCounter(newValue, &storage));
}
if (_writeBatch->GetWriteBatch()->Count() != 0) {
@@ -202,10 +208,7 @@ namespace mongo {
rocksdb::WriteOptions writeOptions;
writeOptions.disableWAL = !_durable;
auto status = _db->Write(rocksdb::WriteOptions(), _writeBatch->GetWriteBatch());
- if (!status.ok()) {
- log() << "uh oh: " << status.ToString();
- invariant(!"rocks write batch commit failed");
- }
+ invariantRocksOK(status);
_transaction.commit();
}
_deltaCounters.clear();
@@ -219,7 +222,6 @@ namespace mongo {
}
_changes.clear();
- _transaction.abort();
_deltaCounters.clear();
_writeBatch.reset();
@@ -246,7 +248,6 @@ namespace mongo {
if (entry.type == rocksdb::WriteType::kDeleteRecord) {
return rocksdb::Status::NotFound();
}
- // TODO avoid double copy
*value = std::string(entry.value.data(), entry.value.size());
return rocksdb::Status::OK();
}
@@ -301,10 +302,8 @@ namespace mongo {
auto s = db->Get(rocksdb::ReadOptions(), counterKey, &value);
if (s.IsNotFound()) {
return 0;
- } else if (!s.ok()) {
- log() << "Counter get failed " << s.ToString();
- invariant(!"Counter get failed");
}
+ invariantRocksOK(s);
int64_t ret;
invariant(sizeof(ret) == value.size());
@@ -317,4 +316,8 @@ namespace mongo {
return checked_cast<RocksRecoveryUnit*>(opCtx->recoveryUnit());
}
+ rocksdb::Slice RocksRecoveryUnit::encodeCounter(long long counter, int64_t* storage) {
+ *storage = static_cast<int64_t>(endian::littleToNative(counter));
+ return rocksdb::Slice(reinterpret_cast<const char*>(storage), sizeof(*storage));
+ }
}
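
encodeCounter() above persists counters as fixed-width 8-byte little-endian integers so they round-trip through RocksDB values on any host. A standalone sketch of the same round trip (illustrative helpers returning an owned string, unlike the engine's Slice-over-storage version):

    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <string>

    // Encode a counter as 8 little-endian bytes (sketch of the on-disk format).
    std::string encodeCounter(long long counter) {
        std::string out(sizeof(int64_t), '\0');
        for (size_t i = 0; i < out.size(); ++i) {
            out[i] = static_cast<char>((static_cast<uint64_t>(counter) >> (8 * i)) & 0xff);
        }
        return out;
    }

    long long decodeCounter(const std::string& value) {
        assert(value.size() == sizeof(int64_t));  // mirrors the invariant in _getCounterValue
        uint64_t v = 0;
        for (size_t i = 0; i < value.size(); ++i) {
            v |= static_cast<uint64_t>(static_cast<unsigned char>(value[i])) << (8 * i);
        }
        return static_cast<long long>(v);
    }

    int main() {
        assert(decodeCounter(encodeCounter(-42)) == -42);
        assert(decodeCounter(encodeCounter(1LL << 40)) == (1LL << 40));
        return 0;
    }
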
diff --git a/src/mongo/db/storage/rocks/rocks_recovery_unit.h b/src/mongo/db/storage/rocks/rocks_recovery_unit.h
index 280e4b63f1e..d2834dcd0d8 100644
--- a/src/mongo/db/storage/rocks/rocks_recovery_unit.h
+++ b/src/mongo/db/storage/rocks/rocks_recovery_unit.h
@@ -38,11 +38,14 @@
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
+#include <rocksdb/slice.h>
+
#include "mongo/base/disallow_copying.h"
#include "mongo/base/owned_pointer_vector.h"
#include "mongo/db/record_id.h"
#include "mongo/db/storage/recovery_unit.h"
-#include "mongo/db/storage/rocks/rocks_transaction.h"
+
+#include "rocks_transaction.h"
namespace rocksdb {
class DB;
@@ -123,6 +126,8 @@ namespace mongo {
static int getTotalLiveRecoveryUnits() { return _totalLiveRecoveryUnits.load(); }
+ static rocksdb::Slice encodeCounter(long long counter, int64_t* storage);
+
private:
void _releaseSnapshot();
diff --git a/src/mongo/db/storage/rocks/rocks_server_status.cpp b/src/mongo/db/storage/rocks/rocks_server_status.cpp
index 945369d93a3..d7d3df8d757 100644
--- a/src/mongo/db/storage/rocks/rocks_server_status.cpp
+++ b/src/mongo/db/storage/rocks/rocks_server_status.cpp
@@ -31,8 +31,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/storage/rocks/rocks_server_status.h"
-#include "mongo/db/storage/rocks/rocks_recovery_unit.h"
+#include "rocks_server_status.h"
#include "boost/scoped_ptr.hpp"
@@ -40,10 +39,13 @@
#include "mongo/base/checked_cast.h"
#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/db/storage/rocks/rocks_engine.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/scopeguard.h"
+#include "rocks_recovery_unit.h"
+#include "rocks_engine.h"
+#include "rocks_transaction.h"
+
namespace mongo {
using std::string;
@@ -62,7 +64,7 @@ namespace mongo {
} // namespace
RocksServerStatusSection::RocksServerStatusSection(RocksEngine* engine)
- : ServerStatusSection("RocksDB"), _engine(engine) {}
+ : ServerStatusSection("rocksdb"), _engine(engine) {}
bool RocksServerStatusSection::includeByDefault() const { return true; }
@@ -108,6 +110,10 @@ namespace mongo {
}
bob.append("total-live-recovery-units", RocksRecoveryUnit::getTotalLiveRecoveryUnits());
bob.append("block-cache-usage", PrettyPrintBytes(_engine->getBlockCacheUsage()));
+ bob.append("transaction-engine-keys",
+ static_cast<long long>(_engine->getTransactionEngine()->numKeysTracked()));
+ bob.append("transaction-engine-snapshots",
+ static_cast<long long>(_engine->getTransactionEngine()->numActiveSnapshots()));
return bob.obj();
}
diff --git a/src/mongo/db/storage/rocks/rocks_transaction.cpp b/src/mongo/db/storage/rocks/rocks_transaction.cpp
index 58e0bf3c6af..0e14fba0bba 100644
--- a/src/mongo/db/storage/rocks/rocks_transaction.cpp
+++ b/src/mongo/db/storage/rocks/rocks_transaction.cpp
@@ -28,7 +28,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/storage/rocks/rocks_transaction.h"
+#include "rocks_transaction.h"
#include <atomic>
#include <map>
@@ -41,6 +41,15 @@
namespace mongo {
RocksTransactionEngine::RocksTransactionEngine() : _latestSnapshotId(1), _nextTransactionId(1) {}
+ size_t RocksTransactionEngine::numKeysTracked() {
+ boost::mutex::scoped_lock lk(_lock);
+ return _keyInfo.size();
+ }
+ size_t RocksTransactionEngine::numActiveSnapshots() {
+ boost::mutex::scoped_lock lk(_lock);
+ return _activeSnapshots.size();
+ }
+
std::list<uint64_t>::iterator RocksTransactionEngine::_getLatestSnapshotId_inlock() {
return _activeSnapshots.insert(_activeSnapshots.end(), _latestSnapshotId);
}
@@ -60,7 +69,7 @@ namespace mongo {
}
auto listIter = _keysSortedBySnapshot.insert(_keysSortedBySnapshot.end(), {key, newSnapshotId});
- _keyInfo.insert({key, {newSnapshotId, listIter}});
+ _keyInfo.insert({StringData(listIter->first), {newSnapshotId, listIter}});
}
void RocksTransactionEngine::_cleanUpKeysCommittedBeforeSnapshot_inlock(uint64_t snapshotId) {
diff --git a/src/mongo/db/storage/rocks/rocks_transaction.h b/src/mongo/db/storage/rocks/rocks_transaction.h
index 880677a1e3b..555c1324638 100644
--- a/src/mongo/db/storage/rocks/rocks_transaction.h
+++ b/src/mongo/db/storage/rocks/rocks_transaction.h
@@ -37,6 +37,8 @@
#include <boost/thread/mutex.hpp>
+#include "mongo/base/string_data.h"
+
namespace mongo {
class RocksTransaction;
@@ -44,6 +46,9 @@ namespace mongo {
public:
RocksTransactionEngine();
+ size_t numKeysTracked();
+ size_t numActiveSnapshots();
+
private:
// REQUIRES: transaction engine lock locked
std::list<uint64_t>::iterator _getLatestSnapshotId_inlock();
@@ -84,13 +89,13 @@ namespace mongo {
// * snapshot ID of the last commit to this key
// * an iterator pointing to the corresponding entry in _keysSortedBySnapshot. This is used
// to update the list at the same time as we update the _keyInfo
- // TODO optimize these structures to store only one key instead of two
typedef std::list<std::pair<std::string, uint64_t>> KeysSortedBySnapshotList;
typedef std::list<std::pair<std::string, uint64_t>>::iterator KeysSortedBySnapshotListIter;
KeysSortedBySnapshotList _keysSortedBySnapshot;
// map of key -> pair{seq_id, pointer to corresponding _keysSortedBySnapshot}
- std::unordered_map<std::string, std::pair<uint64_t, KeysSortedBySnapshotListIter>> _keyInfo;
-
+ // the key is a StringData that points into the actual string stored in _keysSortedBySnapshot
+ std::unordered_map<StringData, std::pair<uint64_t, KeysSortedBySnapshotListIter>,
+ StringData::Hasher> _keyInfo;
std::unordered_map<std::string, uint64_t> _uncommittedTransactionId;
// this list is sorted
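
The _keyInfo change above avoids storing each key twice: the map is keyed by a StringData view into the string owned by the list node, and list nodes never move, so the view stays valid while other elements are inserted or erased. A sketch of the pattern using std::string_view as a stand-in for StringData:

    #include <cassert>
    #include <cstdint>
    #include <list>
    #include <string>
    #include <string_view>
    #include <unordered_map>
    #include <utility>

    int main() {
        // The list owns the strings; list nodes never move, so views into them stay valid.
        std::list<std::pair<std::string, uint64_t>> keysBySnapshot;
        std::unordered_map<std::string_view, uint64_t> keyInfo;

        auto it = keysBySnapshot.insert(keysBySnapshot.end(), {"collection-7-key", 42});
        keyInfo.emplace(std::string_view(it->first), it->second);  // a view, not a second copy

        assert(keyInfo.count("collection-7-key") == 1);

        // Erase the map entry before erasing the list node that owns the bytes.
        keyInfo.erase(std::string_view(it->first));
        keysBySnapshot.erase(it);
        return 0;
    }
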
diff --git a/src/mongo/db/storage/rocks/rocks_util.cpp b/src/mongo/db/storage/rocks/rocks_util.cpp
new file mode 100644
index 00000000000..c31b362a083
--- /dev/null
+++ b/src/mongo/db/storage/rocks/rocks_util.cpp
@@ -0,0 +1,49 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+
+#include "rocks_util.h"
+
+#include <string>
+#include <rocksdb/status.h>
+
+namespace mongo {
+
+ Status rocksToMongoStatus_slow(const rocksdb::Status& status, const char* prefix) {
+ if (status.ok()) {
+ return Status::OK();
+ }
+
+ if (status.IsCorruption()) {
+ return Status(ErrorCodes::BadValue, status.ToString());
+ }
+
+ return Status(ErrorCodes::InternalError, status.ToString());
+ }
+
+} // namespace mongo
diff --git a/src/mongo/db/storage/rocks/rocks_util.h b/src/mongo/db/storage/rocks/rocks_util.h
index f8611d60d84..98b927c5610 100644
--- a/src/mongo/db/storage/rocks/rocks_util.h
+++ b/src/mongo/db/storage/rocks/rocks_util.h
@@ -29,6 +29,9 @@
#pragma once
#include <string>
+#include <rocksdb/status.h>
+
+#include "mongo/util/assert_util.h"
namespace mongo {
@@ -45,4 +48,24 @@ namespace mongo {
return nextPrefix;
}
+ Status rocksToMongoStatus_slow(const rocksdb::Status& status, const char* prefix);
+
+ /**
+ * Converts a RocksDB status to a MongoDB Status
+ */
+ inline Status rocksToMongoStatus(const rocksdb::Status& status, const char* prefix = NULL) {
+ if (MONGO_likely(status.ok())) {
+ return Status::OK();
+ }
+ return rocksToMongoStatus_slow(status, prefix);
+ }
+
+#define invariantRocksOK(expression) do { \
+ auto _invariantRocksOK_status = (expression); \
+ if (MONGO_unlikely(!_invariantRocksOK_status.ok())) { \
+ invariantOKFailed(#expression, rocksToMongoStatus(_invariantRocksOK_status), \
+ __FILE__, __LINE__); \
+ } \
+ } while (false)
+
} // namespace mongo
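
A usage sketch of the two helpers above: rocksToMongoStatus() for errors the caller should handle, invariantRocksOK() for errors that indicate a programming bug. The rocksdb::DB::Put() call is real; the wrapper functions are illustrative, not part of the engine:

    // Illustrative only -- assumes rocks_util.h (above) and a live rocksdb::DB*.
    #include <rocksdb/db.h>

    #include "mongo/base/status.h"
    #include "rocks_util.h"

    namespace mongo {

        // Expected failures (e.g. I/O errors) become a Status the caller can handle.
        Status putSketch(rocksdb::DB* db, const rocksdb::Slice& key,
                         const rocksdb::Slice& value) {
            return rocksToMongoStatus(db->Put(rocksdb::WriteOptions(), key, value));
        }

        // Failures here are treated as invariant violations and abort the process.
        void mustPutSketch(rocksdb::DB* db, const rocksdb::Slice& key,
                           const rocksdb::Slice& value) {
            invariantRocksOK(db->Put(rocksdb::WriteOptions(), key, value));
        }

    }  // namespace mongo
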