diff options
author | Igor Canadi <icanadi@fb.com> | 2015-03-24 12:22:33 -0700 |
---|---|---|
committer | Ramon Fernandez <ramon.fernandez@mongodb.com> | 2015-04-20 16:45:32 -0400 |
commit | a966cea74fc5454f4a03691ffe59ab62162c88f7 (patch) | |
tree | 0245f9fd63c808f59b6933ea56af99e4bea939e2 /src | |
parent | d12a68e550e4f928b8c8f9d052062b4072b236ef (diff) | |
download | mongo-a966cea74fc5454f4a03691ffe59ab62162c88f7.tar.gz |
SERVER-17939 Backport mongo-rocks updates to v3.0 branch
* Expose RocksDB's CompactRange() API
* Implement not-crash safe counters in Mongo-rocks
* Implement backup for Mongo+Rocks
* Fail to start server with invalid rocksdbConfigString
* Check iterator status only if it's not Valid
* Switch call to DB::CompactRange to SuggestCompactRange() which is more light weight
We have been testing those changes in Parse's environment and have good
confidence of their stability.
Signed-off-by: Ramon Fernandez <ramon.fernandez@mongodb.com>
Diffstat (limited to 'src')
17 files changed, 420 insertions, 70 deletions
diff --git a/src/mongo/db/storage/rocks/SConscript b/src/mongo/db/storage/rocks/SConscript index 2ba0f3149d3..ce7e31d0856 100644 --- a/src/mongo/db/storage/rocks/SConscript +++ b/src/mongo/db/storage/rocks/SConscript @@ -6,6 +6,7 @@ if has_option("rocksdb"): env.Library( target= 'storage_rocks_base', source= [ + 'rocks_counter_manager.cpp', 'rocks_global_options.cpp', 'rocks_engine.cpp', 'rocks_record_store.cpp', diff --git a/src/mongo/db/storage/rocks/rocks_counter_manager.cpp b/src/mongo/db/storage/rocks/rocks_counter_manager.cpp new file mode 100644 index 00000000000..38773db3163 --- /dev/null +++ b/src/mongo/db/storage/rocks/rocks_counter_manager.cpp @@ -0,0 +1,119 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" +#include "mongo/platform/endian.h" + +#include "rocks_counter_manager.h" + +#include <atomic> +#include <map> +#include <memory> +#include <string> + +// for invariant() +#include "mongo/util/assert_util.h" + +#include <rocksdb/db.h> + +#include "rocks_util.h" + +namespace mongo { + + long long RocksCounterManager::loadCounter(const std::string& counterKey) { + { + boost::mutex::scoped_lock lk(_lock); + auto itr = _counters.find(counterKey); + if (itr != _counters.end()) { + return itr->second; + } + } + std::string value; + auto s = _db->Get(rocksdb::ReadOptions(), counterKey, &value); + if (s.IsNotFound()) { + return 0; + } + invariantRocksOK(s); + + int64_t ret; + invariant(sizeof(ret) == value.size()); + memcpy(&ret, value.data(), sizeof(ret)); + // we store counters in little endian + return static_cast<long long>(endian::littleToNative(ret)); + } + + void RocksCounterManager::updateCounter(const std::string& counterKey, long long count, + rocksdb::WriteBatch* writeBatch) { + + if (_crashSafe) { + int64_t storage; + writeBatch->Put(counterKey, _encodeCounter(count, &storage)); + } else { + boost::mutex::scoped_lock lk(_lock); + _counters[counterKey] = count; + ++_syncCounter; + if (!_syncing && _syncCounter >= kSyncEvery) { + // let's sync this now. piggyback on writeBatch + int64_t storage; + for (const auto& counter : _counters) { + writeBatch->Put(counter.first, _encodeCounter(counter.second, &storage)); + } + _counters.clear(); + _syncCounter = 0; + } + } + } + + void RocksCounterManager::sync() { + rocksdb::WriteBatch wb; + { + boost::mutex::scoped_lock lk(_lock); + if (_syncing || _counters.size() == 0) { + return; + } + int64_t storage; + for (const auto& counter : _counters) { + wb.Put(counter.first, _encodeCounter(counter.second, &storage)); + } + _counters.clear(); + _syncCounter = 0; + _syncing = true; + } + auto s = _db->Write(rocksdb::WriteOptions(), &wb); + invariantRocksOK(s); + { + boost::mutex::scoped_lock lk(_lock); + _syncing = false; + } + } + + rocksdb::Slice RocksCounterManager::_encodeCounter(long long counter, int64_t* storage) { + *storage = static_cast<int64_t>(endian::littleToNative(counter)); + return rocksdb::Slice(reinterpret_cast<const char*>(storage), sizeof(*storage)); + } +} diff --git a/src/mongo/db/storage/rocks/rocks_counter_manager.h b/src/mongo/db/storage/rocks/rocks_counter_manager.h new file mode 100644 index 00000000000..e88fb3da300 --- /dev/null +++ b/src/mongo/db/storage/rocks/rocks_counter_manager.h @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <atomic> +#include <set> +#include <unordered_map> +#include <memory> +#include <string> +#include <list> + +#include <boost/thread/mutex.hpp> + +#include <rocksdb/db.h> +#include <rocksdb/slice.h> + +#include "mongo/base/string_data.h" + +namespace mongo { + + class RocksCounterManager { + public: + RocksCounterManager(rocksdb::DB* db, bool crashSafe) + : _db(db), _crashSafe(crashSafe), _syncing(false), _syncCounter(0) {} + + long long loadCounter(const std::string& counterKey); + + void updateCounter(const std::string& counterKey, long long count, + rocksdb::WriteBatch* writeBatch); + + void sync(); + + bool crashSafe() const { return _crashSafe; } + + private: + static rocksdb::Slice _encodeCounter(long long counter, int64_t* storage); + + rocksdb::DB* _db; // not owned + const bool _crashSafe; + boost::mutex _lock; + // protected by _lock + bool _syncing; + // protected by _lock + std::unordered_map<std::string, long long> _counters; + // protected by _lock + int _syncCounter; + + static const int kSyncEvery = 10000; + }; + +} diff --git a/src/mongo/db/storage/rocks/rocks_engine.cpp b/src/mongo/db/storage/rocks/rocks_engine.cpp index e16447b99f0..840900c7a31 100644 --- a/src/mongo/db/storage/rocks/rocks_engine.cpp +++ b/src/mongo/db/storage/rocks/rocks_engine.cpp @@ -49,6 +49,7 @@ #include <rocksdb/utilities/convenience.h> #include <rocksdb/filter_policy.h> #include <rocksdb/utilities/write_batch_with_index.h> +#include <rocksdb/utilities/checkpoint.h> #include "mongo/db/catalog/collection_options.h" #include "mongo/db/index/index_descriptor.h" @@ -59,6 +60,7 @@ #include "mongo/util/log.h" #include "mongo/util/processinfo.h" +#include "rocks_counter_manager.h" #include "rocks_global_options.h" #include "rocks_record_store.h" #include "rocks_recovery_unit.h" @@ -175,6 +177,9 @@ namespace mongo { invariantRocksOK(s); _db.reset(db); + _counterManager.reset( + new RocksCounterManager(_db.get(), rocksGlobalOptions.crashSafeCounters)); + // open iterator boost::scoped_ptr<rocksdb::Iterator> iter(_db->NewIterator(rocksdb::ReadOptions())); @@ -246,10 +251,11 @@ namespace mongo { } } - RocksEngine::~RocksEngine() {} + RocksEngine::~RocksEngine() { cleanShutdown(); } RecoveryUnit* RocksEngine::newRecoveryUnit() { - return new RocksRecoveryUnit(&_transactionEngine, _db.get(), _durable); + return new RocksRecoveryUnit(&_transactionEngine, _db.get(), _counterManager.get(), + _durable); } Status RocksEngine::createRecordStore(OperationContext* opCtx, const StringData& ns, @@ -275,11 +281,12 @@ namespace mongo { } if (options.capped) { return new RocksRecordStore( - ns, ident, _db.get(), _getIdentPrefix(ident), true, + ns, ident, _db.get(), _counterManager.get(), _getIdentPrefix(ident), true, options.cappedSize ? options.cappedSize : 4096, // default size options.cappedMaxDocs ? options.cappedMaxDocs : -1); } else { - return new RocksRecordStore(ns, ident, _db.get(), _getIdentPrefix(ident)); + return new RocksRecordStore(ns, ident, _db.get(), _counterManager.get(), + _getIdentPrefix(ident)); } } @@ -361,6 +368,8 @@ namespace mongo { return indents; } + void RocksEngine::cleanShutdown() { _counterManager->sync(); } + int64_t RocksEngine::getIdentSize(OperationContext* opCtx, const StringData& ident) { uint64_t storageSize; std::string prefix = _getIdentPrefix(ident); @@ -375,6 +384,16 @@ namespace mongo { _rateLimiter->SetBytesPerSecond(static_cast<int64_t>(_maxWriteMBPerSec) * 1024 * 1024); } + Status RocksEngine::backup(const std::string& path) { + rocksdb::Checkpoint* checkpoint; + auto s = rocksdb::Checkpoint::Create(_db.get(), &checkpoint); + if (s.ok()) { + s = checkpoint->CreateCheckpoint(path); + } + delete checkpoint; + return rocksToMongoStatus(s); + } + std::unordered_set<uint32_t> RocksEngine::getDroppedPrefixes() const { boost::mutex::scoped_lock lk(_droppedPrefixesMutex); // this will copy the set. that way compaction filter has its own copy and doesn't need to @@ -454,7 +473,11 @@ namespace mongo { // allow override if (!rocksGlobalOptions.configString.empty()) { rocksdb::Options base_options(options); - rocksdb::GetOptionsFromString(base_options, rocksGlobalOptions.configString, &options); + auto s = rocksdb::GetOptionsFromString(base_options, rocksGlobalOptions.configString, &options); + if (!s.ok()) { + log() << "Invalid rocksdbConfigString \"" << rocksGlobalOptions.configString << "\""; + invariantRocksOK(s); + } } return options; diff --git a/src/mongo/db/storage/rocks/rocks_engine.h b/src/mongo/db/storage/rocks/rocks_engine.h index f1a4d2f58da..b87cacd573d 100644 --- a/src/mongo/db/storage/rocks/rocks_engine.h +++ b/src/mongo/db/storage/rocks/rocks_engine.h @@ -49,6 +49,7 @@ #include "mongo/db/storage/kv/kv_engine.h" #include "mongo/util/string_map.h" +#include "rocks_counter_manager.h" #include "rocks_transaction.h" namespace rocksdb { @@ -113,7 +114,7 @@ namespace mongo { return Status::OK(); } - virtual void cleanShutdown() {} + virtual void cleanShutdown(); /** * Initializes a background job to remove excess documents in the oplog collections. @@ -135,6 +136,8 @@ namespace mongo { int getMaxWriteMBPerSec() const { return _maxWriteMBPerSec; } void setMaxWriteMBPerSec(int maxWriteMBPerSec); + Status backup(const std::string& path); + private: Status _createIdentPrefix(const StringData& ident); std::string _getIdentPrefix(const StringData& ident); @@ -165,6 +168,9 @@ namespace mongo { // This is for concurrency control RocksTransactionEngine _transactionEngine; + // CounterManages manages counters like numRecords and dataSize for record stores + boost::scoped_ptr<RocksCounterManager> _counterManager; + static const std::string kMetadataPrefix; static const std::string kDroppedPrefix; }; diff --git a/src/mongo/db/storage/rocks/rocks_global_options.cpp b/src/mongo/db/storage/rocks/rocks_global_options.cpp index 264b17805f7..ae7dd8f1a8c 100644 --- a/src/mongo/db/storage/rocks/rocks_global_options.cpp +++ b/src/mongo/db/storage/rocks/rocks_global_options.cpp @@ -65,6 +65,15 @@ namespace mongo { moe::String, "RocksDB storage engine custom " "configuration settings").hidden(); + rocksOptions.addOptionChaining("storage.rocksdb.crashSafeCounters", + "rocksdbCrashSafeCounters", moe::Bool, + "If true, numRecord and dataSize counter will be consistent " + "even after power failure. If false, numRecord and dataSize " + "might be a bit inconsistent after power failure, but " + "should be correct under normal conditions. Setting this to " + "true will make database inserts a bit slower.") + .setDefault(moe::Value(false)) + .hidden(); return options->addSection(rocksOptions); } @@ -90,6 +99,11 @@ namespace mongo { params["storage.rocksdb.configString"].as<std::string>(); log() << "Engine custom option: " << rocksGlobalOptions.configString; } + if (params.count("storage.rocksdb.crashSafeCounters")) { + rocksGlobalOptions.crashSafeCounters = + params["storage.rocksdb.crashSafeCounters"].as<bool>(); + log() << "Crash safe counters: " << rocksGlobalOptions.crashSafeCounters; + } return Status::OK(); } diff --git a/src/mongo/db/storage/rocks/rocks_global_options.h b/src/mongo/db/storage/rocks/rocks_global_options.h index aa8d103df33..d8efc51a556 100644 --- a/src/mongo/db/storage/rocks/rocks_global_options.h +++ b/src/mongo/db/storage/rocks/rocks_global_options.h @@ -37,7 +37,11 @@ namespace mongo { class RocksGlobalOptions { public: - RocksGlobalOptions() : cacheSizeGB(0), maxWriteMBPerSec(1024), compression("snappy") {} + RocksGlobalOptions() + : cacheSizeGB(0), + maxWriteMBPerSec(1024), + compression("snappy"), + crashSafeCounters(false) {} Status add(moe::OptionSection* options); Status store(const moe::Environment& params, const std::vector<std::string>& args); @@ -47,6 +51,8 @@ namespace mongo { std::string compression; std::string configString; + + bool crashSafeCounters; }; extern RocksGlobalOptions rocksGlobalOptions; diff --git a/src/mongo/db/storage/rocks/rocks_index.cpp b/src/mongo/db/storage/rocks/rocks_index.cpp index e08b6395c9c..a91b17efea2 100644 --- a/src/mongo/db/storage/rocks/rocks_index.cpp +++ b/src/mongo/db/storage/rocks/rocks_index.cpp @@ -112,7 +112,6 @@ namespace mongo { auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); _iterator.reset(ru->NewIterator(_prefix)); _currentSequenceNumber = ru->snapshot()->GetSequenceNumber(); - invariantRocksOK(_iterator->status()); } int getDirection() const { return _forward ? 1 : -1; } @@ -234,7 +233,6 @@ namespace mongo { auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); if (_currentSequenceNumber != ru->snapshot()->GetSequenceNumber()) { _iterator.reset(ru->NewIterator(_prefix)); - invariantRocksOK(_iterator->status()); _currentSequenceNumber = ru->snapshot()->GetSequenceNumber(); if (!_savedEOF) { @@ -260,7 +258,9 @@ namespace mongo { } else { _iterator->Prev(); } - invariantRocksOK(_iterator->status()); + if (!_iterator->Valid()) { + invariantRocksOK(_iterator->status()); + } } // Seeks to query. Returns true on exact match. @@ -268,11 +268,14 @@ namespace mongo { const rocksdb::Slice keySlice(query.getBuffer(), query.getSize()); _iterator->Seek(keySlice); if (!_iterator->Valid()) { + invariantRocksOK(_iterator->status()); if (!_forward) { // this will give lower bound behavior for backwards _iterator->SeekToLast(); + if (!_iterator->Valid()) { + invariantRocksOK(_iterator->status()); + } } - invariantRocksOK(_iterator->status()); return false; } @@ -288,7 +291,9 @@ namespace mongo { // were // searching for. _iterator->Prev(); - invariantRocksOK(_iterator->status()); + if (!_iterator->Valid()) { + invariantRocksOK(_iterator->status()); + } } return false; diff --git a/src/mongo/db/storage/rocks/rocks_index_test.cpp b/src/mongo/db/storage/rocks/rocks_index_test.cpp index b6f865fbaf1..3007cb9fb62 100644 --- a/src/mongo/db/storage/rocks/rocks_index_test.cpp +++ b/src/mongo/db/storage/rocks/rocks_index_test.cpp @@ -64,6 +64,7 @@ namespace mongo { auto s = rocksdb::DB::Open(options, _tempDir.path(), &db); ASSERT(s.ok()); _db.reset(db); + _counterManager.reset(new RocksCounterManager(_db.get(), true)); } virtual SortedDataInterface* newSortedDataInterface(bool unique) { @@ -75,7 +76,7 @@ namespace mongo { } virtual RecoveryUnit* newRecoveryUnit() { - return new RocksRecoveryUnit(&_transactionEngine, _db.get(), true); + return new RocksRecoveryUnit(&_transactionEngine, _db.get(), _counterManager.get(), true); } private: @@ -84,6 +85,7 @@ namespace mongo { unittest::TempDir _tempDir; scoped_ptr<rocksdb::DB> _db; RocksTransactionEngine _transactionEngine; + scoped_ptr<RocksCounterManager> _counterManager; }; HarnessHelper* newHarnessHelper() { return new RocksIndexHarness(); } diff --git a/src/mongo/db/storage/rocks/rocks_init.cpp b/src/mongo/db/storage/rocks/rocks_init.cpp index 18d8b9d08c4..3a438ed263e 100644 --- a/src/mongo/db/storage/rocks/rocks_init.cpp +++ b/src/mongo/db/storage/rocks/rocks_init.cpp @@ -59,6 +59,8 @@ namespace mongo { // Intentionally leaked. auto leaked __attribute__((unused)) = new RocksServerStatusSection(engine); auto leaked2 __attribute__((unused)) = new RocksRateLimiterServerParameter(engine); + auto leaked3 __attribute__((unused)) = new RocksBackupServerParameter(engine); + auto leaked4 __attribute__((unused)) = new RocksCompactServerParameter(engine); return new KVStorageEngine(engine, options); } diff --git a/src/mongo/db/storage/rocks/rocks_parameters.cpp b/src/mongo/db/storage/rocks/rocks_parameters.cpp index 4181d8aab2f..c92a6869864 100644 --- a/src/mongo/db/storage/rocks/rocks_parameters.cpp +++ b/src/mongo/db/storage/rocks/rocks_parameters.cpp @@ -30,11 +30,15 @@ #include "mongo/platform/basic.h" #include "rocks_parameters.h" +#include "rocks_util.h" #include "mongo/logger/parse_log_component_settings.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" +#include <rocksdb/db.h> +#include <rocksdb/experimental.h> + namespace mongo { RocksRateLimiterServerParameter::RocksRateLimiterServerParameter(RocksEngine* engine) @@ -70,4 +74,43 @@ namespace mongo { return Status::OK(); } + + RocksBackupServerParameter::RocksBackupServerParameter(RocksEngine* engine) + : ServerParameter(ServerParameterSet::getGlobal(), "rocksdbBackup", false, true), + _engine(engine) {} + + void RocksBackupServerParameter::append(OperationContext* txn, BSONObjBuilder& b, + const std::string& name) { + b.append(name, ""); + } + + Status RocksBackupServerParameter::set(const BSONElement& newValueElement) { + auto str = newValueElement.str(); + if (str.size() == 0) { + return Status(ErrorCodes::BadValue, str::stream() << name() << " has to be a string"); + } + return setFromString(str); + } + + Status RocksBackupServerParameter::setFromString(const std::string& str) { + return _engine->backup(str); + } + + RocksCompactServerParameter::RocksCompactServerParameter(RocksEngine* engine) + : ServerParameter(ServerParameterSet::getGlobal(), "rocksdbCompact", false, true), + _engine(engine) {} + + void RocksCompactServerParameter::append(OperationContext* txn, BSONObjBuilder& b, + const std::string& name) { + b.append(name, ""); + } + + Status RocksCompactServerParameter::set(const BSONElement& newValueElement) { + return setFromString(""); + } + + Status RocksCompactServerParameter::setFromString(const std::string& str) { + auto s = rocksdb::experimental::SuggestCompactRange(_engine->getDB(), nullptr, nullptr); + return rocksToMongoStatus(s); + } } diff --git a/src/mongo/db/storage/rocks/rocks_parameters.h b/src/mongo/db/storage/rocks/rocks_parameters.h index b447a786386..295c8eb9cb6 100644 --- a/src/mongo/db/storage/rocks/rocks_parameters.h +++ b/src/mongo/db/storage/rocks/rocks_parameters.h @@ -33,6 +33,8 @@ namespace mongo { + // To dynamically configure RocksDB's rate limit, run + // db.adminCommand({setParameter:1, rocksdbRuntimeConfigMaxWriteMBPerSec:30}) class RocksRateLimiterServerParameter : public ServerParameter { MONGO_DISALLOW_COPYING(RocksRateLimiterServerParameter); @@ -47,4 +49,37 @@ namespace mongo { RocksEngine* _engine; }; + // We use mongo's setParameter() API to issue a backup request to rocksdb. + // To backup entire RocksDB instance, call: + // db.adminCommand({setParameter:1, rocksdbBackup: "/var/lib/mongodb/backup/1"}) + // The directory needs to be an absolute path. It should not exist -- it will be created + // automatically. + class RocksBackupServerParameter : public ServerParameter { + MONGO_DISALLOW_COPYING(RocksBackupServerParameter); + + public: + RocksBackupServerParameter(RocksEngine* engine); + virtual void append(OperationContext* txn, BSONObjBuilder& b, const std::string& name); + virtual Status set(const BSONElement& newValueElement); + virtual Status setFromString(const std::string& str); + + private: + RocksEngine* _engine; + }; + + // We use mongo's setParameter() API to issue a compact request to rocksdb. + // To compact entire RocksDB instance, call: + // db.adminCommand({setParameter:1, rocksdbCompact: 1}) + class RocksCompactServerParameter : public ServerParameter { + MONGO_DISALLOW_COPYING(RocksCompactServerParameter); + + public: + RocksCompactServerParameter(RocksEngine* engine); + virtual void append(OperationContext* txn, BSONObjBuilder& b, const std::string& name); + virtual Status set(const BSONElement& newValueElement); + virtual Status setFromString(const std::string& str); + + private: + RocksEngine* _engine; + }; } diff --git a/src/mongo/db/storage/rocks/rocks_record_store.cpp b/src/mongo/db/storage/rocks/rocks_record_store.cpp index 810b7b08f79..5b500e7a248 100644 --- a/src/mongo/db/storage/rocks/rocks_record_store.cpp +++ b/src/mongo/db/storage/rocks/rocks_record_store.cpp @@ -56,6 +56,7 @@ #include "mongo/util/mongoutils/str.h" #include "mongo/util/timer.h" +#include "rocks_counter_manager.h" #include "rocks_engine.h" #include "rocks_recovery_unit.h" #include "rocks_util.h" @@ -169,13 +170,13 @@ namespace mongo { std::string _prefix; }; - RocksRecordStore::RocksRecordStore(const StringData& ns, const StringData& id, - rocksdb::DB* db, // not owned here - std::string prefix, bool isCapped, int64_t cappedMaxSize, - int64_t cappedMaxDocs, + RocksRecordStore::RocksRecordStore(const StringData& ns, const StringData& id, rocksdb::DB* db, + RocksCounterManager* counterManager, std::string prefix, + bool isCapped, int64_t cappedMaxSize, int64_t cappedMaxDocs, CappedDocumentDeleteCallback* cappedDeleteCallback) : RecordStore(ns), _db(db), + _counterManager(counterManager), _prefix(std::move(prefix)), _isCapped(isCapped), _cappedMaxSize(cappedMaxSize), @@ -184,8 +185,9 @@ namespace mongo { _cappedDeleteCallback(cappedDeleteCallback), _cappedDeleteCheckCount(0), _isOplog(NamespaceString::oplog(ns)), - _oplogKeyTracker( - _isOplog ? new RocksOplogKeyTracker(std::move(rocksGetNextPrefix(_prefix))) : nullptr), + _oplogKeyTracker(_isOplog + ? new RocksOplogKeyTracker(std::move(rocksGetNextPrefix(_prefix))) + : nullptr), _oplogNextToDelete(0), _cappedVisibilityManager((_isCapped || _isOplog) ? new CappedVisibilityManager() : nullptr), @@ -207,6 +209,7 @@ namespace mongo { boost::scoped_ptr<rocksdb::Iterator> iter( RocksRecoveryUnit::NewIteratorNoSnapshot(_db, _prefix)); iter->SeekToLast(); + bool emptyCollection = !iter->Valid(); if (iter->Valid()) { rocksdb::Slice lastSlice = iter->key(); RecordId lastId = _makeRecordId(lastSlice); @@ -220,11 +223,34 @@ namespace mongo { } // load metadata - _numRecords.store(RocksRecoveryUnit::getCounterValue(_db, _numRecordsKey)); - _dataSize.store(RocksRecoveryUnit::getCounterValue(_db, _dataSizeKey)); + _numRecords.store(_counterManager->loadCounter(_numRecordsKey)); + _dataSize.store(_counterManager->loadCounter(_dataSizeKey)); invariant(_dataSize.load() >= 0); invariant(_numRecords.load() >= 0); + if (!emptyCollection && !_counterManager->crashSafe() && + _numRecords.load() < kCollectionScanOnCreationThreshold) { + LOG(1) << "doing scan of collection " << ns << " to get info"; + + _numRecords.store(0); + _dataSize.store(0); + + long long numRecords = 0, dataSize = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + numRecords++; + dataSize += static_cast<long long>(iter->value().size()); + } + + _numRecords.store(numRecords); + _dataSize.store(dataSize); + + rocksdb::WriteBatch wb; + _counterManager->updateCounter(_numRecordsKey, numRecords, &wb); + _counterManager->updateCounter(_dataSizeKey, dataSize, &wb); + auto s = _db->Write(rocksdb::WriteOptions(), &wb); + invariantRocksOK(s); + } + _hasBackgroundThread = RocksEngine::initRsOplogBackgroundThread(ns); } @@ -699,11 +725,12 @@ namespace mongo { _numRecords.store(numRecords); _dataSize.store(dataSize); rocksdb::WriteBatch wb; - int64_t storage; - wb.Put(_numRecordsKey, RocksRecoveryUnit::encodeCounter(numRecords, &storage)); - wb.Put(_dataSizeKey, RocksRecoveryUnit::encodeCounter(dataSize, &storage)); - auto s = _db->Write(rocksdb::WriteOptions(), &wb); - invariantRocksOK(s); + _counterManager->updateCounter(_numRecordsKey, numRecords, &wb); + _counterManager->updateCounter(_dataSizeKey, dataSize, &wb); + if (wb.Count() > 0) { + auto s = _db->Write(rocksdb::WriteOptions(), &wb); + invariantRocksOK(s); + } } /** @@ -876,8 +903,6 @@ namespace mongo { else _iterator->Prev(); - invariantRocksOK(_iterator->status()); - if (_iterator->Valid()) { _curr = _decodeCurr(); if (_cappedVisibilityManager.get()) { // isCapped? @@ -896,6 +921,7 @@ namespace mongo { } } // isCapped? } else { + invariantRocksOK(_iterator->status()); _eof = true; // we leave _curr as it is on purpose } @@ -965,7 +991,6 @@ namespace mongo { int64_t locStorage; _iterator->Seek(RocksRecordStore::_makeKey(loc, &locStorage)); } - invariantRocksOK(_iterator->status()); } else { // backward iterator if (loc.isNull()) { _iterator->SeekToLast(); @@ -973,17 +998,17 @@ namespace mongo { // lower bound on reverse iterator int64_t locStorage; _iterator->Seek(RocksRecordStore::_makeKey(loc, &locStorage)); - invariantRocksOK(_iterator->status()); if (!_iterator->Valid()) { + invariantRocksOK(_iterator->status()); _iterator->SeekToLast(); } else if (_decodeCurr() != loc) { _iterator->Prev(); } } - invariantRocksOK(_iterator->status()); } _eof = !_iterator->Valid(); if (_eof) { + invariantRocksOK(_iterator->status()); _curr = loc; } else { _curr = _decodeCurr(); diff --git a/src/mongo/db/storage/rocks/rocks_record_store.h b/src/mongo/db/storage/rocks/rocks_record_store.h index 1257ffe9c27..10fba514102 100644 --- a/src/mongo/db/storage/rocks/rocks_record_store.h +++ b/src/mongo/db/storage/rocks/rocks_record_store.h @@ -51,6 +51,7 @@ namespace rocksdb { namespace mongo { + class RocksCounterManager; class CappedVisibilityManager { public: CappedVisibilityManager() : _oplog_highestSeen(RecordId::min()) {} @@ -79,7 +80,8 @@ namespace mongo { class RocksRecordStore : public RecordStore { public: - RocksRecordStore(const StringData& ns, const StringData& id, rocksdb::DB* db, std::string prefix, + RocksRecordStore(const StringData& ns, const StringData& id, rocksdb::DB* db, + RocksCounterManager* counterManager, std::string prefix, bool isCapped = false, int64_t cappedMaxSize = -1, int64_t cappedMaxDocs = -1, CappedDocumentDeleteCallback* cappedDeleteCallback = NULL); @@ -235,7 +237,8 @@ namespace mongo { void _changeNumRecords(OperationContext* txn, int64_t amount); void _increaseDataSize(OperationContext* txn, int64_t amount); - rocksdb::DB* _db; // not owned + rocksdb::DB* _db; // not owned + RocksCounterManager* _counterManager; // not owned std::string _prefix; const bool _isCapped; @@ -268,5 +271,12 @@ namespace mongo { bool _shuttingDown; bool _hasBackgroundThread; + + /** + * During record store creation, if a record count is under + * 'kCollectionScanOnCreationThreshold', perform a collection scan to update the + * number of records and data size counters + */ + static const long long kCollectionScanOnCreationThreshold = 10000; }; } diff --git a/src/mongo/db/storage/rocks/rocks_record_store_test.cpp b/src/mongo/db/storage/rocks/rocks_record_store_test.cpp index d4ed147e94d..828a5115efc 100644 --- a/src/mongo/db/storage/rocks/rocks_record_store_test.cpp +++ b/src/mongo/db/storage/rocks/rocks_record_store_test.cpp @@ -64,23 +64,25 @@ namespace mongo { auto s = rocksdb::DB::Open(options, _tempDir.path(), &db); ASSERT(s.ok()); _db.reset(db); + _counterManager.reset(new RocksCounterManager(_db.get(), true)); } virtual RecordStore* newNonCappedRecordStore() { return newNonCappedRecordStore("foo.bar"); } RecordStore* newNonCappedRecordStore(const std::string& ns) { - return new RocksRecordStore(ns, "1", _db.get(), "prefix"); + return new RocksRecordStore(ns, "1", _db.get(), _counterManager.get(), "prefix"); } RecordStore* newCappedRecordStore(const std::string& ns, int64_t cappedMaxSize, int64_t cappedMaxDocs) { - return new RocksRecordStore(ns, "1", _db.get(), "prefix", true, cappedMaxSize, - cappedMaxDocs); + return new RocksRecordStore(ns, "1", _db.get(), _counterManager.get(), "prefix", true, + cappedMaxSize, cappedMaxDocs); } virtual RecoveryUnit* newRecoveryUnit() { - return new RocksRecoveryUnit(&_transactionEngine, _db.get(), true); + return new RocksRecoveryUnit(&_transactionEngine, _db.get(), _counterManager.get(), + true); } private: @@ -88,6 +90,7 @@ namespace mongo { unittest::TempDir _tempDir; boost::scoped_ptr<rocksdb::DB> _db; RocksTransactionEngine _transactionEngine; + scoped_ptr<RocksCounterManager> _counterManager; }; HarnessHelper* newHarnessHelper() { return new RocksRecordStoreHarnessHelper(); } diff --git a/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp b/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp index 2ec2f592df1..da1ab9798a7 100644 --- a/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp +++ b/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp @@ -29,7 +29,6 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage #include "mongo/platform/basic.h" -#include "mongo/platform/endian.h" #include "rocks_recovery_unit.h" @@ -104,9 +103,10 @@ namespace mongo { std::atomic<int> RocksRecoveryUnit::_totalLiveRecoveryUnits(0); RocksRecoveryUnit::RocksRecoveryUnit(RocksTransactionEngine* transactionEngine, rocksdb::DB* db, - bool durable) + RocksCounterManager* counterManager, bool durable) : _transactionEngine(transactionEngine), _db(db), + _counterManager(counterManager), _durable(durable), _transaction(transactionEngine), _writeBatch(), @@ -194,20 +194,20 @@ namespace mongo { void RocksRecoveryUnit::_commit() { invariant(_writeBatch); + rocksdb::WriteBatch* wb = _writeBatch->GetWriteBatch(); for (auto pair : _deltaCounters) { auto& counter = pair.second; counter._value->fetch_add(counter._delta, std::memory_order::memory_order_relaxed); long long newValue = counter._value->load(std::memory_order::memory_order_relaxed); - int64_t storage; - writeBatch()->Put(pair.first, encodeCounter(newValue, &storage)); + _counterManager->updateCounter(pair.first, newValue, wb); } - if (_writeBatch->GetWriteBatch()->Count() != 0) { + if (wb->Count() != 0) { // Order of operations here is important. It needs to be synchronized with // _transaction.recordSnapshotId() and _db->GetSnapshot() and rocksdb::WriteOptions writeOptions; writeOptions.disableWAL = !_durable; - auto status = _db->Write(rocksdb::WriteOptions(), _writeBatch->GetWriteBatch()); + auto status = _db->Write(rocksdb::WriteOptions(), wb); invariantRocksOK(status); _transaction.commit(); } @@ -297,27 +297,7 @@ namespace mongo { } } - long long RocksRecoveryUnit::getCounterValue(rocksdb::DB* db, const rocksdb::Slice counterKey) { - std::string value; - auto s = db->Get(rocksdb::ReadOptions(), counterKey, &value); - if (s.IsNotFound()) { - return 0; - } - invariantRocksOK(s); - - int64_t ret; - invariant(sizeof(ret) == value.size()); - memcpy(&ret, value.data(), sizeof(ret)); - // we store counters in little endian - return static_cast<long long>(endian::littleToNative(ret)); - } - RocksRecoveryUnit* RocksRecoveryUnit::getRocksRecoveryUnit(OperationContext* opCtx) { return checked_cast<RocksRecoveryUnit*>(opCtx->recoveryUnit()); } - - rocksdb::Slice RocksRecoveryUnit::encodeCounter(long long counter, int64_t* storage) { - *storage = static_cast<int64_t>(endian::littleToNative(counter)); - return rocksdb::Slice(reinterpret_cast<const char*>(storage), sizeof(*storage)); - } } diff --git a/src/mongo/db/storage/rocks/rocks_recovery_unit.h b/src/mongo/db/storage/rocks/rocks_recovery_unit.h index d2834dcd0d8..753870ff496 100644 --- a/src/mongo/db/storage/rocks/rocks_recovery_unit.h +++ b/src/mongo/db/storage/rocks/rocks_recovery_unit.h @@ -46,6 +46,7 @@ #include "mongo/db/storage/recovery_unit.h" #include "rocks_transaction.h" +#include "rocks_counter_manager.h" namespace rocksdb { class DB; @@ -64,7 +65,8 @@ namespace mongo { class RocksRecoveryUnit : public RecoveryUnit { MONGO_DISALLOW_COPYING(RocksRecoveryUnit); public: - RocksRecoveryUnit(RocksTransactionEngine* transactionEngine, rocksdb::DB* db, bool durable); + RocksRecoveryUnit(RocksTransactionEngine* transactionEngine, rocksdb::DB* db, + RocksCounterManager* counterManager, bool durable); virtual ~RocksRecoveryUnit(); virtual void beginUnitOfWork(OperationContext* opCtx); @@ -104,13 +106,11 @@ namespace mongo { long long getDeltaCounter(const rocksdb::Slice& counterKey); - static long long getCounterValue(rocksdb::DB* db, const rocksdb::Slice counterKey); - void setOplogReadTill(const RecordId& loc); RecordId getOplogReadTill() const { return _oplogReadTill; } RocksRecoveryUnit* newRocksRecoveryUnit() { - return new RocksRecoveryUnit(_transactionEngine, _db, _durable); + return new RocksRecoveryUnit(_transactionEngine, _db, _counterManager, _durable); } struct Counter { @@ -126,8 +126,6 @@ namespace mongo { static int getTotalLiveRecoveryUnits() { return _totalLiveRecoveryUnits.load(); } - static rocksdb::Slice encodeCounter(long long counter, int64_t* storage); - private: void _releaseSnapshot(); @@ -135,7 +133,8 @@ namespace mongo { void _abort(); RocksTransactionEngine* _transactionEngine; // not owned - rocksdb::DB* _db; // not owned + rocksdb::DB* _db; // not owned + RocksCounterManager* _counterManager; // not owned const bool _durable; |