author | Igor Canadi <icanadi@fb.com> | 2015-03-23 12:35:20 -0700
committer | Ramon Fernandez <ramon.fernandez@mongodb.com> | 2015-03-23 17:35:59 -0400
commit | 12e24d4e910150d2523a226ba5a62370dc974d44 (patch)
tree | 1f22d9f9b12fce1d3dde260c4706f4eb92845f4d
parent | 1b7ee0b6d045c5f99036507915f29684cee59b29 (diff)
download | mongo-12e24d4e910150d2523a226ba5a62370dc974d44.tar.gz
SERVER-17706 Sync mongo-rocks with our recent changes
We've been developing these changes in a different repository. This
patch contains a number of improvements and fixes:
* Implement updateStatsAfterRepair()
* Return correct ident size in getIdentSize() function
* Fast drop operation in Mongo+Rocks (prefix-based; see the sketch below)
* Abort the RocksTransaction when we do _releaseSnapshot()
* SERVER-16979 Correctly handle errors returned by RocksDB (see the error-handling sketch below)
* Implement StandardBulkBuilder and UniqueBulkBuilder
* Add a new option to RocksDB -- rocksdbMaxWriteMBPerSec
* Make oplog cleanup more efficient (also changes disk format)
* Assorted configuration tuning (block size, write buffers, background jobs, open-file limit)
Signed-off-by: Ramon Fernandez <ramon.fernandez@mongodb.com>
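
A note on the SERVER-16979 bullet: the patch retires the old `ROCKS_STATUS_OK()` macro and the `toMongoStatus()` free function in favor of `rocksToMongoStatus()` and `invariantRocksOK()` from the new rocks_util translation unit. rocks_util.cpp itself is not part of this diff, so the bodies below are a reconstruction: the names and call sites are real, but the implementations are assumed, modeled on the removed macro and the removed `toMongoStatus()`.

```cpp
// Hypothetical reconstruction of the rocks_util error helpers; the real
// bodies live in rocks_util.cpp/.h, which this diff does not show.
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage

#include <rocksdb/status.h>

#include "mongo/base/status.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"

namespace mongo {

    // Map a rocksdb::Status onto a mongo::Status so callers can return
    // errors instead of crashing (the point of SERVER-16979). The removed
    // toMongoStatus() used ErrorCodes::InternalError for every failure.
    inline Status rocksToMongoStatus(const rocksdb::Status& s) {
        if (s.ok()) {
            return Status::OK();
        }
        return Status(ErrorCodes::InternalError, s.ToString());
    }

    // For statuses that must succeed: log and invariant, exactly what the
    // removed ROCKS_STATUS_OK() macro used to do inline.
    inline void invariantRocksOK(const rocksdb::Status& s) {
        if (!s.ok()) {
            error() << "rocks error: " << s.ToString();
            invariant(false);
        }
    }

}  // namespace mongo
```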
26 files changed, 975 insertions, 413 deletions
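
Both the fast-drop bullet and the oplog bullet lean on the 4-byte big-endian ident prefixes (see `encodePrefix()` in rocks_engine.cpp below) plus a new helper, `rocksGetNextPrefix()`, which the patch uses to reserve the oplog's second prefix and to build `[prefix, nextPrefix)` ranges for `GetApproximateSizes()`. Its body is in the new rocks_util files, not in this diff, so the following is a minimal sketch of what such a helper has to do, assumed rather than copied:

```cpp
// Hypothetical sketch of rocksGetNextPrefix(): increment the prefix as a
// fixed-width big-endian byte string. The result is the smallest string
// greater than every key beginning with `prefix`, so [prefix, nextPrefix)
// spans exactly one ident's keys.
#include <string>

std::string rocksGetNextPrefix(std::string prefix) {
    // Walk from the least significant (last) byte, carrying over 0xFF.
    for (auto it = prefix.rbegin(); it != prefix.rend(); ++it) {
        unsigned char byte = static_cast<unsigned char>(*it);
        if (byte != 0xFF) {
            *it = static_cast<char>(byte + 1);  // bump this byte and stop
            return prefix;
        }
        *it = '\x00';  // 0xFF wraps to 0x00; keep carrying leftward
    }
    // Every byte was 0xFF. Prefixes are allocated sequentially from 0,
    // so a 4-byte prefix never reaches this case in practice.
    return prefix;
}
```

Read this way, an oplog with data prefix P stores its RocksOplogKeyTracker keys under rocksGetNextPrefix(P), which is why createRecordStore() bumps _maxPrefix a second time when the namespace is the oplog, and why dropIdent() drops both prefixes for the oplog ident.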
diff --git a/src/mongo/db/storage/rocks/KNOWN_ISSUES.md b/src/mongo/db/storage/rocks/KNOWN_ISSUES.md new file mode 100644 index 00000000000..d82b8dcb48d --- /dev/null +++ b/src/mongo/db/storage/rocks/KNOWN_ISSUES.md @@ -0,0 +1 @@ +* Don't create indexes on oplog. We will not properly unindex documents that we remove from the beginning of the oplog. diff --git a/src/mongo/db/storage/rocks/SConscript b/src/mongo/db/storage/rocks/SConscript index 8c43d94c303..2ba0f3149d3 100644 --- a/src/mongo/db/storage/rocks/SConscript +++ b/src/mongo/db/storage/rocks/SConscript @@ -12,6 +12,7 @@ if has_option("rocksdb"): 'rocks_recovery_unit.cpp', 'rocks_index.cpp', 'rocks_transaction.cpp', + 'rocks_util.cpp', ], LIBDEPS= [ '$BUILD_DIR/mongo/bson', @@ -36,6 +37,7 @@ if has_option("rocksdb"): source= [ 'rocks_init.cpp', 'rocks_options_init.cpp', + 'rocks_parameters.cpp', 'rocks_record_store_mongod.cpp', 'rocks_server_status.cpp', ], diff --git a/src/mongo/db/storage/rocks/rocks_engine.cpp b/src/mongo/db/storage/rocks/rocks_engine.cpp index 2636f67396b..e16447b99f0 100644 --- a/src/mongo/db/storage/rocks/rocks_engine.cpp +++ b/src/mongo/db/storage/rocks/rocks_engine.cpp @@ -31,7 +31,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/storage/rocks/rocks_engine.h" +#include "rocks_engine.h" #include <boost/filesystem/operations.hpp> #include <boost/make_shared.hpp> @@ -39,10 +39,12 @@ #include <boost/scoped_ptr.hpp> #include <rocksdb/cache.h> +#include <rocksdb/compaction_filter.h> #include <rocksdb/comparator.h> #include <rocksdb/db.h> #include <rocksdb/slice.h> #include <rocksdb/options.h> +#include <rocksdb/rate_limiter.h> #include <rocksdb/table.h> #include <rocksdb/utilities/convenience.h> #include <rocksdb/filter_policy.h> @@ -50,19 +52,20 @@ #include "mongo/db/catalog/collection_options.h" #include "mongo/db/index/index_descriptor.h" +#include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" -#include "mongo/db/storage/rocks/rocks_global_options.h" -#include "mongo/db/storage/rocks/rocks_record_store.h" -#include "mongo/db/storage/rocks/rocks_recovery_unit.h" -#include "mongo/db/storage/rocks/rocks_index.h" +#include "mongo/db/server_parameters.h" #include "mongo/platform/endian.h" #include "mongo/util/log.h" #include "mongo/util/processinfo.h" -#define ROCKS_TRACE log() +#include "rocks_global_options.h" +#include "rocks_record_store.h" +#include "rocks_recovery_unit.h" +#include "rocks_index.h" +#include "rocks_util.h" -#define ROCKS_STATUS_OK( s ) if ( !( s ).ok() ) { error() << "rocks error: " << ( s ).ToString(); \ - invariant( false ); } +#define ROCKS_TRACE log() namespace mongo { @@ -81,10 +84,70 @@ namespace mongo { uint32_t bigEndianPrefix = endian::nativeToBig(prefix); return std::string(reinterpret_cast<const char*>(&bigEndianPrefix), sizeof(uint32_t)); } + + class PrefixDeletingCompactionFilter : public rocksdb::CompactionFilter { + public: + explicit PrefixDeletingCompactionFilter(std::unordered_set<uint32_t> droppedPrefixes) + : _droppedPrefixes(std::move(droppedPrefixes)), + _prefixCache(0), + _droppedCache(false) {} + + // filter is not called from multiple threads simultaneously + virtual bool Filter(int level, const rocksdb::Slice& key, + const rocksdb::Slice& existing_value, std::string* new_value, + bool* value_changed) const { + uint32_t prefix = 0; + if (!extractPrefix(key, &prefix)) { + // this means there is a key in the database that's shorter than 4 bytes. this + // should never happen and this is a corruption. 
however, it's not compaction + // filter's job to report corruption, so we just silently continue + return false; + } + if (prefix == _prefixCache) { + return _droppedCache; + } + _prefixCache = prefix; + _droppedCache = _droppedPrefixes.find(prefix) != _droppedPrefixes.end(); + return _droppedCache; + } + + virtual const char* Name() const { return "PrefixDeletingCompactionFilter"; } + + private: + std::unordered_set<uint32_t> _droppedPrefixes; + mutable uint32_t _prefixCache; + mutable bool _droppedCache; + }; + + class PrefixDeletingCompactionFilterFactory : public rocksdb::CompactionFilterFactory { + public: + explicit PrefixDeletingCompactionFilterFactory(const RocksEngine* engine) : _engine(engine) {} + + virtual std::unique_ptr<rocksdb::CompactionFilter> CreateCompactionFilter( + const rocksdb::CompactionFilter::Context& context) override { + auto droppedPrefixes = _engine->getDroppedPrefixes(); + if (droppedPrefixes.size() == 0) { + // no compaction filter needed + return std::unique_ptr<rocksdb::CompactionFilter>(nullptr); + } else { + return std::unique_ptr<rocksdb::CompactionFilter>( + new PrefixDeletingCompactionFilter(std::move(droppedPrefixes))); + } + } + + virtual const char* Name() const override { + return "PrefixDeletingCompactionFilterFactory"; + } + + private: + const RocksEngine* _engine; + }; + } // anonymous namespace // first four bytes are the default prefix 0 const std::string RocksEngine::kMetadataPrefix("\0\0\0\0metadata-", 12); + const std::string RocksEngine::kDroppedPrefix("\0\0\0\0droppedprefix-", 18); RocksEngine::RocksEngine(const std::string& path, bool durable) : _path(path), _durable(durable) { @@ -101,23 +164,26 @@ namespace mongo { cacheSizeGB = 1; } } - _block_cache = rocksdb::NewLRUCache(cacheSizeGB * 1024 * 1024 * 1024LL); + _block_cache = rocksdb::NewLRUCache(cacheSizeGB * 1024 * 1024 * 1024LL, 10); } + _maxWriteMBPerSec = rocksGlobalOptions.maxWriteMBPerSec; + _rateLimiter.reset( + rocksdb::NewGenericRateLimiter(static_cast<int64_t>(_maxWriteMBPerSec) * 1024 * 1024)); // open DB rocksdb::DB* db; auto s = rocksdb::DB::Open(_options(), path, &db); - ROCKS_STATUS_OK(s); + invariantRocksOK(s); _db.reset(db); // open iterator - boost::scoped_ptr<rocksdb::Iterator> _iter(_db->NewIterator(rocksdb::ReadOptions())); + boost::scoped_ptr<rocksdb::Iterator> iter(_db->NewIterator(rocksdb::ReadOptions())); // find maxPrefix _maxPrefix = 0; - _iter->SeekToLast(); - if (_iter->Valid()) { + iter->SeekToLast(); + if (iter->Valid()) { // otherwise the DB is empty, so we just keep it at 0 - bool ok = extractPrefix(_iter->key(), &_maxPrefix); + bool ok = extractPrefix(iter->key(), &_maxPrefix); // this is DB corruption here invariant(ok); } @@ -125,21 +191,59 @@ namespace mongo { // load ident to prefix map { boost::mutex::scoped_lock lk(_identPrefixMapMutex); - for (_iter->Seek(kMetadataPrefix); - _iter->Valid() && _iter->key().starts_with(kMetadataPrefix); _iter->Next()) { - rocksdb::Slice ident(_iter->key()); + for (iter->Seek(kMetadataPrefix); + iter->Valid() && iter->key().starts_with(kMetadataPrefix); iter->Next()) { + invariantRocksOK(iter->status()); + rocksdb::Slice ident(iter->key()); ident.remove_prefix(kMetadataPrefix.size()); // this could throw DBException, which then means DB corruption. 
We just let it fly // to the caller - BSONObj identConfig(_iter->value().data()); + BSONObj identConfig(iter->value().data()); BSONElement element = identConfig.getField("prefix"); - // TODO: SERVER-16979 Correctly handle errors returned by RocksDB - // This is DB corruption - invariant(!element.eoo() || !element.isNumber()); + + if (element.eoo() || !element.isNumber()) { + log() << "Mongo metadata in RocksDB database is corrupted."; + invariant(false); + } + uint32_t identPrefix = static_cast<uint32_t>(element.numberInt()); _identPrefixMap[StringData(ident.data(), ident.size())] = identPrefix; } } + + // load dropped prefixes + { + rocksdb::WriteBatch wb; + // we will use this iter to check if prefixes are still alive + boost::scoped_ptr<rocksdb::Iterator> prefixIter( + _db->NewIterator(rocksdb::ReadOptions())); + for (iter->Seek(kDroppedPrefix); + iter->Valid() && iter->key().starts_with(kDroppedPrefix); iter->Next()) { + invariantRocksOK(iter->status()); + rocksdb::Slice prefix(iter->key()); + prefix.remove_prefix(kDroppedPrefix.size()); + prefixIter->Seek(prefix); + invariantRocksOK(iter->status()); + if (prefixIter->Valid() && prefixIter->key().starts_with(prefix)) { + // prefix is still alive, let's instruct the compaction filter to clear it up + uint32_t int_prefix; + bool ok = extractPrefix(prefix, &int_prefix); + invariant(ok); + { + boost::mutex::scoped_lock lk(_droppedPrefixesMutex); + _droppedPrefixes.insert(int_prefix); + } + } else { + // prefix is no longer alive. let's remove the prefix from our dropped prefixes + // list + wb.Delete(iter->key()); + } + } + if (wb.Count() > 0) { + auto s = _db->Write(rocksdb::WriteOptions(), &wb); + invariantRocksOK(s); + } + } } RocksEngine::~RocksEngine() {} @@ -151,12 +255,24 @@ namespace mongo { Status RocksEngine::createRecordStore(OperationContext* opCtx, const StringData& ns, const StringData& ident, const CollectionOptions& options) { - return _createIdentPrefix(ident); + auto s = _createIdentPrefix(ident); + if (NamespaceString::oplog(ns)) { + _oplogIdent = ident.toString(); + // oplog needs two prefixes, so we also reserve the next one + { + boost::mutex::scoped_lock lk(_identPrefixMapMutex); + ++_maxPrefix; + } + } + return s; } RecordStore* RocksEngine::getRecordStore(OperationContext* opCtx, const StringData& ns, const StringData& ident, const CollectionOptions& options) { + if (NamespaceString::oplog(ns)) { + _oplogIdent = ident.toString(); + } if (options.capped) { return new RocksRecordStore( ns, ident, _db.get(), _getIdentPrefix(ident), true, @@ -184,30 +300,51 @@ namespace mongo { } } + // cannot be rolled back Status RocksEngine::dropIdent(OperationContext* opCtx, const StringData& ident) { - // TODO optimize this using CompactionFilterV2 rocksdb::WriteBatch wb; wb.Delete(kMetadataPrefix + ident.toString()); - std::string prefix = _getIdentPrefix(ident); - rocksdb::Slice prefixSlice(prefix.data(), prefix.size()); + // calculate which prefixes we need to drop + std::vector<std::string> prefixesToDrop; + prefixesToDrop.push_back(_getIdentPrefix(ident)); + if (_oplogIdent == ident.toString()) { + // if we're dropping oplog, we also need to drop keys from RocksOplogKeyTracker (they + // are stored at prefix+1) + prefixesToDrop.push_back(rocksGetNextPrefix(prefixesToDrop[0])); + } - boost::scoped_ptr<rocksdb::Iterator> _iter(_db->NewIterator(rocksdb::ReadOptions())); - for (_iter->Seek(prefixSlice); _iter->Valid() && _iter->key().starts_with(prefixSlice); - _iter->Next()) { - ROCKS_STATUS_OK(_iter->status()); - 
wb.Delete(_iter->key()); + // We record the fact that we're deleting this prefix. That way we ensure that the prefix is + // always deleted + for (const auto& prefix : prefixesToDrop) { + wb.Put(kDroppedPrefix + prefix, ""); } - auto s = _db->Write(rocksdb::WriteOptions(), &wb); + + // we need to make sure this is on disk before starting to delete data in compactions + rocksdb::WriteOptions syncOptions; + syncOptions.sync = true; + auto s = _db->Write(syncOptions, &wb); if (!s.ok()) { - return toMongoStatus(s); + return rocksToMongoStatus(s); } + // remove from map { boost::mutex::scoped_lock lk(_identPrefixMapMutex); _identPrefixMap.erase(ident); } + // instruct compaction filter to start deleting + { + boost::mutex::scoped_lock lk(_droppedPrefixesMutex); + for (const auto& prefix : prefixesToDrop) { + uint32_t int_prefix; + bool ok = extractPrefix(prefix, &int_prefix); + invariant(ok); + _droppedPrefixes.insert(int_prefix); + } + } + return Status::OK(); } @@ -224,6 +361,27 @@ namespace mongo { return indents; } + int64_t RocksEngine::getIdentSize(OperationContext* opCtx, const StringData& ident) { + uint64_t storageSize; + std::string prefix = _getIdentPrefix(ident); + std::string nextPrefix = std::move(rocksGetNextPrefix(prefix)); + rocksdb::Range wholeRange(prefix, nextPrefix); + _db->GetApproximateSizes(&wholeRange, 1, &storageSize); + return std::max(static_cast<int64_t>(storageSize), static_cast<int64_t>(1)); + } + + void RocksEngine::setMaxWriteMBPerSec(int maxWriteMBPerSec) { + _maxWriteMBPerSec = maxWriteMBPerSec; + _rateLimiter->SetBytesPerSecond(static_cast<int64_t>(_maxWriteMBPerSec) * 1024 * 1024); + } + + std::unordered_set<uint32_t> RocksEngine::getDroppedPrefixes() const { + boost::mutex::scoped_lock lk(_droppedPrefixesMutex); + // this will copy the set. that way compaction filter has its own copy and doesn't need to + // worry about thread safety + return _droppedPrefixes; + } + // non public api Status RocksEngine::_createIdentPrefix(const StringData& ident) { uint32_t prefix = 0; @@ -245,7 +403,7 @@ namespace mongo { auto s = _db->Put(rocksdb::WriteOptions(), kMetadataPrefix + ident.toString(), rocksdb::Slice(config.objdata(), config.objsize())); - return toMongoStatus(s); + return rocksToMongoStatus(s); } std::string RocksEngine::_getIdentPrefix(const StringData& ident) { @@ -258,21 +416,26 @@ namespace mongo { rocksdb::Options RocksEngine::_options() const { // default options rocksdb::Options options; + options.rate_limiter = _rateLimiter; rocksdb::BlockBasedTableOptions table_options; table_options.block_cache = _block_cache; - table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10)); + table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false)); + table_options.block_size = 16 * 1024; // 16KB table_options.format_version = 2; options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options)); - options.write_buffer_size = 128 * 1024 * 1024; // 128MB + options.write_buffer_size = 64 * 1024 * 1024; // 64MB options.max_write_buffer_number = 4; - options.max_background_compactions = 8; - options.max_background_flushes = 4; + options.max_background_compactions = 4; + options.max_background_flushes = 2; options.target_file_size_base = 64 * 1024 * 1024; // 64MB - options.soft_rate_limit = 2; + options.soft_rate_limit = 2.5; options.hard_rate_limit = 3; options.max_bytes_for_level_base = 512 * 1024 * 1024; // 512 MB - options.max_open_files = 20000; + // This means there is no limit on open files. 
Make sure to always set ulimit so that it can + // keep all RocksDB files opened. + options.max_open_files = -1; + options.compaction_filter_factory.reset(new PrefixDeletingCompactionFilterFactory(this)); if (rocksGlobalOptions.compression == "snappy") { options.compression = rocksdb::kSnappyCompression; @@ -296,11 +459,4 @@ namespace mongo { return options; } - - Status toMongoStatus( rocksdb::Status s ) { - if ( s.ok() ) - return Status::OK(); - else - return Status( ErrorCodes::InternalError, s.ToString() ); - } } diff --git a/src/mongo/db/storage/rocks/rocks_engine.h b/src/mongo/db/storage/rocks/rocks_engine.h index 89a6b9804a6..f1a4d2f58da 100644 --- a/src/mongo/db/storage/rocks/rocks_engine.h +++ b/src/mongo/db/storage/rocks/rocks_engine.h @@ -33,6 +33,7 @@ #include <map> #include <string> #include <memory> +#include <unordered_set> #include <boost/optional.hpp> #include <boost/scoped_ptr.hpp> @@ -40,14 +41,16 @@ #include <boost/thread/mutex.hpp> #include <rocksdb/cache.h> +#include <rocksdb/rate_limiter.h> #include <rocksdb/status.h> #include "mongo/base/disallow_copying.h" #include "mongo/bson/ordering.h" #include "mongo/db/storage/kv/kv_engine.h" -#include "mongo/db/storage/rocks/rocks_transaction.h" #include "mongo/util/string_map.h" +#include "rocks_transaction.h" + namespace rocksdb { class ColumnFamilyHandle; struct ColumnFamilyDescriptor; @@ -103,11 +106,7 @@ namespace mongo { virtual bool isDurable() const override { return _durable; } - virtual int64_t getIdentSize(OperationContext* opCtx, - const StringData& ident) { - // TODO: return correct size. - return 1; - } + virtual int64_t getIdentSize(OperationContext* opCtx, const StringData& ident); virtual Status repairIdent(OperationContext* opCtx, const StringData& ident) { @@ -129,6 +128,12 @@ namespace mongo { rocksdb::DB* getDB() { return _db.get(); } const rocksdb::DB* getDB() const { return _db.get(); } size_t getBlockCacheUsage() const { return _block_cache->GetUsage(); } + std::unordered_set<uint32_t> getDroppedPrefixes() const; + + RocksTransactionEngine* getTransactionEngine() { return &_transactionEngine; } + + int getMaxWriteMBPerSec() const { return _maxWriteMBPerSec; } + void setMaxWriteMBPerSec(int maxWriteMBPerSec); private: Status _createIdentPrefix(const StringData& ident); @@ -139,6 +144,8 @@ namespace mongo { std::string _path; boost::scoped_ptr<rocksdb::DB> _db; std::shared_ptr<rocksdb::Cache> _block_cache; + int _maxWriteMBPerSec; + std::shared_ptr<rocksdb::RateLimiter> _rateLimiter; const bool _durable; @@ -146,15 +153,20 @@ namespace mongo { mutable boost::mutex _identPrefixMapMutex; typedef StringMap<uint32_t> IdentPrefixMap; IdentPrefixMap _identPrefixMap; + std::string _oplogIdent; // protected by _identPrefixMapMutex uint32_t _maxPrefix; + // set of all prefixes that are deleted. 
we delete them in the background thread + mutable boost::mutex _droppedPrefixesMutex; + std::unordered_set<uint32_t> _droppedPrefixes; + // This is for concurrency control RocksTransactionEngine _transactionEngine; static const std::string kMetadataPrefix; + static const std::string kDroppedPrefix; }; - Status toMongoStatus( rocksdb::Status s ); } diff --git a/src/mongo/db/storage/rocks/rocks_engine_test.cpp b/src/mongo/db/storage/rocks/rocks_engine_test.cpp index 31f5322b9ab..9c8befc5890 100644 --- a/src/mongo/db/storage/rocks/rocks_engine_test.cpp +++ b/src/mongo/db/storage/rocks/rocks_engine_test.cpp @@ -38,9 +38,10 @@ #include "mongo/db/storage/kv/kv_engine.h" #include "mongo/db/storage/kv/kv_engine_test_harness.h" -#include "mongo/db/storage/rocks/rocks_engine.h" #include "mongo/unittest/temp_dir.h" +#include "rocks_engine.h" + namespace mongo { class RocksEngineHarnessHelper : public KVHarnessHelper { public: diff --git a/src/mongo/db/storage/rocks/rocks_global_options.cpp b/src/mongo/db/storage/rocks/rocks_global_options.cpp index fa84cc17f40..264b17805f7 100644 --- a/src/mongo/db/storage/rocks/rocks_global_options.cpp +++ b/src/mongo/db/storage/rocks/rocks_global_options.cpp @@ -31,10 +31,11 @@ #include "mongo/platform/basic.h" #include "mongo/base/status.h" -#include "mongo/db/storage/rocks/rocks_global_options.h" #include "mongo/util/log.h" #include "mongo/util/options_parser/constraints.h" +#include "rocks_global_options.h" + namespace mongo { RocksGlobalOptions rocksGlobalOptions; @@ -52,6 +53,14 @@ namespace mongo { "[none|snappy|zlib]") .format("(:?none)|(:?snappy)|(:?zlib)", "(none/snappy/zlib)") .setDefault(moe::Value(std::string("snappy"))); + rocksOptions + .addOptionChaining( + "storage.rocksdb.maxWriteMBPerSec", "rocksdbMaxWriteMBPerSec", moe::Int, + "Maximum speed that RocksDB will write to storage. Reducing this can " + "help reduce read latency spikes during compactions. However, reducing this " + "below a certain point might slow down writes. 
Defaults to 1GB/sec") + .validRange(1, 1024) + .setDefault(moe::Value(1024)); rocksOptions.addOptionChaining("storage.rocksdb.configString", "rocksdbConfigString", moe::String, "RocksDB storage engine custom " @@ -71,6 +80,11 @@ namespace mongo { params["storage.rocksdb.compression"].as<std::string>(); log() << "Compression: " << rocksGlobalOptions.compression; } + if (params.count("storage.rocksdb.maxWriteMBPerSec")) { + rocksGlobalOptions.maxWriteMBPerSec = + params["storage.rocksdb.maxWriteMBPerSec"].as<int>(); + log() << "MaxWriteMBPerSec: " << rocksGlobalOptions.maxWriteMBPerSec; + } if (params.count("storage.rocksdb.configString")) { rocksGlobalOptions.configString = params["storage.rocksdb.configString"].as<std::string>(); diff --git a/src/mongo/db/storage/rocks/rocks_global_options.h b/src/mongo/db/storage/rocks/rocks_global_options.h index 9a9e1b28d1f..aa8d103df33 100644 --- a/src/mongo/db/storage/rocks/rocks_global_options.h +++ b/src/mongo/db/storage/rocks/rocks_global_options.h @@ -37,12 +37,13 @@ namespace mongo { class RocksGlobalOptions { public: - RocksGlobalOptions() : cacheSizeGB(0) {} + RocksGlobalOptions() : cacheSizeGB(0), maxWriteMBPerSec(1024), compression("snappy") {} Status add(moe::OptionSection* options); Status store(const moe::Environment& params, const std::vector<std::string>& args); size_t cacheSizeGB; + int maxWriteMBPerSec; std::string compression; std::string configString; diff --git a/src/mongo/db/storage/rocks/rocks_index.cpp b/src/mongo/db/storage/rocks/rocks_index.cpp index a9b40edbf9d..e08b6395c9c 100644 --- a/src/mongo/db/storage/rocks/rocks_index.cpp +++ b/src/mongo/db/storage/rocks/rocks_index.cpp @@ -30,7 +30,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/storage/rocks/rocks_index.h" +#include "rocks_index.h" #include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> @@ -47,13 +47,14 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/storage/index_entry_comparison.h" -#include "mongo/db/storage/rocks/rocks_engine.h" -#include "mongo/db/storage/rocks/rocks_record_store.h" -#include "mongo/db/storage/rocks/rocks_recovery_unit.h" -#include "mongo/db/storage/rocks/rocks_util.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" +#include "rocks_engine.h" +#include "rocks_record_store.h" +#include "rocks_recovery_unit.h" +#include "rocks_util.h" + namespace mongo { using boost::scoped_ptr; @@ -103,10 +104,15 @@ namespace mongo { public: RocksCursorBase(OperationContext* txn, rocksdb::DB* db, std::string prefix, bool forward, Ordering order) - : _db(db), _prefix(prefix), _forward(forward), _order(order), _isKeyCurrent(false) { + : _db(db), + _prefix(prefix), + _forward(forward), + _order(order), + _locateCacheValid(false) { auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); _iterator.reset(ru->NewIterator(_prefix)); - checkStatus(); + _currentSequenceNumber = ru->snapshot()->GetSequenceNumber(); + invariantRocksOK(_iterator->status()); } int getDirection() const { return _forward ? 
1 : -1; } @@ -131,16 +137,41 @@ namespace mongo { // even if keys are equal, record IDs might be different (for unique indexes, since // in non-unique indexes RecordID is already encoded in the key) - return getRecordId() == other.getRecordId(); + return _loc == other._loc; + } + + virtual void advance() { + // Advance on a cursor at the end is a no-op + if (isEOF()) { + return; + } + advanceCursor(); + updatePosition(); } bool locate(const BSONObj& key, const RecordId& loc) { const BSONObj finalKey = stripFieldNames(key); - fillKey(finalKey, loc); - bool result = _locate(loc); + + if (_locateCacheValid == true && finalKey == _locateCacheKey && + loc == _locateCacheRecordId) { + // exact same call to locate() + return _locateCacheResult; + } + + fillQuery(finalKey, loc, &_query); + bool result = _locate(_query, loc); + updatePosition(); // An explicit search at the start of the range should always return false if (loc == RecordId::min() || loc == RecordId::max()) { - return false; + result = false; + } + + { + // memoization + _locateCacheKey = finalKey.getOwned(); + _locateCacheRecordId = loc; + _locateCacheResult = result; + _locateCacheValid = true; } return result; } @@ -160,8 +191,9 @@ namespace mongo { keyEndInclusive, getDirection() ); - fillKey(key, RecordId()); - _locate(RecordId()); + fillQuery(key, RecordId(), &_query); + _locate(_query, RecordId()); + updatePosition(); } /** @@ -178,65 +210,69 @@ namespace mongo { } BSONObj getKey() const { - if (_isKeyCurrent && !_keyBson.isEmpty()) { - return _keyBson; + if (isEOF()) { + return BSONObj(); + } + + if (!_keyBsonCache.isEmpty()) { + return _keyBsonCache; } - loadKeyIfNeeded(); - _keyBson = - KeyString::toBson(_key.getBuffer(), _key.getSize(), _order, getTypeBits()); - return _keyBson; + + _keyBsonCache = + KeyString::toBson(_key.getBuffer(), _key.getSize(), _order, _typeBits); + + return _keyBsonCache; } + RecordId getRecordId() const { return _loc; } + void savePosition() { _savedEOF = isEOF(); - - if (!_savedEOF) { - loadKeyIfNeeded(); - _savedRecordId = getRecordId(); - } - _iterator.reset(); } void restorePosition(OperationContext* txn) { auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); - _iterator.reset(ru->NewIterator(_prefix)); - - if (!_savedEOF) { - _locate(_savedRecordId); + if (_currentSequenceNumber != ru->snapshot()->GetSequenceNumber()) { + _iterator.reset(ru->NewIterator(_prefix)); + invariantRocksOK(_iterator->status()); + _currentSequenceNumber = ru->snapshot()->GetSequenceNumber(); + + if (!_savedEOF) { + _locate(_key, _loc); + updatePosition(); + } } } protected: // Uses _key for the key. Implemented by unique and standard index - virtual bool _locate(RecordId loc) = 0; + virtual bool _locate(const KeyString& query, RecordId loc) = 0; + + virtual void fillQuery(const BSONObj& key, RecordId loc, KeyString* query) const = 0; - // Must invalidateCache() - virtual void fillKey(const BSONObj& key, RecordId loc) = 0; + // Called after _key has been filled in. Must not throw WriteConflictException. + virtual void updateLocAndTypeBits() = 0; - virtual const KeyString::TypeBits& getTypeBits() const = 0; void advanceCursor() { - invalidateCache(); if (_forward) { _iterator->Next(); } else { _iterator->Prev(); } - checkStatus(); + invariantRocksOK(_iterator->status()); } - // Seeks to _key. Returns true on exact match. - bool seekCursor() { - invalidateCache(); - const rocksdb::Slice keySlice(_key.getBuffer(), _key.getSize()); + // Seeks to query. Returns true on exact match. 
+ bool seekCursor(const KeyString& query) { + const rocksdb::Slice keySlice(query.getBuffer(), query.getSize()); _iterator->Seek(keySlice); - checkStatus(); if (!_iterator->Valid()) { if (!_forward) { // this will give lower bound behavior for backwards _iterator->SeekToLast(); - checkStatus(); } + invariantRocksOK(_iterator->status()); return false; } @@ -252,32 +288,26 @@ namespace mongo { // were // searching for. _iterator->Prev(); + invariantRocksOK(_iterator->status()); } return false; } - void loadKeyIfNeeded() const { - if (_isKeyCurrent) { + void updatePosition() { + if (isEOF()) { + _loc = RecordId(); return; } auto key = _iterator->key(); _key.resetFromBuffer(key.data(), key.size()); - _isKeyCurrent = true; - } + _keyBsonCache = BSONObj(); // Invalidate cached BSONObj. - virtual void invalidateCache() { - _isKeyCurrent = false; - _keyBson = BSONObj(); - } + _locateCacheValid = false; // Invalidate locate cache + _locateCacheKey = BSONObj(); // Invalidate locate cache - void checkStatus() { - if ( !_iterator->status().ok() ) { - log() << _iterator->status().ToString(); - // TODO: SERVER-16979 Correctly handle errors returned by RocksDB - invariant( false ); - } + updateLocAndTypeBits(); } rocksdb::DB* _db; // not owned @@ -289,26 +319,30 @@ namespace mongo { // These are for storing savePosition/restorePosition state bool _savedEOF; RecordId _savedRecordId; + rocksdb::SequenceNumber _currentSequenceNumber; + + KeyString _key; + KeyString::TypeBits _typeBits; + RecordId _loc; + mutable BSONObj _keyBsonCache; // if isEmpty, cache invalid and must be loaded from + // _key. - // These are all lazily loaded caches. - mutable BSONObj _keyBson; // if isEmpty, it is invalid and must be loaded from _key. - mutable bool _isKeyCurrent; // true if _key matches where the cursor is pointing - mutable KeyString _key; + KeyString _query; + + // These are for caching repeated calls to locate() + bool _locateCacheValid; + BSONObj _locateCacheKey; + RecordId _locateCacheRecordId; + bool _locateCacheResult; }; class RocksStandardCursor : public RocksCursorBase { public: RocksStandardCursor(OperationContext* txn, rocksdb::DB* db, std::string prefix, bool forward, Ordering order) - : RocksCursorBase(txn, db, prefix, forward, order), _isTypeBitsValid(false) {} - - virtual void invalidateCache() { - RocksCursorBase::invalidateCache(); - _loc = RecordId(); - _isTypeBitsValid = false; - } + : RocksCursorBase(txn, db, prefix, forward, order) {} - virtual void fillKey(const BSONObj& key, RecordId loc) { + virtual void fillQuery(const BSONObj& key, RecordId loc, KeyString* query) const { // Null cursors should start at the zero key to maintain search ordering in the // collator. // Reverse cursors should start on the last matching key. @@ -316,185 +350,180 @@ namespace mongo { loc = _forward ? 
RecordId::min() : RecordId::max(); } - _key.resetToKey(key, _order, loc); - invalidateCache(); - } - virtual bool _locate(RecordId loc) { - // loc already encoded in _key - return seekCursor(); + query->resetToKey(key, _order, loc); } - virtual RecordId getRecordId() const { - if (isEOF()) { - return RecordId(); - } - - if (_loc.isNull()) { - loadKeyIfNeeded(); - _loc = KeyString::decodeRecordIdAtEnd(_key.getBuffer(), _key.getSize()); - } - - dassert(!_loc.isNull()); - return _loc; - } - - virtual void advance() { - // Advance on a cursor at the end is a no-op - if (isEOF()) { - return; - } - advanceCursor(); + virtual bool _locate(const KeyString& query, RecordId loc) { + // loc already encoded in _key + return seekCursor(query); } - virtual const KeyString::TypeBits& getTypeBits() const { - if (!_isTypeBitsValid) { - auto value = _iterator->value(); - BufReader br(value.data(), value.size()); - _typeBits.resetFromBuffer(&br); - _isTypeBitsValid = true; - } - - return _typeBits; + virtual void updateLocAndTypeBits() { + _loc = KeyString::decodeRecordIdAtEnd(_key.getBuffer(), _key.getSize()); + auto value = _iterator->value(); + BufReader br(value.data(), value.size()); + _typeBits.resetFromBuffer(&br); } - - private: - mutable RecordId _loc; - - mutable bool _isTypeBitsValid; - mutable KeyString::TypeBits _typeBits; }; class RocksUniqueCursor : public RocksCursorBase { public: RocksUniqueCursor(OperationContext* txn, rocksdb::DB* db, std::string prefix, bool forward, Ordering order) - : RocksCursorBase(txn, db, prefix, forward, order), _recordsIndex(0) {} + : RocksCursorBase(txn, db, prefix, forward, order) {} - virtual void invalidateCache() { - RocksCursorBase::invalidateCache(); - _records.clear(); + virtual void fillQuery(const BSONObj& key, RecordId loc, KeyString* query) const { + query->resetToKey(key, _order); // loc doesn't go in _query for unique indexes } - virtual void fillKey(const BSONObj& key, RecordId loc) { - invalidateCache(); - _key.resetToKey(key, _order); // loc doesn't go in _key for unique indexes - } - - virtual bool _locate(RecordId loc) { - if (!seekCursor()) { + virtual bool _locate(const KeyString& query, RecordId loc) { + if (!seekCursor(query)) { // If didn't seek to exact key, start at beginning of wherever we ended up. return false; } dassert(!isEOF()); - if (loc.isNull()) { - // Null loc means means start and beginning or end of array as needed. - // so nothing to do - return true; - } + // If we get here we need to look at the actual RecordId for this key and make sure + // we are supposed to see it. - // If we get here we need to make sure we are positioned at the correct point of the - // _records vector. - if (_forward) { - while (getRecordId() < loc) { - _recordsIndex++; - if (_recordsIndex == _records.size()) { - // This means we exhausted the scan and didn't find a record in range. 
- advanceCursor(); - return false; - } - } - } else { - while (getRecordId() > loc) { - _recordsIndex++; - if (_recordsIndex == _records.size()) { - advanceCursor(); - return false; - } - } + auto value = _iterator->value(); + BufReader br(value.data(), value.size()); + RecordId locInIndex = KeyString::decodeRecordId(&br); + + if ((_forward && (locInIndex < loc)) || (!_forward && (locInIndex > loc))) { + advanceCursor(); } return true; } - virtual RecordId getRecordId() const { - if (isEOF()) { - return RecordId(); - } + void updateLocAndTypeBits() { + // We assume that cursors can only ever see unique indexes in their "pristine" + // state, + // where no duplicates are possible. The cases where dups are allowed should hold + // sufficient locks to ensure that no cursor ever sees them. - loadValueIfNeeded(); - dassert(!_records[_recordsIndex].first.isNull()); - return _records[_recordsIndex].first; - } - - virtual void advance() { - // Advance on a cursor at the end is a no-op - if (isEOF()) { - return; - } + auto value = _iterator->value(); + BufReader br(value.data(), value.size()); + _loc = KeyString::decodeRecordId(&br); + _typeBits.resetFromBuffer(&br); - // We may just be advancing within the RecordIds for this key. - loadValueIfNeeded(); - _recordsIndex++; - if (_recordsIndex == _records.size()) { - advanceCursor(); + if (!br.atEof()) { + severe() << "Unique index cursor seeing multiple records for key " << getKey(); + fassertFailed(28609); } } + }; - virtual const KeyString::TypeBits& getTypeBits() const { - invariant(!isEOF()); - loadValueIfNeeded(); - return _records[_recordsIndex].second; - } + } // namespace - private: - void loadValueIfNeeded() const { - if (!_records.empty()) { - return; - } + /** + * Bulk builds a non-unique index. + */ + class RocksIndexBase::StandardBulkBuilder : public SortedDataBuilderInterface { + public: + StandardBulkBuilder(RocksStandardIndex* index, OperationContext* txn) : _index(index), _txn(txn) {} - _recordsIndex = 0; - auto value = _iterator->value(); - BufReader br(value.data(), value.size()); - while (br.remaining()) { - RecordId loc = KeyString::decodeRecordId(&br); - _records.push_back(std::make_pair(loc, KeyString::TypeBits::fromBuffer(&br))); - } - invariant(!_records.empty()); + Status addKey(const BSONObj& key, const RecordId& loc) { + return _index->insert(_txn, key, loc, true); + } - if (!_forward) { - std::reverse(_records.begin(), _records.end()); + void commit(bool mayInterrupt) { + WriteUnitOfWork uow(_txn); + uow.commit(); + } + + private: + RocksStandardIndex* _index; + OperationContext* _txn; + }; + + /** + * Bulk builds a unique index. + * + * In order to support unique indexes in dupsAllowed mode this class only does an actual insert + * after it sees a key after the one we are trying to insert. This allows us to gather up all + * duplicate locs and insert them all together. This is necessary since bulk cursors can only + * append data. + */ + class RocksIndexBase::UniqueBulkBuilder : public SortedDataBuilderInterface { + public: + UniqueBulkBuilder(std::string prefix, Ordering ordering, OperationContext* txn, bool dupsAllowed) + : _prefix(std::move(prefix)), _ordering(ordering), _txn(txn), _dupsAllowed(dupsAllowed) {} + + Status addKey(const BSONObj& newKey, const RecordId& loc) { + Status s = checkKeySize(newKey); + if (!s.isOK()) { + return s; + } + + const int cmp = newKey.woCompare(_key, _ordering); + if (cmp != 0) { + if (!_key.isEmpty()) { // _key.isEmpty() is only true on the first call to addKey(). 
+ invariant(cmp > 0); // newKey must be > the last key + // We are done with dups of the last key so we can insert it now. + doInsert(); } + invariant(_records.empty()); } + else { + // Dup found! + if (!_dupsAllowed) { + return Status(ErrorCodes::DuplicateKey, dupKeyError(newKey)); + } - mutable size_t _recordsIndex; - mutable std::vector<std::pair<RecordId, KeyString::TypeBits> > _records; - }; - - // TODO optimize and create two implementations -- one for unique and one for standard index - class RocksIndexBulkBuilder : public SortedDataBuilderInterface { - public: - RocksIndexBulkBuilder(RocksIndexBase* index, OperationContext* txn, bool dupsAllowed) - : _index(index), _txn(txn), _dupsAllowed(dupsAllowed) { - invariant(index->isEmpty(txn)); + // If we get here, we are in the weird mode where dups are allowed on a unique + // index, so add ourselves to the list of duplicate locs. This also replaces the + // _key which is correct since any dups seen later are likely to be newer. } - Status addKey(const BSONObj& key, const RecordId& loc) { - return _index->insert(_txn, key, loc, _dupsAllowed); + _key = newKey.getOwned(); + _keyString.resetToKey(_key, _ordering); + _records.push_back(std::make_pair(loc, _keyString.getTypeBits())); + + return Status::OK(); + } + + void commit(bool mayInterrupt) { + WriteUnitOfWork uow(_txn); + if (!_records.empty()) { + // This handles inserting the last unique key. + doInsert(); } + uow.commit(); + } - void commit(bool mayInterrupt) { - WriteUnitOfWork uow(_txn); - uow.commit(); + private: + void doInsert() { + invariant(!_records.empty()); + + KeyString value; + for (size_t i = 0; i < _records.size(); i++) { + value.appendRecordId(_records[i].first); + // When there is only one record, we can omit AllZeros TypeBits. Otherwise they need + // to be included. + if (!(_records[i].second.isAllZeros() && _records.size() == 1)) { + value.appendTypeBits(_records[i].second); + } } - private: - RocksIndexBase* _index; - OperationContext* _txn; - bool _dupsAllowed; - }; + std::string prefixedKey(RocksIndexBase::_makePrefixedKey(_prefix, _keyString)); + rocksdb::Slice valueSlice(value.getBuffer(), value.getSize()); - } // namespace + auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(_txn); + ru->writeBatch()->Put(prefixedKey, valueSlice); + + _records.clear(); + } + + std::string _prefix; + Ordering _ordering; + OperationContext* _txn; + const bool _dupsAllowed; + BSONObj _key; + KeyString _keyString; + std::vector<std::pair<RecordId, KeyString::TypeBits>> _records; + }; /// RocksIndexBase @@ -502,11 +531,6 @@ namespace mongo { Ordering order) : _db(db), _prefix(prefix), _ident(std::move(ident)), _order(order) {} - SortedDataBuilderInterface* RocksIndexBase::getBulkBuilder(OperationContext* txn, - bool dupsAllowed) { - return new RocksIndexBulkBuilder(this, txn, dupsAllowed); - } - void RocksIndexBase::fullValidate(OperationContext* txn, bool full, long long* numKeysOut, BSONObjBuilder* output) const { if (numKeysOut) { @@ -538,7 +562,9 @@ namespace mongo { std::string nextPrefix = std::move(rocksGetNextPrefix(_prefix)); rocksdb::Range wholeRange(_prefix, nextPrefix); _db->GetApproximateSizes(&wholeRange, 1, &storageSize); - return static_cast<long long>(storageSize); + // There might be some bytes in the WAL that we don't count here. 
Some + // tests depend on the fact that non-empty indexes have non-zero sizes + return static_cast<long long>(std::max(storageSize, static_cast<uint64_t>(1))); } std::string RocksIndexBase::_makePrefixedKey(const std::string& prefix, @@ -572,9 +598,7 @@ namespace mongo { std::string currentValue; auto getStatus = ru->Get(prefixedKey, ¤tValue); if (!getStatus.ok() && !getStatus.IsNotFound()) { - // This means that Get() returned an error - // TODO: SERVER-16979 Correctly handle errors returned by RocksDB - invariant(false); + return rocksToMongoStatus(getStatus); } else if (getStatus.IsNotFound()) { // nothing here. just insert the value KeyString value(loc); @@ -645,14 +669,11 @@ namespace mongo { // dups are allowed, so we have to deal with a vector of RecordIds. std::string currentValue; auto getStatus = ru->Get(prefixedKey, ¤tValue); - if (!getStatus.ok() && !getStatus.IsNotFound()) { - // This means that Get() returned an error - // TODO: SERVER-16979 Correctly handle errors returned by RocksDB - invariant(false); - } else if (getStatus.IsNotFound()) { + if (getStatus.IsNotFound()) { // nothing here. just return return; } + invariantRocksOK(getStatus); bool foundLoc = false; std::vector<std::pair<RecordId, KeyString::TypeBits>> records; @@ -713,9 +734,7 @@ namespace mongo { std::string value; auto getStatus = ru->Get(prefixedKey, &value); if (!getStatus.ok() && !getStatus.IsNotFound()) { - // This means that Get() returned an error - // TODO: SERVER-16979 Correctly handle errors returned by RocksDB - invariant(false); + return rocksToMongoStatus(getStatus); } else if (getStatus.IsNotFound()) { // not found, not duplicate key return Status::OK(); @@ -734,6 +753,11 @@ namespace mongo { return Status(ErrorCodes::DuplicateKey, dupKeyError(key)); } + SortedDataBuilderInterface* RocksUniqueIndex::getBulkBuilder(OperationContext* txn, + bool dupsAllowed) { + return new RocksIndexBase::UniqueBulkBuilder(_prefix, _order, txn, dupsAllowed); + } + /// RocksStandardIndex RocksStandardIndex::RocksStandardIndex(rocksdb::DB* db, std::string prefix, std::string ident, Ordering order) @@ -783,5 +807,10 @@ namespace mongo { return new RocksStandardCursor(txn, _db, _prefix, direction == 1, _order); } + SortedDataBuilderInterface* RocksStandardIndex::getBulkBuilder(OperationContext* txn, + bool dupsAllowed) { + invariant(dupsAllowed); + return new RocksIndexBase::StandardBulkBuilder(this, txn); + } } // namespace mongo diff --git a/src/mongo/db/storage/rocks/rocks_index.h b/src/mongo/db/storage/rocks/rocks_index.h index 3c91185aac2..b551a7987e0 100644 --- a/src/mongo/db/storage/rocks/rocks_index.h +++ b/src/mongo/db/storage/rocks/rocks_index.h @@ -53,14 +53,15 @@ namespace mongo { public: RocksIndexBase(rocksdb::DB* db, std::string prefix, std::string ident, Ordering order); - virtual SortedDataBuilderInterface* getBulkBuilder(OperationContext* txn, bool dupsAllowed); + virtual SortedDataBuilderInterface* getBulkBuilder(OperationContext* txn, + bool dupsAllowed) = 0; virtual void fullValidate(OperationContext* txn, bool full, long long* numKeysOut, BSONObjBuilder* output) const; virtual bool appendCustomStats(OperationContext* txn, BSONObjBuilder* output, double scale) const { - // TODO + // nothing to say here, really return false; } @@ -81,6 +82,10 @@ namespace mongo { // used to construct RocksCursors const Ordering _order; + + class StandardBulkBuilder; + class UniqueBulkBuilder; + friend class UniqueBulkBuilder; }; class RocksUniqueIndex : public RocksIndexBase { @@ -94,6 +99,9 @@ namespace mongo { 
virtual SortedDataInterface::Cursor* newCursor(OperationContext* txn, int direction) const; virtual Status dupKeyCheck(OperationContext* txn, const BSONObj& key, const RecordId& loc); + + virtual SortedDataBuilderInterface* getBulkBuilder(OperationContext* txn, + bool dupsAllowed) override; }; class RocksStandardIndex : public RocksIndexBase { @@ -110,6 +118,9 @@ namespace mongo { // dupKeyCheck shouldn't be called for non-unique indexes invariant(false); } + + virtual SortedDataBuilderInterface* getBulkBuilder(OperationContext* txn, + bool dupsAllowed) override; }; } // namespace mongo diff --git a/src/mongo/db/storage/rocks/rocks_index_test.cpp b/src/mongo/db/storage/rocks/rocks_index_test.cpp index 43effa007b1..b6f865fbaf1 100644 --- a/src/mongo/db/storage/rocks/rocks_index_test.cpp +++ b/src/mongo/db/storage/rocks/rocks_index_test.cpp @@ -40,13 +40,14 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/storage/sorted_data_interface_test_harness.h" -#include "mongo/db/storage/rocks/rocks_engine.h" -#include "mongo/db/storage/rocks/rocks_index.h" -#include "mongo/db/storage/rocks/rocks_recovery_unit.h" -#include "mongo/db/storage/rocks/rocks_transaction.h" #include "mongo/unittest/temp_dir.h" #include "mongo/unittest/unittest.h" +#include "rocks_engine.h" +#include "rocks_index.h" +#include "rocks_recovery_unit.h" +#include "rocks_transaction.h" + namespace mongo { using boost::scoped_ptr; diff --git a/src/mongo/db/storage/rocks/rocks_init.cpp b/src/mongo/db/storage/rocks/rocks_init.cpp index 1f30a87f526..18d8b9d08c4 100644 --- a/src/mongo/db/storage/rocks/rocks_init.cpp +++ b/src/mongo/db/storage/rocks/rocks_init.cpp @@ -29,9 +29,6 @@ #include "mongo/platform/basic.h" -#include "mongo/db/storage/rocks/rocks_engine.h" -#include "mongo/db/storage/rocks/rocks_server_status.h" - #include "mongo/base/init.h" #include "mongo/db/global_environment_experiment.h" #include "mongo/db/storage_options.h" @@ -39,10 +36,15 @@ #include "mongo/db/storage/storage_engine_metadata.h" #include "mongo/util/mongoutils/str.h" +#include "rocks_engine.h" +#include "rocks_server_status.h" +#include "rocks_parameters.h" + namespace mongo { - const std::string kRocksDBEngineName = "RocksDB"; + const std::string kRocksDBEngineName = "rocksdb"; namespace { + class RocksFactory : public StorageEngine::Factory { public: virtual ~RocksFactory(){} @@ -56,6 +58,7 @@ namespace mongo { auto engine = new RocksEngine(params.dbpath + "/db", params.dur); // Intentionally leaked. auto leaked __attribute__((unused)) = new RocksServerStatusSection(engine); + auto leaked2 __attribute__((unused)) = new RocksRateLimiterServerParameter(engine); return new KVStorageEngine(engine, options); } @@ -106,7 +109,11 @@ namespace mongo { // and mongorestore. // * Version 1 was the format with many column families -- one column family for each // collection and index - // * Version 2 (current) keeps all collections and indexes in a single column family + // * Version 2 keeps all collections and indexes in a single column family + // * Version 3 (current) reserves two prefixes for oplog. one prefix keeps the oplog + // documents and another only keeps keys. 
That way, we can cleanup the oplog without + // reading full documents + // oplog cleanup const int kRocksFormatVersion = 2; const std::string kRocksFormatVersionString = "rocksFormatVersion"; }; diff --git a/src/mongo/db/storage/rocks/rocks_options_init.cpp b/src/mongo/db/storage/rocks/rocks_options_init.cpp index 366d4b3e374..4b5ce697d1b 100644 --- a/src/mongo/db/storage/rocks/rocks_options_init.cpp +++ b/src/mongo/db/storage/rocks/rocks_options_init.cpp @@ -26,12 +26,13 @@ * it in the license file. */ -#include "mongo/util/options_parser/startup_option_init.h" #include <iostream> +#include "mongo/util/options_parser/startup_option_init.h" #include "mongo/util/options_parser/startup_options.h" -#include "mongo/db/storage/rocks/rocks_global_options.h" + +#include "rocks_global_options.h" namespace mongo { diff --git a/src/mongo/db/storage/rocks/rocks_parameters.cpp b/src/mongo/db/storage/rocks/rocks_parameters.cpp new file mode 100644 index 00000000000..4181d8aab2f --- /dev/null +++ b/src/mongo/db/storage/rocks/rocks_parameters.cpp @@ -0,0 +1,73 @@ +/** +* Copyright (C) 2014 MongoDB Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects for +* all of the code used other than as permitted herein. If you modify file(s) +* with this exception, you may extend this exception to your version of the +* file(s), but you are not obligated to do so. If you do not wish to do so, +* delete this exception statement from your version. If you delete this +* exception statement from all source files in the program, then also delete +* it in the license file. 
+*/ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage + +#include "mongo/platform/basic.h" + +#include "rocks_parameters.h" + +#include "mongo/logger/parse_log_component_settings.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { + + RocksRateLimiterServerParameter::RocksRateLimiterServerParameter(RocksEngine* engine) + : ServerParameter(ServerParameterSet::getGlobal(), "rocksdbRuntimeConfigMaxWriteMBPerSec", + false, true), + _engine(engine) {} + + void RocksRateLimiterServerParameter::append(OperationContext* txn, BSONObjBuilder& b, + const std::string& name) { + b.append(name, _engine->getMaxWriteMBPerSec()); + } + + Status RocksRateLimiterServerParameter::set(const BSONElement& newValueElement) { + if (!newValueElement.isNumber()) { + return Status(ErrorCodes::BadValue, str::stream() << name() << " has to be a number"); + } + return _set(newValueElement.numberInt()); + } + + Status RocksRateLimiterServerParameter::setFromString(const std::string& str) { + int num = 0; + Status status = parseNumberFromString(str, &num); + if (!status.isOK()) return status; + return _set(num); + } + + Status RocksRateLimiterServerParameter::_set(int newNum) { + if (newNum <= 0) { + return Status(ErrorCodes::BadValue, str::stream() << name() << " has to be > 0"); + } + log() << "RocksDB: changing rate limiter to " << newNum << "MB/s"; + _engine->setMaxWriteMBPerSec(newNum); + + return Status::OK(); + } +} diff --git a/src/mongo/db/storage/rocks/rocks_parameters.h b/src/mongo/db/storage/rocks/rocks_parameters.h new file mode 100644 index 00000000000..b447a786386 --- /dev/null +++ b/src/mongo/db/storage/rocks/rocks_parameters.h @@ -0,0 +1,50 @@ +/** +* Copyright (C) 2014 MongoDB Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects for +* all of the code used other than as permitted herein. If you modify file(s) +* with this exception, you may extend this exception to your version of the +* file(s), but you are not obligated to do so. If you do not wish to do so, +* delete this exception statement from your version. If you delete this +* exception statement from all source files in the program, then also delete +* it in the license file. 
+*/ + +#include "mongo/base/disallow_copying.h" +#include "mongo/db/server_parameters.h" + +#include "rocks_engine.h" + +namespace mongo { + + class RocksRateLimiterServerParameter : public ServerParameter { + MONGO_DISALLOW_COPYING(RocksRateLimiterServerParameter); + + public: + RocksRateLimiterServerParameter(RocksEngine* engine); + virtual void append(OperationContext* txn, BSONObjBuilder& b, const std::string& name); + virtual Status set(const BSONElement& newValueElement); + virtual Status setFromString(const std::string& str); + + private: + Status _set(int newNum); + RocksEngine* _engine; + }; + +} diff --git a/src/mongo/db/storage/rocks/rocks_record_store.cpp b/src/mongo/db/storage/rocks/rocks_record_store.cpp index 443970b1226..810b7b08f79 100644 --- a/src/mongo/db/storage/rocks/rocks_record_store.cpp +++ b/src/mongo/db/storage/rocks/rocks_record_store.cpp @@ -31,7 +31,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/storage/rocks/rocks_record_store.h" +#include "rocks_record_store.h" #include <boost/scoped_array.hpp> #include <boost/shared_ptr.hpp> @@ -49,14 +49,17 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" -#include "mongo/db/storage/rocks/rocks_engine.h" -#include "mongo/db/storage/rocks/rocks_recovery_unit.h" #include "mongo/db/storage/oplog_hack.h" #include "mongo/platform/endian.h" #include "mongo/util/background.h" #include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" #include "mongo/util/timer.h" +#include "rocks_engine.h" +#include "rocks_recovery_unit.h" +#include "rocks_util.h" + namespace mongo { using boost::shared_ptr; @@ -88,8 +91,7 @@ namespace mongo { void CappedVisibilityManager::_addUncommittedRecord_inlock(OperationContext* txn, const RecordId& record) { - // todo: make this a dassert at some point - invariant(_uncommittedRecords.empty() || _uncommittedRecords.back() < record); + dassert(_uncommittedRecords.empty() || _uncommittedRecords.back() < record); _uncommittedRecords.push_back(record); txn->recoveryUnit()->registerChange(new CappedInsertChange(this, record)); _oplog_highestSeen = record; @@ -137,6 +139,36 @@ namespace mongo { } } + // this object keeps track of keys in oplog. The format is this: + // <prefix>RecordId --> dataSize (small endian 32 bytes) + // <prefix> is oplog_prefix+1 (reserved by rocks_engine.cpp) + // That way we can cheaply delete old record in the oplog without actually reading oplog + // collection. 
+ // All of the locking is done somewhere else -- we write exactly the same data as oplog, so we + // assume oplog already locked the relevant keys + class RocksOplogKeyTracker { + public: + RocksOplogKeyTracker(std::string prefix) : _prefix(std::move(prefix)) {} + void insertKey(RocksRecoveryUnit* ru, const RecordId& loc, int len) { + uint32_t lenLittleEndian = endian::nativeToLittle(static_cast<uint32_t>(len)); + ru->writeBatch()->Put(RocksRecordStore::_makePrefixedKey(_prefix, loc), + rocksdb::Slice(reinterpret_cast<const char*>(&lenLittleEndian), + sizeof(lenLittleEndian))); + } + void deleteKey(RocksRecoveryUnit* ru, const RecordId& loc) { + ru->writeBatch()->Delete(RocksRecordStore::_makePrefixedKey(_prefix, loc)); + } + rocksdb::Iterator* newIterator(RocksRecoveryUnit* ru) { return ru->NewIterator(_prefix); } + int decodeSize(const rocksdb::Slice& value) { + uint32_t size = + endian::littleToNative(*reinterpret_cast<const uint32_t*>(value.data())); + return static_cast<int>(size); + } + + private: + std::string _prefix; + }; + RocksRecordStore::RocksRecordStore(const StringData& ns, const StringData& id, rocksdb::DB* db, // not owned here std::string prefix, bool isCapped, int64_t cappedMaxSize, @@ -152,7 +184,9 @@ namespace mongo { _cappedDeleteCallback(cappedDeleteCallback), _cappedDeleteCheckCount(0), _isOplog(NamespaceString::oplog(ns)), - _oplogCounter(0), + _oplogKeyTracker( + _isOplog ? new RocksOplogKeyTracker(std::move(rocksGetNextPrefix(_prefix))) : nullptr), + _oplogNextToDelete(0), _cappedVisibilityManager((_isCapped || _isOplog) ? new CappedVisibilityManager() : nullptr), _ident(id.toString()), @@ -199,12 +233,15 @@ namespace mongo { boost::timed_mutex::scoped_lock lk(_cappedDeleterMutex); _shuttingDown = true; } + delete _oplogKeyTracker; } int64_t RocksRecordStore::storageSize(OperationContext* txn, BSONObjBuilder* extraInfo, int infoLevel) const { - // we're lying, but that's the best we can do for now - return _dataSize.load(); + // We need to make it multiple of 256 to make + // jstests/concurrency/fsm_workloads/convert_to_capped_collection.js happy + return static_cast<int64_t>( + std::max(_dataSize.load() & (~255), static_cast<long long>(256))); } RecordData RocksRecordStore::dataFor(OperationContext* txn, const RecordId& loc) const { @@ -222,15 +259,25 @@ namespace mongo { } std::string oldValue; - ru->Get(key, &oldValue); + auto status = ru->Get(key, &oldValue); + invariantRocksOK(status); int oldLength = oldValue.size(); ru->writeBatch()->Delete(key); + if (_isOplog) { + _oplogKeyTracker->deleteKey(ru, dl); + } _changeNumRecords(txn, -1); _increaseDataSize(txn, -oldLength); } + long long RocksRecordStore::dataSize(OperationContext* txn) const { + RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); + return _dataSize.load(std::memory_order::memory_order_relaxed) + + ru->getDeltaCounter(_dataSizeKey); + } + long long RocksRecordStore::numRecords(OperationContext* txn) const { RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn ); return _numRecords.load(std::memory_order::memory_order_relaxed) + @@ -327,19 +374,37 @@ namespace mongo { if (_cappedMaxDocs != -1 && numRecords > _cappedMaxDocs) { docsOverCap = numRecords - _cappedMaxDocs; } + BSONObj emptyBson; try { WriteUnitOfWork wuow(txn); auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); - boost::scoped_ptr<rocksdb::Iterator> iter(ru->NewIterator(_prefix)); - iter->SeekToFirst(); + boost::scoped_ptr<rocksdb::Iterator> iter; + if (_isOplog) { + // we're using 
RocksRecordStore::RocksRecordStore(const StringData& ns, const StringData& id, rocksdb::DB* db, // not owned here std::string prefix, bool isCapped, int64_t cappedMaxSize, @@ -152,7 +184,9 @@ namespace mongo { _cappedDeleteCallback(cappedDeleteCallback), _cappedDeleteCheckCount(0), _isOplog(NamespaceString::oplog(ns)), - _oplogCounter(0), + _oplogKeyTracker( + _isOplog ? new RocksOplogKeyTracker(std::move(rocksGetNextPrefix(_prefix))) : nullptr), + _oplogNextToDelete(0), _cappedVisibilityManager((_isCapped || _isOplog) ? new CappedVisibilityManager() : nullptr), _ident(id.toString()), @@ -199,12 +233,15 @@ namespace mongo { boost::timed_mutex::scoped_lock lk(_cappedDeleterMutex); _shuttingDown = true; } + delete _oplogKeyTracker; } int64_t RocksRecordStore::storageSize(OperationContext* txn, BSONObjBuilder* extraInfo, int infoLevel) const { - // we're lying, but that's the best we can do for now - return _dataSize.load(); + // We need to make it a multiple of 256 to make + // jstests/concurrency/fsm_workloads/convert_to_capped_collection.js happy + return static_cast<int64_t>( + std::max(_dataSize.load() & (~255), static_cast<long long>(256))); } RecordData RocksRecordStore::dataFor(OperationContext* txn, const RecordId& loc) const { @@ -222,15 +259,25 @@ namespace mongo { } std::string oldValue; - ru->Get(key, &oldValue); + auto status = ru->Get(key, &oldValue); + invariantRocksOK(status); int oldLength = oldValue.size(); ru->writeBatch()->Delete(key); + if (_isOplog) { + _oplogKeyTracker->deleteKey(ru, dl); + } _changeNumRecords(txn, -1); _increaseDataSize(txn, -oldLength); } + long long RocksRecordStore::dataSize(OperationContext* txn) const { + RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); + return _dataSize.load(std::memory_order::memory_order_relaxed) + + ru->getDeltaCounter(_dataSizeKey); + } + long long RocksRecordStore::numRecords(OperationContext* txn) const { RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn ); return _numRecords.load(std::memory_order::memory_order_relaxed) + @@ -327,19 +374,37 @@ namespace mongo { if (_cappedMaxDocs != -1 && numRecords > _cappedMaxDocs) { docsOverCap = numRecords - _cappedMaxDocs; } + BSONObj emptyBson; try { WriteUnitOfWork wuow(txn); auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); - boost::scoped_ptr<rocksdb::Iterator> iter(ru->NewIterator(_prefix)); - iter->SeekToFirst(); + boost::scoped_ptr<rocksdb::Iterator> iter; + if (_isOplog) { + // we're using _oplogKeyTracker to find which keys to delete -- this is much faster + // because we don't need to read any values. We theoretically need values to pass + // the document to the cappedDeleteCallback, but the callback only uses the + // documents to remove them from indexes. The oplog doesn't have indexes, so there + // is no need for us to reconstruct the document to pass it to the callback + iter.reset(_oplogKeyTracker->newIterator(ru)); + int64_t storage; + iter->Seek(RocksRecordStore::_makeKey(_oplogNextToDelete, &storage)); + } else { + iter.reset(ru->NewIterator(_prefix)); + iter->SeekToFirst(); + } RecordId newestOld; while ((sizeSaved < sizeOverCap || docsRemoved < docsOverCap) && (docsRemoved < 20000) && iter->Valid()) { - rocksdb::Slice slice = iter->key(); - newestOld = _makeRecordId(slice); + newestOld = _makeRecordId(iter->key()); + + if (_cappedVisibilityManager->isCappedHidden(newestOld)) { + // this means we have an older record that hasn't been committed yet. let's + // wait until it gets committed before deleting + break; + } // don't go past the record we just inserted if (newestOld >= justInserted) { @@ -356,9 +421,17 @@ break; } - auto oldValue = iter->value(); + rocksdb::Slice oldValue; ++docsRemoved; - sizeSaved += oldValue.size(); + if (_isOplog) { + // trick the callback by giving it an empty bson document + oldValue = rocksdb::Slice(emptyBson.objdata(), emptyBson.objsize()); + // we keep the data size in the value + sizeSaved += _oplogKeyTracker->decodeSize(iter->value()); + } else { + oldValue = iter->value(); + sizeSaved += oldValue.size(); + } if (_cappedDeleteCallback) { uassertStatusOK(_cappedDeleteCallback->aboutToDeleteCapped( @@ -367,14 +440,34 @@ } ru->writeBatch()->Delete(key); + if (_isOplog) { + _oplogKeyTracker->deleteKey(ru, newestOld); + } + iter->Next(); } + if (!iter->status().ok()) { + log() << "RocksDB iterator failure when trying to delete capped, ignoring: " + << iter->status().ToString(); + } + if (docsRemoved > 0) { _changeNumRecords(txn, -docsRemoved); _increaseDataSize(txn, -sizeSaved); wuow.commit(); } + + if (_isOplog && iter->Valid()) { + auto oldestAliveRecordId = _makeRecordId(iter->key()); + // we check if there's an outstanding transaction that is older than + // oldestAliveRecordId. If there is, we should not skip deleting that record next + // time we clean up the oplog.
If there isn't, we know for certain this is the record + // we'll start our deletions from next time + if (!_cappedVisibilityManager->isCappedHidden(oldestAliveRecordId)) { + _oplogNextToDelete = oldestAliveRecordId; + } + } } catch ( const WriteConflictException& wce ) { delete txn->releaseRecoveryUnit(); @@ -391,7 +484,6 @@ delete txn->releaseRecoveryUnit(); txn->setRecoveryUnit( realRecoveryUnit ); return docsRemoved; - }
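A note on the _oplogNextToDelete bookkeeping above: deleting from the head of a RocksDB keyspace leaves tombstones until compaction, so a fresh SeekToFirst() on every pass has to skip over all of them. Remembering where the last pass stopped and seeking straight there avoids that cost. A minimal sketch of the general resume-point pattern against the plain RocksDB API (deleteHeadBatch is a hypothetical name; the real code routes deletes through the recovery unit's write batch):

#include <memory>
#include <string>
#include <rocksdb/db.h>

// Delete up to 'batchSize' keys from the head of the keyspace, resuming at
// 'resumeFrom' so repeated passes never re-scan earlier tombstones.
// Returns the key the next pass should resume from (empty when exhausted).
std::string deleteHeadBatch(rocksdb::DB* db, const std::string& resumeFrom, int batchSize) {
    std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(rocksdb::ReadOptions()));
    it->Seek(resumeFrom);  // jumps past tombstones left by previous batches
    for (int deleted = 0; it->Valid() && deleted < batchSize; ++deleted, it->Next()) {
        db->Delete(rocksdb::WriteOptions(), it->key());  // iterator's view is unaffected
    }
    return it->Valid() ? it->key().ToString() : std::string();
}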
StatusWith<RecordId> RocksRecordStore::insertRecord( OperationContext* txn, @@ -424,6 +516,9 @@ // No need to register the write here, since we just allocated a new RecordId so no other // transaction can access this key before we commit ru->writeBatch()->Put(_makePrefixedKey(_prefix, loc), rocksdb::Slice(data, len)); + if (_isOplog) { + _oplogKeyTracker->insertKey(ru, loc, len); + } _changeNumRecords( txn, 1 ); _increaseDataSize( txn, len ); @@ -458,14 +553,14 @@ std::string old_value; auto status = ru->Get(key, &old_value); - - if ( !status.ok() ) { - return StatusWith<RecordId>( ErrorCodes::InternalError, status.ToString() ); - } + invariantRocksOK(status); int old_length = old_value.size(); ru->writeBatch()->Put(key, rocksdb::Slice(data, len)); + if (_isOplog) { + _oplogKeyTracker->insertKey(ru, loc, len); + } _increaseDataSize(txn, len - old_length); @@ -507,8 +602,6 @@ } Status RocksRecordStore::truncate( OperationContext* txn ) { - // XXX once we have readable WriteBatch, also delete outstanding writes to - // this collection in the WriteBatch boost::scoped_ptr<RecordIterator> iter( getIterator( txn ) ); while( !iter->isEOF() ) { RecordId loc = iter->getNext(); @@ -526,11 +619,7 @@ std::string endString(_makePrefixedKey(_prefix, RecordId::max())); rocksdb::Slice beginRange(beginString); rocksdb::Slice endRange(endString); - rocksdb::Status status = _db->CompactRange(&beginRange, &endRange); - if ( status.ok() ) - return Status::OK(); - else - return Status( ErrorCodes::InternalError, status.ToString() ); + return rocksToMongoStatus(_db->CompactRange(&beginRange, &endRange)); } Status RocksRecordStore::validate( OperationContext* txn, @@ -539,32 +628,47 @@ ValidateAdaptor* adaptor, ValidateResults* results, BSONObjBuilder* output ) { - // TODO validate that _numRecords and _dataSize are correct in scanData mode - if ( scanData ) { - bool invalidObject = false; - size_t numRecords = 0; - boost::scoped_ptr<RecordIterator> iter( getIterator( txn ) ); - while( !iter->isEOF() ) { - numRecords++; + long long nrecords = 0; + long long dataSizeTotal = 0; + if (scanData) { + boost::scoped_ptr<RecordIterator> iter(getIterator(txn)); + results->valid = true; + while (!iter->isEOF()) { + ++nrecords; if (full) { - RecordData data = dataFor(txn, iter->curr()); size_t dataSize; - const Status status = adaptor->validate(data, &dataSize); + RecordId loc = iter->curr(); + RecordData data = dataFor(txn, loc); + Status status = adaptor->validate(data, &dataSize); if (!status.isOK()) { results->valid = false; - if (invalidObject) { - results->errors.push_back("invalid object detected (see logs)"); - } - invalidObject = true; - log() << "Invalid object detected in " << _ns << ": " << status.reason(); + results->errors.push_back(str::stream() << loc << " is corrupted"); } + dataSizeTotal += static_cast<long long>(dataSize); } iter->getNext(); } - output->appendNumber("nrecords", numRecords); - } - else + + if (full && results->valid) { + long long storedNumRecords = numRecords(txn); + long long storedDataSize = dataSize(txn); + + if (nrecords != storedNumRecords || dataSizeTotal != storedDataSize) { + warning() << _ident << ": Existing record and data size counters (" + << storedNumRecords << " records " << storedDataSize << " bytes) " + << "are inconsistent with full validation results (" << nrecords + << " records " << dataSizeTotal << " bytes). " + << "Updating counters with new values."; + if (nrecords != storedNumRecords) { + _changeNumRecords(txn, nrecords - storedNumRecords); + _increaseDataSize(txn, dataSizeTotal - storedDataSize); + } + } + } + output->appendNumber("nrecords", nrecords); + } else { output->appendNumber("nrecords", numRecords(txn)); + } return Status::OK(); } @@ -590,6 +694,18 @@ return record.getStatus(); } + void RocksRecordStore::updateStatsAfterRepair(OperationContext* txn, long long numRecords, + long long dataSize) { + _numRecords.store(numRecords); + _dataSize.store(dataSize); + rocksdb::WriteBatch wb; + int64_t storage; + wb.Put(_numRecordsKey, RocksRecoveryUnit::encodeCounter(numRecords, &storage)); + wb.Put(_dataSizeKey, RocksRecoveryUnit::encodeCounter(dataSize, &storage)); + auto s = _db->Write(rocksdb::WriteOptions(), &wb); + invariantRocksOK(s); + } + /** * Return the RecordId of an oplog entry as close to startingPosition as possible without * being higher. If there are no entries <= startingPosition, return RecordId(). */ @@ -604,7 +720,10 @@ auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); ru->setOplogReadTill(_cappedVisibilityManager->oplogStartHack()); - boost::scoped_ptr<rocksdb::Iterator> iter(ru->NewIterator(_prefix)); + // we use _oplogKeyTracker, which contains exactly the same keys as the oplog; the difference is + // that the values are much smaller, so reading is faster.
in this case, we only + // need keys (we never touch the values), so this works nicely + boost::scoped_ptr<rocksdb::Iterator> iter(_oplogKeyTracker->newIterator(ru)); int64_t storage; iter->Seek(_makeKey(startingPosition, &storage)); if (!iter->Valid()) { @@ -613,6 +732,7 @@ // startingPosition is bigger than everything else return _makeRecordId(iter->key()); } else { + invariantRocksOK(iter->status()); // record store is empty return RecordId(); } @@ -631,6 +751,7 @@ } if (!iter->Valid()) { + invariantRocksOK(iter->status()); // there are no entries <= startingPosition return RecordId(); }
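oplogStartHack() above leans on the standard RocksDB idiom for finding the greatest key <= a target: Seek() positions the iterator at the first key >= the target, so on an exact miss you step back once (or to the last key when everything sorts below the target). A standalone sketch of that idiom, assuming the default bytewise comparator (seekAtOrBefore is a hypothetical name, not from the patch):

#include <memory>
#include <string>
#include <rocksdb/db.h>

// Position on the greatest key <= target; returns false if no such key exists.
bool seekAtOrBefore(rocksdb::DB* db, const std::string& target, std::string* result) {
    std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(rocksdb::ReadOptions()));
    it->Seek(target);                            // first key >= target
    if (!it->Valid()) {
        it->SeekToLast();                        // every key is < target (or DB is empty)
    } else if (it->key().compare(target) != 0) {
        it->Prev();                              // overshot: step back below target
    }
    if (!it->Valid()) {
        return false;                            // nothing <= target
    }
    *result = it->key().ToString();
    return true;
}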
@@ -653,14 +774,6 @@ wuow.commit(); } - rocksdb::ReadOptions RocksRecordStore::_readOptions(OperationContext* opCtx) { - rocksdb::ReadOptions options; - if ( opCtx ) { - options.snapshot = RocksRecoveryUnit::getRocksRecoveryUnit( opCtx )->snapshot(); - } - return options; - } - RecordId RocksRecordStore::_nextId() { invariant(!_isOplog); return RecordId(_nextIdNum.fetchAndAdd(1)); } @@ -701,14 +814,10 @@ std::string valueStorage; auto status = ru->Get(_makePrefixedKey(prefix, loc), &valueStorage); - if (!status.ok()) { - if (status.IsNotFound()) { - return RecordData(nullptr, 0); - } else { - log() << "rocks Get failed, blowing up: " << status.ToString(); - invariant(false); - } + if (status.IsNotFound()) { + return RecordData(nullptr, 0); } + invariantRocksOK(status); SharedBuffer data = SharedBuffer::allocate(valueStorage.size()); memcpy(data.get(), valueStorage.data(), valueStorage.size()); @@ -743,12 +852,6 @@ _locate(start); } - void RocksRecordStore::Iterator::_checkStatus() { - if ( !_iterator->status().ok() ) - log() << "Rocks Iterator Error: " << _iterator->status().ToString(); - invariant( _iterator->status().ok() ); - } - bool RocksRecordStore::Iterator::isEOF() { return _eof; } @@ -773,6 +876,8 @@ else _iterator->Prev(); + invariantRocksOK(_iterator->status()); + if (_iterator->Valid()) { _curr = _decodeCurr(); if (_cappedVisibilityManager.get()) { // isCapped? @@ -795,7 +900,6 @@ namespace mongo { // we leave _curr as it is on purpose } - _checkStatus(); _lastLoc = toReturn; return toReturn; } @@ -861,7 +965,7 @@ namespace mongo { int64_t locStorage; _iterator->Seek(RocksRecordStore::_makeKey(loc, &locStorage)); } - _checkStatus(); + invariantRocksOK(_iterator->status()); } else { // backward iterator if (loc.isNull()) { _iterator->SeekToLast(); @@ -869,14 +973,14 @@ namespace mongo { // lower bound on reverse iterator int64_t locStorage; _iterator->Seek(RocksRecordStore::_makeKey(loc, &locStorage)); - _checkStatus(); + invariantRocksOK(_iterator->status()); if (!_iterator->Valid()) { _iterator->SeekToLast(); } else if (_decodeCurr() != loc) { _iterator->Prev(); } } - _checkStatus(); + invariantRocksOK(_iterator->status()); } _eof = !_iterator->Valid(); if (_eof) { diff --git a/src/mongo/db/storage/rocks/rocks_record_store.h b/src/mongo/db/storage/rocks/rocks_record_store.h index d8f6bcf6cf4..1257ffe9c27 100644 --- a/src/mongo/db/storage/rocks/rocks_record_store.h +++ b/src/mongo/db/storage/rocks/rocks_record_store.h @@ -75,6 +75,7 @@ namespace mongo { }; class RocksRecoveryUnit; + class RocksOplogKeyTracker; class RocksRecordStore : public RecordStore { public: @@ -88,7 +89,7 @@ namespace mongo { // name of the RecordStore implementation virtual const char* name() const { return "rocks"; } - virtual long long dataSize(OperationContext* txn) const { return _dataSize.load(); } + virtual long long dataSize(OperationContext* txn) const; virtual long long numRecords( OperationContext* txn ) const; @@ -167,11 +168,8 @@ namespace mongo { virtual Status oplogDiskLocRegister(OperationContext* txn, const OpTime& opTime); - virtual void updateStatsAfterRepair(OperationContext* txn, - long long numRecords, - long long dataSize) { - // TODO - } + virtual void updateStatsAfterRepair(OperationContext* txn, long long numRecords, + long long dataSize); void setCappedDeleteCallback(CappedDocumentDeleteCallback* cb) { _cappedDeleteCallback = cb; @@ -187,6 +185,8 @@ namespace mongo { static rocksdb::Comparator* newRocksCollectionComparator(); private: + // we just need to expose _makePrefixedKey to RocksOplogKeyTracker + friend class RocksOplogKeyTracker; // NOTE: RecordIterator might outlive the RecordStore. That's why we use all those // shared_ptrs class Iterator : public RecordIterator { @@ -207,7 +207,6 @@ namespace mongo { void _locate(const RecordId& loc); RecordId _decodeCurr() const; bool _forward() const; - void _checkStatus(); OperationContext* _txn; rocksdb::DB* _db; // not owned @@ -221,12 +220,6 @@ namespace mongo { boost::scoped_ptr<rocksdb::Iterator> _iterator; }; - /** - * Returns a new ReadOptions struct, containing the snapshot held in opCtx, if opCtx is not - * null - */ - static rocksdb::ReadOptions _readOptions(OperationContext* opCtx = NULL); - static RecordId _makeRecordId( const rocksdb::Slice& slice ); static RecordData _getDataFor(rocksdb::DB* db, const std::string& prefix, @@ -254,7 +247,14 @@ namespace mongo { int _cappedDeleteCheckCount; // see comment in ::cappedDeleteAsNeeded const bool _isOplog; - int _oplogCounter; + // nullptr iff _isOplog == false + RocksOplogKeyTracker* _oplogKeyTracker; + // SeekToFirst() on an oplog is an expensive operation because a bunch of keys at the start + // are deleted. To reduce the overhead, we remember the next key to delete and seek directly + // to it. This will not work correctly if somebody inserted a key before this + // _oplogNextToDelete.
However, we prevent this from happening by using + // _cappedVisibilityManager and checking isCappedHidden() during deletions + RecordId _oplogNextToDelete; boost::shared_ptr<CappedVisibilityManager> _cappedVisibilityManager; diff --git a/src/mongo/db/storage/rocks/rocks_record_store_mock.cpp b/src/mongo/db/storage/rocks/rocks_record_store_mock.cpp index d60bf6cb738..f897a5a944e 100644 --- a/src/mongo/db/storage/rocks/rocks_record_store_mock.cpp +++ b/src/mongo/db/storage/rocks/rocks_record_store_mock.cpp @@ -29,7 +29,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/storage/rocks/rocks_engine.h" +#include "rocks_engine.h" namespace mongo { diff --git a/src/mongo/db/storage/rocks/rocks_record_store_mongod.cpp b/src/mongo/db/storage/rocks/rocks_record_store_mongod.cpp index 91d86698705..0554496a95e 100644 --- a/src/mongo/db/storage/rocks/rocks_record_store_mongod.cpp +++ b/src/mongo/db/storage/rocks/rocks_record_store_mongod.cpp @@ -42,13 +42,14 @@ #include "mongo/db/global_environment_experiment.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context_impl.h" -#include "mongo/db/storage/rocks/rocks_engine.h" -#include "mongo/db/storage/rocks/rocks_record_store.h" -#include "mongo/db/storage/rocks/rocks_recovery_unit.h" #include "mongo/util/background.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" +#include "rocks_engine.h" +#include "rocks_record_store.h" +#include "rocks_recovery_unit.h" + namespace mongo { namespace { @@ -59,7 +60,7 @@ namespace mongo { class RocksRecordStoreThread : public BackgroundJob { public: RocksRecordStoreThread(const NamespaceString& ns) - : _ns(ns) { + : BackgroundJob(true /* deleteSelf */), _ns(ns) { _name = std::string("RocksRecordStoreThread for ") + _ns.toString(); } @@ -123,11 +124,9 @@ namespace mongo { // If we removed 0 documents, sleep a bit in case we're on a laptop // or something to be nice. sleepmillis(1000); - } - else if(removed < 1000) { - // 1000 is the batch size, so we didn't even do a full batch, - // which is the most efficient.
- sleepmillis(10); + } else { + // wake up every 100ms + sleepmillis(100); } } diff --git a/src/mongo/db/storage/rocks/rocks_record_store_test.cpp b/src/mongo/db/storage/rocks/rocks_record_store_test.cpp index b6ebcc7dfc4..d4ed147e94d 100644 --- a/src/mongo/db/storage/rocks/rocks_record_store_test.cpp +++ b/src/mongo/db/storage/rocks/rocks_record_store_test.cpp @@ -41,12 +41,13 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/storage/record_store_test_harness.h" -#include "mongo/db/storage/rocks/rocks_record_store.h" -#include "mongo/db/storage/rocks/rocks_recovery_unit.h" -#include "mongo/db/storage/rocks/rocks_transaction.h" #include "mongo/unittest/unittest.h" #include "mongo/unittest/temp_dir.h" +#include "rocks_record_store.h" +#include "rocks_recovery_unit.h" +#include "rocks_transaction.h" + namespace mongo { using boost::scoped_ptr; @@ -203,7 +204,6 @@ namespace mongo { return res; } - // TODO remove from here once mongo made the test generic TEST(RocksRecordStoreTest, OplogHack) { RocksRecordStoreHarnessHelper harnessHelper; scoped_ptr<RecordStore> rs(harnessHelper.newNonCappedRecordStore("local.oplog.foo")); diff --git a/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp b/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp index 5ca11018209..2ec2f592df1 100644 --- a/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp +++ b/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp @@ -31,7 +31,7 @@ #include "mongo/platform/basic.h" #include "mongo/platform/endian.h" -#include "mongo/db/storage/rocks/rocks_recovery_unit.h" +#include "rocks_recovery_unit.h" #include <rocksdb/comparator.h> #include <rocksdb/db.h> @@ -44,10 +44,11 @@ #include "mongo/base/checked_cast.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/operation_context.h" -#include "mongo/db/storage/rocks/rocks_transaction.h" -#include "mongo/db/storage/rocks/rocks_util.h" #include "mongo/util/log.h" +#include "rocks_transaction.h" +#include "rocks_util.h" + namespace mongo { namespace { class PrefixStrippingIterator : public rocksdb::Iterator { @@ -149,7 +150,12 @@ namespace mongo { } bool RocksRecoveryUnit::awaitCommit() { - // TODO + // Not sure what we should do here. awaitCommit() is called when the WriteConcern is FSYNC + // or JOURNAL. In our case, we use JOURNAL-level durability for every transaction, no matter + // what WriteConcern was requested. However, if the WriteConcern is FSYNC we should probably + // call Write() with the sync option. So far we're just not doing anything. In the future we + // should figure out which of the two WriteConcerns this is (FSYNC or JOURNAL) and, if it's + // FSYNC, do something special.
return true; } @@ -179,6 +185,7 @@ namespace mongo { void RocksRecoveryUnit::_releaseSnapshot() { if (_snapshot) { + _transaction.abort(); _db->ReleaseSnapshot(_snapshot); _snapshot = nullptr; } @@ -191,9 +198,8 @@ namespace mongo { auto& counter = pair.second; counter._value->fetch_add(counter._delta, std::memory_order::memory_order_relaxed); long long newValue = counter._value->load(std::memory_order::memory_order_relaxed); - int64_t littleEndian = static_cast<int64_t>(endian::littleToNative(newValue)); - const char* nr_ptr = reinterpret_cast<const char*>(&littleEndian); - writeBatch()->Put(pair.first, rocksdb::Slice(nr_ptr, sizeof(littleEndian))); + int64_t storage; + writeBatch()->Put(pair.first, encodeCounter(newValue, &storage)); } if (_writeBatch->GetWriteBatch()->Count() != 0) { @@ -202,10 +208,7 @@ namespace mongo { rocksdb::WriteOptions writeOptions; writeOptions.disableWAL = !_durable; auto status = _db->Write(rocksdb::WriteOptions(), _writeBatch->GetWriteBatch()); - if (!status.ok()) { - log() << "uh oh: " << status.ToString(); - invariant(!"rocks write batch commit failed"); - } + invariantRocksOK(status); _transaction.commit(); } _deltaCounters.clear(); @@ -219,7 +222,6 @@ namespace mongo { } _changes.clear(); - _transaction.abort(); _deltaCounters.clear(); _writeBatch.reset(); @@ -246,7 +248,6 @@ namespace mongo { if (entry.type == rocksdb::WriteType::kDeleteRecord) { return rocksdb::Status::NotFound(); } - // TODO avoid double copy *value = std::string(entry.value.data(), entry.value.size()); return rocksdb::Status::OK(); } @@ -301,10 +302,8 @@ namespace mongo { auto s = db->Get(rocksdb::ReadOptions(), counterKey, &value); if (s.IsNotFound()) { return 0; - } else if (!s.ok()) { - log() << "Counter get failed " << s.ToString(); - invariant(!"Counter get failed"); } + invariantRocksOK(s); int64_t ret; invariant(sizeof(ret) == value.size()); @@ -317,4 +316,8 @@ namespace mongo { return checked_cast<RocksRecoveryUnit*>(opCtx->recoveryUnit()); } + rocksdb::Slice RocksRecoveryUnit::encodeCounter(long long counter, int64_t* storage) { + *storage = static_cast<int64_t>(endian::littleToNative(counter)); + return rocksdb::Slice(reinterpret_cast<const char*>(storage), sizeof(*storage)); + } } diff --git a/src/mongo/db/storage/rocks/rocks_recovery_unit.h b/src/mongo/db/storage/rocks/rocks_recovery_unit.h index 280e4b63f1e..d2834dcd0d8 100644 --- a/src/mongo/db/storage/rocks/rocks_recovery_unit.h +++ b/src/mongo/db/storage/rocks/rocks_recovery_unit.h @@ -38,11 +38,14 @@ #include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> +#include <rocksdb/slice.h> + #include "mongo/base/disallow_copying.h" #include "mongo/base/owned_pointer_vector.h" #include "mongo/db/record_id.h" #include "mongo/db/storage/recovery_unit.h" -#include "mongo/db/storage/rocks/rocks_transaction.h" + +#include "rocks_transaction.h" namespace rocksdb { class DB; @@ -123,6 +126,8 @@ namespace mongo { static int getTotalLiveRecoveryUnits() { return _totalLiveRecoveryUnits.load(); } + static rocksdb::Slice encodeCounter(long long counter, int64_t* storage); + private: void _releaseSnapshot(); diff --git a/src/mongo/db/storage/rocks/rocks_server_status.cpp b/src/mongo/db/storage/rocks/rocks_server_status.cpp index 945369d93a3..d7d3df8d757 100644 --- a/src/mongo/db/storage/rocks/rocks_server_status.cpp +++ b/src/mongo/db/storage/rocks/rocks_server_status.cpp @@ -31,8 +31,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/storage/rocks/rocks_server_status.h" -#include 
"mongo/db/storage/rocks/rocks_recovery_unit.h" +#include "rocks_server_status.h" #include "boost/scoped_ptr.hpp" @@ -40,10 +39,13 @@ #include "mongo/base/checked_cast.h" #include "mongo/bson/bsonobjbuilder.h" -#include "mongo/db/storage/rocks/rocks_engine.h" #include "mongo/util/assert_util.h" #include "mongo/util/scopeguard.h" +#include "rocks_recovery_unit.h" +#include "rocks_engine.h" +#include "rocks_transaction.h" + namespace mongo { using std::string; @@ -62,7 +64,7 @@ namespace mongo { } // namespace RocksServerStatusSection::RocksServerStatusSection(RocksEngine* engine) - : ServerStatusSection("RocksDB"), _engine(engine) {} + : ServerStatusSection("rocksdb"), _engine(engine) {} bool RocksServerStatusSection::includeByDefault() const { return true; } @@ -108,6 +110,10 @@ namespace mongo { } bob.append("total-live-recovery-units", RocksRecoveryUnit::getTotalLiveRecoveryUnits()); bob.append("block-cache-usage", PrettyPrintBytes(_engine->getBlockCacheUsage())); + bob.append("transaction-engine-keys", + static_cast<long long>(_engine->getTransactionEngine()->numKeysTracked())); + bob.append("transaction-engine-snapshots", + static_cast<long long>(_engine->getTransactionEngine()->numActiveSnapshots())); return bob.obj(); } diff --git a/src/mongo/db/storage/rocks/rocks_transaction.cpp b/src/mongo/db/storage/rocks/rocks_transaction.cpp index 58e0bf3c6af..0e14fba0bba 100644 --- a/src/mongo/db/storage/rocks/rocks_transaction.cpp +++ b/src/mongo/db/storage/rocks/rocks_transaction.cpp @@ -28,7 +28,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/storage/rocks/rocks_transaction.h" +#include "rocks_transaction.h" #include <atomic> #include <map> @@ -41,6 +41,15 @@ namespace mongo { RocksTransactionEngine::RocksTransactionEngine() : _latestSnapshotId(1), _nextTransactionId(1) {} + size_t RocksTransactionEngine::numKeysTracked() { + boost::mutex::scoped_lock lk(_lock); + return _keyInfo.size(); + } + size_t RocksTransactionEngine::numActiveSnapshots() { + boost::mutex::scoped_lock lk(_lock); + return _activeSnapshots.size(); + } + std::list<uint64_t>::iterator RocksTransactionEngine::_getLatestSnapshotId_inlock() { return _activeSnapshots.insert(_activeSnapshots.end(), _latestSnapshotId); } @@ -60,7 +69,7 @@ namespace mongo { } auto listIter = _keysSortedBySnapshot.insert(_keysSortedBySnapshot.end(), {key, newSnapshotId}); - _keyInfo.insert({key, {newSnapshotId, listIter}}); + _keyInfo.insert({StringData(listIter->first), {newSnapshotId, listIter}}); } void RocksTransactionEngine::_cleanUpKeysCommittedBeforeSnapshot_inlock(uint64_t snapshotId) { diff --git a/src/mongo/db/storage/rocks/rocks_transaction.h b/src/mongo/db/storage/rocks/rocks_transaction.h index 880677a1e3b..555c1324638 100644 --- a/src/mongo/db/storage/rocks/rocks_transaction.h +++ b/src/mongo/db/storage/rocks/rocks_transaction.h @@ -37,6 +37,8 @@ #include <boost/thread/mutex.hpp> +#include "mongo/base/string_data.h" + namespace mongo { class RocksTransaction; @@ -44,6 +46,9 @@ namespace mongo { public: RocksTransactionEngine(); + size_t numKeysTracked(); + size_t numActiveSnapshots(); + private: // REQUIRES: transaction engine lock locked std::list<uint64_t>::iterator _getLatestSnapshotId_inlock(); @@ -84,13 +89,13 @@ namespace mongo { // * snapshot ID of the last commit to this key // * an iterator pointing to the corresponding entry in _keysSortedBySnapshot. 
This is used // to update the list at the same time as we update the _keyInfo - // TODO optimize these structures to store only one key instead of two typedef std::list<std::pair<std::string, uint64_t>> KeysSortedBySnapshotList; typedef std::list<std::pair<std::string, uint64_t>>::iterator KeysSortedBySnapshotListIter; KeysSortedBySnapshotList _keysSortedBySnapshot; // map of key -> pair{seq_id, pointer to corresponding _keysSortedBySnapshot} - std::unordered_map<std::string, std::pair<uint64_t, KeysSortedBySnapshotListIter>> _keyInfo; - + // key is a StringData and it points to the actual string in _keysSortedBySnapshot + std::unordered_map<StringData, std::pair<uint64_t, KeysSortedBySnapshotListIter>, + StringData::Hasher> _keyInfo; std::unordered_map<std::string, uint64_t> _uncommittedTransactionId; // this list is sorted diff --git a/src/mongo/db/storage/rocks/rocks_util.cpp b/src/mongo/db/storage/rocks/rocks_util.cpp new file mode 100644 index 00000000000..c31b362a083 --- /dev/null +++ b/src/mongo/db/storage/rocks/rocks_util.cpp @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + + +#include "rocks_util.h" + +#include <string> +#include <rocksdb/status.h> + +namespace mongo { + + Status rocksToMongoStatus_slow(const rocksdb::Status& status, const char* prefix) { + if (status.ok()) { + return Status::OK(); + } + + if (status.IsCorruption()) { + return Status(ErrorCodes::BadValue, status.ToString()); + } + + return Status(ErrorCodes::InternalError, status.ToString()); + } + +} // namespace mongo diff --git a/src/mongo/db/storage/rocks/rocks_util.h b/src/mongo/db/storage/rocks/rocks_util.h index f8611d60d84..98b927c5610 100644 --- a/src/mongo/db/storage/rocks/rocks_util.h +++ b/src/mongo/db/storage/rocks/rocks_util.h @@ -29,6 +29,9 @@ #pragma once #include <string> +#include <rocksdb/status.h> + +#include "mongo/util/assert_util.h" namespace mongo { @@ -45,4 +48,24 @@ namespace mongo { return nextPrefix; } + Status rocksToMongoStatus_slow(const rocksdb::Status& status, const char* prefix); + + /** + * Converts a RocksDB status to a MongoDB Status + */ + inline Status rocksToMongoStatus(const rocksdb::Status& status, const char* prefix = NULL) { + if (MONGO_likely(status.ok())) { + return Status::OK(); + } + return rocksToMongoStatus_slow(status, prefix); + } + +#define invariantRocksOK(expression) do { \ + auto _invariantRocksOK_status = expression; \ + if (MONGO_unlikely(!_invariantRocksOK_status.ok())) { \ + invariantOKFailed(#expression, rocksToMongoStatus(_invariantRocksOK_status), \ + __FILE__, __LINE__); \ + } \ + } while (false) + } // namespace mongo
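For reference, a hypothetical call site showing how the two helpers divide the work: statuses the caller can reasonably handle flow through rocksToMongoStatus(), while statuses that should be impossible are asserted with invariantRocksOK(). This is an illustration of intended usage, not code from the patch:

#include <string>
#include <rocksdb/db.h>

#include "mongo/base/status.h"
#include "rocks_util.h"

namespace mongo {

    // Recoverable path: translate the RocksDB status and let the caller decide.
    Status deleteKeyChecked(rocksdb::DB* db, const rocksdb::Slice& key) {
        return rocksToMongoStatus(db->Delete(rocksdb::WriteOptions(), key));
    }

    // Must-succeed path: any failure here (even NotFound) aborts the process.
    std::string readKeyOrDie(rocksdb::DB* db, const rocksdb::Slice& key) {
        std::string value;
        invariantRocksOK(db->Get(rocksdb::ReadOptions(), key, &value));
        return value;
    }

}  // namespace mongo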