summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIgor Canadi <icanadi@fb.com>2015-03-24 12:22:33 -0700
committerRamon Fernandez <ramon.fernandez@mongodb.com>2015-04-20 16:45:32 -0400
commita966cea74fc5454f4a03691ffe59ab62162c88f7 (patch)
tree0245f9fd63c808f59b6933ea56af99e4bea939e2
parentd12a68e550e4f928b8c8f9d052062b4072b236ef (diff)
downloadmongo-a966cea74fc5454f4a03691ffe59ab62162c88f7.tar.gz
SERVER-17939 Backport mongo-rocks updates to v3.0 branch
* Expose RocksDB's CompactRange() API * Implement not-crash safe counters in Mongo-rocks * Implement backup for Mongo+Rocks * Fail to start server with invalid rocksdbConfigString * Check iterator status only if it's not Valid * Switch call to DB::CompactRange to SuggestCompactRange() which is more light weight We have been testing those changes in Parse's environment and have good confidence of their stability. Signed-off-by: Ramon Fernandez <ramon.fernandez@mongodb.com>
-rw-r--r--src/mongo/db/storage/rocks/SConscript1
-rw-r--r--src/mongo/db/storage/rocks/rocks_counter_manager.cpp119
-rw-r--r--src/mongo/db/storage/rocks/rocks_counter_manager.h77
-rw-r--r--src/mongo/db/storage/rocks/rocks_engine.cpp33
-rw-r--r--src/mongo/db/storage/rocks/rocks_engine.h8
-rw-r--r--src/mongo/db/storage/rocks/rocks_global_options.cpp14
-rw-r--r--src/mongo/db/storage/rocks/rocks_global_options.h8
-rw-r--r--src/mongo/db/storage/rocks/rocks_index.cpp15
-rw-r--r--src/mongo/db/storage/rocks/rocks_index_test.cpp4
-rw-r--r--src/mongo/db/storage/rocks/rocks_init.cpp2
-rw-r--r--src/mongo/db/storage/rocks/rocks_parameters.cpp43
-rw-r--r--src/mongo/db/storage/rocks/rocks_parameters.h35
-rw-r--r--src/mongo/db/storage/rocks/rocks_record_store.cpp61
-rw-r--r--src/mongo/db/storage/rocks/rocks_record_store.h14
-rw-r--r--src/mongo/db/storage/rocks/rocks_record_store_test.cpp11
-rw-r--r--src/mongo/db/storage/rocks/rocks_recovery_unit.cpp32
-rw-r--r--src/mongo/db/storage/rocks/rocks_recovery_unit.h13
17 files changed, 420 insertions, 70 deletions
diff --git a/src/mongo/db/storage/rocks/SConscript b/src/mongo/db/storage/rocks/SConscript
index 2ba0f3149d3..ce7e31d0856 100644
--- a/src/mongo/db/storage/rocks/SConscript
+++ b/src/mongo/db/storage/rocks/SConscript
@@ -6,6 +6,7 @@ if has_option("rocksdb"):
env.Library(
target= 'storage_rocks_base',
source= [
+ 'rocks_counter_manager.cpp',
'rocks_global_options.cpp',
'rocks_engine.cpp',
'rocks_record_store.cpp',
diff --git a/src/mongo/db/storage/rocks/rocks_counter_manager.cpp b/src/mongo/db/storage/rocks/rocks_counter_manager.cpp
new file mode 100644
index 00000000000..38773db3163
--- /dev/null
+++ b/src/mongo/db/storage/rocks/rocks_counter_manager.cpp
@@ -0,0 +1,119 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+#include "mongo/platform/endian.h"
+
+#include "rocks_counter_manager.h"
+
+#include <atomic>
+#include <map>
+#include <memory>
+#include <string>
+
+// for invariant()
+#include "mongo/util/assert_util.h"
+
+#include <rocksdb/db.h>
+
+#include "rocks_util.h"
+
+namespace mongo {
+
+ long long RocksCounterManager::loadCounter(const std::string& counterKey) {
+ {
+ boost::mutex::scoped_lock lk(_lock);
+ auto itr = _counters.find(counterKey);
+ if (itr != _counters.end()) {
+ return itr->second;
+ }
+ }
+ std::string value;
+ auto s = _db->Get(rocksdb::ReadOptions(), counterKey, &value);
+ if (s.IsNotFound()) {
+ return 0;
+ }
+ invariantRocksOK(s);
+
+ int64_t ret;
+ invariant(sizeof(ret) == value.size());
+ memcpy(&ret, value.data(), sizeof(ret));
+ // we store counters in little endian
+ return static_cast<long long>(endian::littleToNative(ret));
+ }
+
+ void RocksCounterManager::updateCounter(const std::string& counterKey, long long count,
+ rocksdb::WriteBatch* writeBatch) {
+
+ if (_crashSafe) {
+ int64_t storage;
+ writeBatch->Put(counterKey, _encodeCounter(count, &storage));
+ } else {
+ boost::mutex::scoped_lock lk(_lock);
+ _counters[counterKey] = count;
+ ++_syncCounter;
+ if (!_syncing && _syncCounter >= kSyncEvery) {
+ // let's sync this now. piggyback on writeBatch
+ int64_t storage;
+ for (const auto& counter : _counters) {
+ writeBatch->Put(counter.first, _encodeCounter(counter.second, &storage));
+ }
+ _counters.clear();
+ _syncCounter = 0;
+ }
+ }
+ }
+
+ void RocksCounterManager::sync() {
+ rocksdb::WriteBatch wb;
+ {
+ boost::mutex::scoped_lock lk(_lock);
+ if (_syncing || _counters.size() == 0) {
+ return;
+ }
+ int64_t storage;
+ for (const auto& counter : _counters) {
+ wb.Put(counter.first, _encodeCounter(counter.second, &storage));
+ }
+ _counters.clear();
+ _syncCounter = 0;
+ _syncing = true;
+ }
+ auto s = _db->Write(rocksdb::WriteOptions(), &wb);
+ invariantRocksOK(s);
+ {
+ boost::mutex::scoped_lock lk(_lock);
+ _syncing = false;
+ }
+ }
+
+ rocksdb::Slice RocksCounterManager::_encodeCounter(long long counter, int64_t* storage) {
+ *storage = static_cast<int64_t>(endian::littleToNative(counter));
+ return rocksdb::Slice(reinterpret_cast<const char*>(storage), sizeof(*storage));
+ }
+}
diff --git a/src/mongo/db/storage/rocks/rocks_counter_manager.h b/src/mongo/db/storage/rocks/rocks_counter_manager.h
new file mode 100644
index 00000000000..e88fb3da300
--- /dev/null
+++ b/src/mongo/db/storage/rocks/rocks_counter_manager.h
@@ -0,0 +1,77 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <set>
+#include <unordered_map>
+#include <memory>
+#include <string>
+#include <list>
+
+#include <boost/thread/mutex.hpp>
+
+#include <rocksdb/db.h>
+#include <rocksdb/slice.h>
+
+#include "mongo/base/string_data.h"
+
+namespace mongo {
+
+ class RocksCounterManager {
+ public:
+ RocksCounterManager(rocksdb::DB* db, bool crashSafe)
+ : _db(db), _crashSafe(crashSafe), _syncing(false), _syncCounter(0) {}
+
+ long long loadCounter(const std::string& counterKey);
+
+ void updateCounter(const std::string& counterKey, long long count,
+ rocksdb::WriteBatch* writeBatch);
+
+ void sync();
+
+ bool crashSafe() const { return _crashSafe; }
+
+ private:
+ static rocksdb::Slice _encodeCounter(long long counter, int64_t* storage);
+
+ rocksdb::DB* _db; // not owned
+ const bool _crashSafe;
+ boost::mutex _lock;
+ // protected by _lock
+ bool _syncing;
+ // protected by _lock
+ std::unordered_map<std::string, long long> _counters;
+ // protected by _lock
+ int _syncCounter;
+
+ static const int kSyncEvery = 10000;
+ };
+
+}
diff --git a/src/mongo/db/storage/rocks/rocks_engine.cpp b/src/mongo/db/storage/rocks/rocks_engine.cpp
index e16447b99f0..840900c7a31 100644
--- a/src/mongo/db/storage/rocks/rocks_engine.cpp
+++ b/src/mongo/db/storage/rocks/rocks_engine.cpp
@@ -49,6 +49,7 @@
#include <rocksdb/utilities/convenience.h>
#include <rocksdb/filter_policy.h>
#include <rocksdb/utilities/write_batch_with_index.h>
+#include <rocksdb/utilities/checkpoint.h>
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/index/index_descriptor.h"
@@ -59,6 +60,7 @@
#include "mongo/util/log.h"
#include "mongo/util/processinfo.h"
+#include "rocks_counter_manager.h"
#include "rocks_global_options.h"
#include "rocks_record_store.h"
#include "rocks_recovery_unit.h"
@@ -175,6 +177,9 @@ namespace mongo {
invariantRocksOK(s);
_db.reset(db);
+ _counterManager.reset(
+ new RocksCounterManager(_db.get(), rocksGlobalOptions.crashSafeCounters));
+
// open iterator
boost::scoped_ptr<rocksdb::Iterator> iter(_db->NewIterator(rocksdb::ReadOptions()));
@@ -246,10 +251,11 @@ namespace mongo {
}
}
- RocksEngine::~RocksEngine() {}
+ RocksEngine::~RocksEngine() { cleanShutdown(); }
RecoveryUnit* RocksEngine::newRecoveryUnit() {
- return new RocksRecoveryUnit(&_transactionEngine, _db.get(), _durable);
+ return new RocksRecoveryUnit(&_transactionEngine, _db.get(), _counterManager.get(),
+ _durable);
}
Status RocksEngine::createRecordStore(OperationContext* opCtx, const StringData& ns,
@@ -275,11 +281,12 @@ namespace mongo {
}
if (options.capped) {
return new RocksRecordStore(
- ns, ident, _db.get(), _getIdentPrefix(ident), true,
+ ns, ident, _db.get(), _counterManager.get(), _getIdentPrefix(ident), true,
options.cappedSize ? options.cappedSize : 4096, // default size
options.cappedMaxDocs ? options.cappedMaxDocs : -1);
} else {
- return new RocksRecordStore(ns, ident, _db.get(), _getIdentPrefix(ident));
+ return new RocksRecordStore(ns, ident, _db.get(), _counterManager.get(),
+ _getIdentPrefix(ident));
}
}
@@ -361,6 +368,8 @@ namespace mongo {
return indents;
}
+ void RocksEngine::cleanShutdown() { _counterManager->sync(); }
+
int64_t RocksEngine::getIdentSize(OperationContext* opCtx, const StringData& ident) {
uint64_t storageSize;
std::string prefix = _getIdentPrefix(ident);
@@ -375,6 +384,16 @@ namespace mongo {
_rateLimiter->SetBytesPerSecond(static_cast<int64_t>(_maxWriteMBPerSec) * 1024 * 1024);
}
+ Status RocksEngine::backup(const std::string& path) {
+ rocksdb::Checkpoint* checkpoint;
+ auto s = rocksdb::Checkpoint::Create(_db.get(), &checkpoint);
+ if (s.ok()) {
+ s = checkpoint->CreateCheckpoint(path);
+ }
+ delete checkpoint;
+ return rocksToMongoStatus(s);
+ }
+
std::unordered_set<uint32_t> RocksEngine::getDroppedPrefixes() const {
boost::mutex::scoped_lock lk(_droppedPrefixesMutex);
// this will copy the set. that way compaction filter has its own copy and doesn't need to
@@ -454,7 +473,11 @@ namespace mongo {
// allow override
if (!rocksGlobalOptions.configString.empty()) {
rocksdb::Options base_options(options);
- rocksdb::GetOptionsFromString(base_options, rocksGlobalOptions.configString, &options);
+ auto s = rocksdb::GetOptionsFromString(base_options, rocksGlobalOptions.configString, &options);
+ if (!s.ok()) {
+ log() << "Invalid rocksdbConfigString \"" << rocksGlobalOptions.configString << "\"";
+ invariantRocksOK(s);
+ }
}
return options;
diff --git a/src/mongo/db/storage/rocks/rocks_engine.h b/src/mongo/db/storage/rocks/rocks_engine.h
index f1a4d2f58da..b87cacd573d 100644
--- a/src/mongo/db/storage/rocks/rocks_engine.h
+++ b/src/mongo/db/storage/rocks/rocks_engine.h
@@ -49,6 +49,7 @@
#include "mongo/db/storage/kv/kv_engine.h"
#include "mongo/util/string_map.h"
+#include "rocks_counter_manager.h"
#include "rocks_transaction.h"
namespace rocksdb {
@@ -113,7 +114,7 @@ namespace mongo {
return Status::OK();
}
- virtual void cleanShutdown() {}
+ virtual void cleanShutdown();
/**
* Initializes a background job to remove excess documents in the oplog collections.
@@ -135,6 +136,8 @@ namespace mongo {
int getMaxWriteMBPerSec() const { return _maxWriteMBPerSec; }
void setMaxWriteMBPerSec(int maxWriteMBPerSec);
+ Status backup(const std::string& path);
+
private:
Status _createIdentPrefix(const StringData& ident);
std::string _getIdentPrefix(const StringData& ident);
@@ -165,6 +168,9 @@ namespace mongo {
// This is for concurrency control
RocksTransactionEngine _transactionEngine;
+ // CounterManages manages counters like numRecords and dataSize for record stores
+ boost::scoped_ptr<RocksCounterManager> _counterManager;
+
static const std::string kMetadataPrefix;
static const std::string kDroppedPrefix;
};
diff --git a/src/mongo/db/storage/rocks/rocks_global_options.cpp b/src/mongo/db/storage/rocks/rocks_global_options.cpp
index 264b17805f7..ae7dd8f1a8c 100644
--- a/src/mongo/db/storage/rocks/rocks_global_options.cpp
+++ b/src/mongo/db/storage/rocks/rocks_global_options.cpp
@@ -65,6 +65,15 @@ namespace mongo {
moe::String,
"RocksDB storage engine custom "
"configuration settings").hidden();
+ rocksOptions.addOptionChaining("storage.rocksdb.crashSafeCounters",
+ "rocksdbCrashSafeCounters", moe::Bool,
+ "If true, numRecord and dataSize counter will be consistent "
+ "even after power failure. If false, numRecord and dataSize "
+ "might be a bit inconsistent after power failure, but "
+ "should be correct under normal conditions. Setting this to "
+ "true will make database inserts a bit slower.")
+ .setDefault(moe::Value(false))
+ .hidden();
return options->addSection(rocksOptions);
}
@@ -90,6 +99,11 @@ namespace mongo {
params["storage.rocksdb.configString"].as<std::string>();
log() << "Engine custom option: " << rocksGlobalOptions.configString;
}
+ if (params.count("storage.rocksdb.crashSafeCounters")) {
+ rocksGlobalOptions.crashSafeCounters =
+ params["storage.rocksdb.crashSafeCounters"].as<bool>();
+ log() << "Crash safe counters: " << rocksGlobalOptions.crashSafeCounters;
+ }
return Status::OK();
}
diff --git a/src/mongo/db/storage/rocks/rocks_global_options.h b/src/mongo/db/storage/rocks/rocks_global_options.h
index aa8d103df33..d8efc51a556 100644
--- a/src/mongo/db/storage/rocks/rocks_global_options.h
+++ b/src/mongo/db/storage/rocks/rocks_global_options.h
@@ -37,7 +37,11 @@ namespace mongo {
class RocksGlobalOptions {
public:
- RocksGlobalOptions() : cacheSizeGB(0), maxWriteMBPerSec(1024), compression("snappy") {}
+ RocksGlobalOptions()
+ : cacheSizeGB(0),
+ maxWriteMBPerSec(1024),
+ compression("snappy"),
+ crashSafeCounters(false) {}
Status add(moe::OptionSection* options);
Status store(const moe::Environment& params, const std::vector<std::string>& args);
@@ -47,6 +51,8 @@ namespace mongo {
std::string compression;
std::string configString;
+
+ bool crashSafeCounters;
};
extern RocksGlobalOptions rocksGlobalOptions;
diff --git a/src/mongo/db/storage/rocks/rocks_index.cpp b/src/mongo/db/storage/rocks/rocks_index.cpp
index e08b6395c9c..a91b17efea2 100644
--- a/src/mongo/db/storage/rocks/rocks_index.cpp
+++ b/src/mongo/db/storage/rocks/rocks_index.cpp
@@ -112,7 +112,6 @@ namespace mongo {
auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
_iterator.reset(ru->NewIterator(_prefix));
_currentSequenceNumber = ru->snapshot()->GetSequenceNumber();
- invariantRocksOK(_iterator->status());
}
int getDirection() const { return _forward ? 1 : -1; }
@@ -234,7 +233,6 @@ namespace mongo {
auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
if (_currentSequenceNumber != ru->snapshot()->GetSequenceNumber()) {
_iterator.reset(ru->NewIterator(_prefix));
- invariantRocksOK(_iterator->status());
_currentSequenceNumber = ru->snapshot()->GetSequenceNumber();
if (!_savedEOF) {
@@ -260,7 +258,9 @@ namespace mongo {
} else {
_iterator->Prev();
}
- invariantRocksOK(_iterator->status());
+ if (!_iterator->Valid()) {
+ invariantRocksOK(_iterator->status());
+ }
}
// Seeks to query. Returns true on exact match.
@@ -268,11 +268,14 @@ namespace mongo {
const rocksdb::Slice keySlice(query.getBuffer(), query.getSize());
_iterator->Seek(keySlice);
if (!_iterator->Valid()) {
+ invariantRocksOK(_iterator->status());
if (!_forward) {
// this will give lower bound behavior for backwards
_iterator->SeekToLast();
+ if (!_iterator->Valid()) {
+ invariantRocksOK(_iterator->status());
+ }
}
- invariantRocksOK(_iterator->status());
return false;
}
@@ -288,7 +291,9 @@ namespace mongo {
// were
// searching for.
_iterator->Prev();
- invariantRocksOK(_iterator->status());
+ if (!_iterator->Valid()) {
+ invariantRocksOK(_iterator->status());
+ }
}
return false;
diff --git a/src/mongo/db/storage/rocks/rocks_index_test.cpp b/src/mongo/db/storage/rocks/rocks_index_test.cpp
index b6f865fbaf1..3007cb9fb62 100644
--- a/src/mongo/db/storage/rocks/rocks_index_test.cpp
+++ b/src/mongo/db/storage/rocks/rocks_index_test.cpp
@@ -64,6 +64,7 @@ namespace mongo {
auto s = rocksdb::DB::Open(options, _tempDir.path(), &db);
ASSERT(s.ok());
_db.reset(db);
+ _counterManager.reset(new RocksCounterManager(_db.get(), true));
}
virtual SortedDataInterface* newSortedDataInterface(bool unique) {
@@ -75,7 +76,7 @@ namespace mongo {
}
virtual RecoveryUnit* newRecoveryUnit() {
- return new RocksRecoveryUnit(&_transactionEngine, _db.get(), true);
+ return new RocksRecoveryUnit(&_transactionEngine, _db.get(), _counterManager.get(), true);
}
private:
@@ -84,6 +85,7 @@ namespace mongo {
unittest::TempDir _tempDir;
scoped_ptr<rocksdb::DB> _db;
RocksTransactionEngine _transactionEngine;
+ scoped_ptr<RocksCounterManager> _counterManager;
};
HarnessHelper* newHarnessHelper() { return new RocksIndexHarness(); }
diff --git a/src/mongo/db/storage/rocks/rocks_init.cpp b/src/mongo/db/storage/rocks/rocks_init.cpp
index 18d8b9d08c4..3a438ed263e 100644
--- a/src/mongo/db/storage/rocks/rocks_init.cpp
+++ b/src/mongo/db/storage/rocks/rocks_init.cpp
@@ -59,6 +59,8 @@ namespace mongo {
// Intentionally leaked.
auto leaked __attribute__((unused)) = new RocksServerStatusSection(engine);
auto leaked2 __attribute__((unused)) = new RocksRateLimiterServerParameter(engine);
+ auto leaked3 __attribute__((unused)) = new RocksBackupServerParameter(engine);
+ auto leaked4 __attribute__((unused)) = new RocksCompactServerParameter(engine);
return new KVStorageEngine(engine, options);
}
diff --git a/src/mongo/db/storage/rocks/rocks_parameters.cpp b/src/mongo/db/storage/rocks/rocks_parameters.cpp
index 4181d8aab2f..c92a6869864 100644
--- a/src/mongo/db/storage/rocks/rocks_parameters.cpp
+++ b/src/mongo/db/storage/rocks/rocks_parameters.cpp
@@ -30,11 +30,15 @@
#include "mongo/platform/basic.h"
#include "rocks_parameters.h"
+#include "rocks_util.h"
#include "mongo/logger/parse_log_component_settings.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
+#include <rocksdb/db.h>
+#include <rocksdb/experimental.h>
+
namespace mongo {
RocksRateLimiterServerParameter::RocksRateLimiterServerParameter(RocksEngine* engine)
@@ -70,4 +74,43 @@ namespace mongo {
return Status::OK();
}
+
+ RocksBackupServerParameter::RocksBackupServerParameter(RocksEngine* engine)
+ : ServerParameter(ServerParameterSet::getGlobal(), "rocksdbBackup", false, true),
+ _engine(engine) {}
+
+ void RocksBackupServerParameter::append(OperationContext* txn, BSONObjBuilder& b,
+ const std::string& name) {
+ b.append(name, "");
+ }
+
+ Status RocksBackupServerParameter::set(const BSONElement& newValueElement) {
+ auto str = newValueElement.str();
+ if (str.size() == 0) {
+ return Status(ErrorCodes::BadValue, str::stream() << name() << " has to be a string");
+ }
+ return setFromString(str);
+ }
+
+ Status RocksBackupServerParameter::setFromString(const std::string& str) {
+ return _engine->backup(str);
+ }
+
+ RocksCompactServerParameter::RocksCompactServerParameter(RocksEngine* engine)
+ : ServerParameter(ServerParameterSet::getGlobal(), "rocksdbCompact", false, true),
+ _engine(engine) {}
+
+ void RocksCompactServerParameter::append(OperationContext* txn, BSONObjBuilder& b,
+ const std::string& name) {
+ b.append(name, "");
+ }
+
+ Status RocksCompactServerParameter::set(const BSONElement& newValueElement) {
+ return setFromString("");
+ }
+
+ Status RocksCompactServerParameter::setFromString(const std::string& str) {
+ auto s = rocksdb::experimental::SuggestCompactRange(_engine->getDB(), nullptr, nullptr);
+ return rocksToMongoStatus(s);
+ }
}
diff --git a/src/mongo/db/storage/rocks/rocks_parameters.h b/src/mongo/db/storage/rocks/rocks_parameters.h
index b447a786386..295c8eb9cb6 100644
--- a/src/mongo/db/storage/rocks/rocks_parameters.h
+++ b/src/mongo/db/storage/rocks/rocks_parameters.h
@@ -33,6 +33,8 @@
namespace mongo {
+ // To dynamically configure RocksDB's rate limit, run
+ // db.adminCommand({setParameter:1, rocksdbRuntimeConfigMaxWriteMBPerSec:30})
class RocksRateLimiterServerParameter : public ServerParameter {
MONGO_DISALLOW_COPYING(RocksRateLimiterServerParameter);
@@ -47,4 +49,37 @@ namespace mongo {
RocksEngine* _engine;
};
+ // We use mongo's setParameter() API to issue a backup request to rocksdb.
+ // To backup entire RocksDB instance, call:
+ // db.adminCommand({setParameter:1, rocksdbBackup: "/var/lib/mongodb/backup/1"})
+ // The directory needs to be an absolute path. It should not exist -- it will be created
+ // automatically.
+ class RocksBackupServerParameter : public ServerParameter {
+ MONGO_DISALLOW_COPYING(RocksBackupServerParameter);
+
+ public:
+ RocksBackupServerParameter(RocksEngine* engine);
+ virtual void append(OperationContext* txn, BSONObjBuilder& b, const std::string& name);
+ virtual Status set(const BSONElement& newValueElement);
+ virtual Status setFromString(const std::string& str);
+
+ private:
+ RocksEngine* _engine;
+ };
+
+ // We use mongo's setParameter() API to issue a compact request to rocksdb.
+ // To compact entire RocksDB instance, call:
+ // db.adminCommand({setParameter:1, rocksdbCompact: 1})
+ class RocksCompactServerParameter : public ServerParameter {
+ MONGO_DISALLOW_COPYING(RocksCompactServerParameter);
+
+ public:
+ RocksCompactServerParameter(RocksEngine* engine);
+ virtual void append(OperationContext* txn, BSONObjBuilder& b, const std::string& name);
+ virtual Status set(const BSONElement& newValueElement);
+ virtual Status setFromString(const std::string& str);
+
+ private:
+ RocksEngine* _engine;
+ };
}
diff --git a/src/mongo/db/storage/rocks/rocks_record_store.cpp b/src/mongo/db/storage/rocks/rocks_record_store.cpp
index 810b7b08f79..5b500e7a248 100644
--- a/src/mongo/db/storage/rocks/rocks_record_store.cpp
+++ b/src/mongo/db/storage/rocks/rocks_record_store.cpp
@@ -56,6 +56,7 @@
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/timer.h"
+#include "rocks_counter_manager.h"
#include "rocks_engine.h"
#include "rocks_recovery_unit.h"
#include "rocks_util.h"
@@ -169,13 +170,13 @@ namespace mongo {
std::string _prefix;
};
- RocksRecordStore::RocksRecordStore(const StringData& ns, const StringData& id,
- rocksdb::DB* db, // not owned here
- std::string prefix, bool isCapped, int64_t cappedMaxSize,
- int64_t cappedMaxDocs,
+ RocksRecordStore::RocksRecordStore(const StringData& ns, const StringData& id, rocksdb::DB* db,
+ RocksCounterManager* counterManager, std::string prefix,
+ bool isCapped, int64_t cappedMaxSize, int64_t cappedMaxDocs,
CappedDocumentDeleteCallback* cappedDeleteCallback)
: RecordStore(ns),
_db(db),
+ _counterManager(counterManager),
_prefix(std::move(prefix)),
_isCapped(isCapped),
_cappedMaxSize(cappedMaxSize),
@@ -184,8 +185,9 @@ namespace mongo {
_cappedDeleteCallback(cappedDeleteCallback),
_cappedDeleteCheckCount(0),
_isOplog(NamespaceString::oplog(ns)),
- _oplogKeyTracker(
- _isOplog ? new RocksOplogKeyTracker(std::move(rocksGetNextPrefix(_prefix))) : nullptr),
+ _oplogKeyTracker(_isOplog
+ ? new RocksOplogKeyTracker(std::move(rocksGetNextPrefix(_prefix)))
+ : nullptr),
_oplogNextToDelete(0),
_cappedVisibilityManager((_isCapped || _isOplog) ? new CappedVisibilityManager()
: nullptr),
@@ -207,6 +209,7 @@ namespace mongo {
boost::scoped_ptr<rocksdb::Iterator> iter(
RocksRecoveryUnit::NewIteratorNoSnapshot(_db, _prefix));
iter->SeekToLast();
+ bool emptyCollection = !iter->Valid();
if (iter->Valid()) {
rocksdb::Slice lastSlice = iter->key();
RecordId lastId = _makeRecordId(lastSlice);
@@ -220,11 +223,34 @@ namespace mongo {
}
// load metadata
- _numRecords.store(RocksRecoveryUnit::getCounterValue(_db, _numRecordsKey));
- _dataSize.store(RocksRecoveryUnit::getCounterValue(_db, _dataSizeKey));
+ _numRecords.store(_counterManager->loadCounter(_numRecordsKey));
+ _dataSize.store(_counterManager->loadCounter(_dataSizeKey));
invariant(_dataSize.load() >= 0);
invariant(_numRecords.load() >= 0);
+ if (!emptyCollection && !_counterManager->crashSafe() &&
+ _numRecords.load() < kCollectionScanOnCreationThreshold) {
+ LOG(1) << "doing scan of collection " << ns << " to get info";
+
+ _numRecords.store(0);
+ _dataSize.store(0);
+
+ long long numRecords = 0, dataSize = 0;
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ numRecords++;
+ dataSize += static_cast<long long>(iter->value().size());
+ }
+
+ _numRecords.store(numRecords);
+ _dataSize.store(dataSize);
+
+ rocksdb::WriteBatch wb;
+ _counterManager->updateCounter(_numRecordsKey, numRecords, &wb);
+ _counterManager->updateCounter(_dataSizeKey, dataSize, &wb);
+ auto s = _db->Write(rocksdb::WriteOptions(), &wb);
+ invariantRocksOK(s);
+ }
+
_hasBackgroundThread = RocksEngine::initRsOplogBackgroundThread(ns);
}
@@ -699,11 +725,12 @@ namespace mongo {
_numRecords.store(numRecords);
_dataSize.store(dataSize);
rocksdb::WriteBatch wb;
- int64_t storage;
- wb.Put(_numRecordsKey, RocksRecoveryUnit::encodeCounter(numRecords, &storage));
- wb.Put(_dataSizeKey, RocksRecoveryUnit::encodeCounter(dataSize, &storage));
- auto s = _db->Write(rocksdb::WriteOptions(), &wb);
- invariantRocksOK(s);
+ _counterManager->updateCounter(_numRecordsKey, numRecords, &wb);
+ _counterManager->updateCounter(_dataSizeKey, dataSize, &wb);
+ if (wb.Count() > 0) {
+ auto s = _db->Write(rocksdb::WriteOptions(), &wb);
+ invariantRocksOK(s);
+ }
}
/**
@@ -876,8 +903,6 @@ namespace mongo {
else
_iterator->Prev();
- invariantRocksOK(_iterator->status());
-
if (_iterator->Valid()) {
_curr = _decodeCurr();
if (_cappedVisibilityManager.get()) { // isCapped?
@@ -896,6 +921,7 @@ namespace mongo {
}
} // isCapped?
} else {
+ invariantRocksOK(_iterator->status());
_eof = true;
// we leave _curr as it is on purpose
}
@@ -965,7 +991,6 @@ namespace mongo {
int64_t locStorage;
_iterator->Seek(RocksRecordStore::_makeKey(loc, &locStorage));
}
- invariantRocksOK(_iterator->status());
} else { // backward iterator
if (loc.isNull()) {
_iterator->SeekToLast();
@@ -973,17 +998,17 @@ namespace mongo {
// lower bound on reverse iterator
int64_t locStorage;
_iterator->Seek(RocksRecordStore::_makeKey(loc, &locStorage));
- invariantRocksOK(_iterator->status());
if (!_iterator->Valid()) {
+ invariantRocksOK(_iterator->status());
_iterator->SeekToLast();
} else if (_decodeCurr() != loc) {
_iterator->Prev();
}
}
- invariantRocksOK(_iterator->status());
}
_eof = !_iterator->Valid();
if (_eof) {
+ invariantRocksOK(_iterator->status());
_curr = loc;
} else {
_curr = _decodeCurr();
diff --git a/src/mongo/db/storage/rocks/rocks_record_store.h b/src/mongo/db/storage/rocks/rocks_record_store.h
index 1257ffe9c27..10fba514102 100644
--- a/src/mongo/db/storage/rocks/rocks_record_store.h
+++ b/src/mongo/db/storage/rocks/rocks_record_store.h
@@ -51,6 +51,7 @@ namespace rocksdb {
namespace mongo {
+ class RocksCounterManager;
class CappedVisibilityManager {
public:
CappedVisibilityManager() : _oplog_highestSeen(RecordId::min()) {}
@@ -79,7 +80,8 @@ namespace mongo {
class RocksRecordStore : public RecordStore {
public:
- RocksRecordStore(const StringData& ns, const StringData& id, rocksdb::DB* db, std::string prefix,
+ RocksRecordStore(const StringData& ns, const StringData& id, rocksdb::DB* db,
+ RocksCounterManager* counterManager, std::string prefix,
bool isCapped = false, int64_t cappedMaxSize = -1,
int64_t cappedMaxDocs = -1,
CappedDocumentDeleteCallback* cappedDeleteCallback = NULL);
@@ -235,7 +237,8 @@ namespace mongo {
void _changeNumRecords(OperationContext* txn, int64_t amount);
void _increaseDataSize(OperationContext* txn, int64_t amount);
- rocksdb::DB* _db; // not owned
+ rocksdb::DB* _db; // not owned
+ RocksCounterManager* _counterManager; // not owned
std::string _prefix;
const bool _isCapped;
@@ -268,5 +271,12 @@ namespace mongo {
bool _shuttingDown;
bool _hasBackgroundThread;
+
+ /**
+ * During record store creation, if a record count is under
+ * 'kCollectionScanOnCreationThreshold', perform a collection scan to update the
+ * number of records and data size counters
+ */
+ static const long long kCollectionScanOnCreationThreshold = 10000;
};
}
diff --git a/src/mongo/db/storage/rocks/rocks_record_store_test.cpp b/src/mongo/db/storage/rocks/rocks_record_store_test.cpp
index d4ed147e94d..828a5115efc 100644
--- a/src/mongo/db/storage/rocks/rocks_record_store_test.cpp
+++ b/src/mongo/db/storage/rocks/rocks_record_store_test.cpp
@@ -64,23 +64,25 @@ namespace mongo {
auto s = rocksdb::DB::Open(options, _tempDir.path(), &db);
ASSERT(s.ok());
_db.reset(db);
+ _counterManager.reset(new RocksCounterManager(_db.get(), true));
}
virtual RecordStore* newNonCappedRecordStore() {
return newNonCappedRecordStore("foo.bar");
}
RecordStore* newNonCappedRecordStore(const std::string& ns) {
- return new RocksRecordStore(ns, "1", _db.get(), "prefix");
+ return new RocksRecordStore(ns, "1", _db.get(), _counterManager.get(), "prefix");
}
RecordStore* newCappedRecordStore(const std::string& ns, int64_t cappedMaxSize,
int64_t cappedMaxDocs) {
- return new RocksRecordStore(ns, "1", _db.get(), "prefix", true, cappedMaxSize,
- cappedMaxDocs);
+ return new RocksRecordStore(ns, "1", _db.get(), _counterManager.get(), "prefix", true,
+ cappedMaxSize, cappedMaxDocs);
}
virtual RecoveryUnit* newRecoveryUnit() {
- return new RocksRecoveryUnit(&_transactionEngine, _db.get(), true);
+ return new RocksRecoveryUnit(&_transactionEngine, _db.get(), _counterManager.get(),
+ true);
}
private:
@@ -88,6 +90,7 @@ namespace mongo {
unittest::TempDir _tempDir;
boost::scoped_ptr<rocksdb::DB> _db;
RocksTransactionEngine _transactionEngine;
+ scoped_ptr<RocksCounterManager> _counterManager;
};
HarnessHelper* newHarnessHelper() { return new RocksRecordStoreHarnessHelper(); }
diff --git a/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp b/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp
index 2ec2f592df1..da1ab9798a7 100644
--- a/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp
+++ b/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp
@@ -29,7 +29,6 @@
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
#include "mongo/platform/basic.h"
-#include "mongo/platform/endian.h"
#include "rocks_recovery_unit.h"
@@ -104,9 +103,10 @@ namespace mongo {
std::atomic<int> RocksRecoveryUnit::_totalLiveRecoveryUnits(0);
RocksRecoveryUnit::RocksRecoveryUnit(RocksTransactionEngine* transactionEngine, rocksdb::DB* db,
- bool durable)
+ RocksCounterManager* counterManager, bool durable)
: _transactionEngine(transactionEngine),
_db(db),
+ _counterManager(counterManager),
_durable(durable),
_transaction(transactionEngine),
_writeBatch(),
@@ -194,20 +194,20 @@ namespace mongo {
void RocksRecoveryUnit::_commit() {
invariant(_writeBatch);
+ rocksdb::WriteBatch* wb = _writeBatch->GetWriteBatch();
for (auto pair : _deltaCounters) {
auto& counter = pair.second;
counter._value->fetch_add(counter._delta, std::memory_order::memory_order_relaxed);
long long newValue = counter._value->load(std::memory_order::memory_order_relaxed);
- int64_t storage;
- writeBatch()->Put(pair.first, encodeCounter(newValue, &storage));
+ _counterManager->updateCounter(pair.first, newValue, wb);
}
- if (_writeBatch->GetWriteBatch()->Count() != 0) {
+ if (wb->Count() != 0) {
// Order of operations here is important. It needs to be synchronized with
// _transaction.recordSnapshotId() and _db->GetSnapshot() and
rocksdb::WriteOptions writeOptions;
writeOptions.disableWAL = !_durable;
- auto status = _db->Write(rocksdb::WriteOptions(), _writeBatch->GetWriteBatch());
+ auto status = _db->Write(rocksdb::WriteOptions(), wb);
invariantRocksOK(status);
_transaction.commit();
}
@@ -297,27 +297,7 @@ namespace mongo {
}
}
- long long RocksRecoveryUnit::getCounterValue(rocksdb::DB* db, const rocksdb::Slice counterKey) {
- std::string value;
- auto s = db->Get(rocksdb::ReadOptions(), counterKey, &value);
- if (s.IsNotFound()) {
- return 0;
- }
- invariantRocksOK(s);
-
- int64_t ret;
- invariant(sizeof(ret) == value.size());
- memcpy(&ret, value.data(), sizeof(ret));
- // we store counters in little endian
- return static_cast<long long>(endian::littleToNative(ret));
- }
-
RocksRecoveryUnit* RocksRecoveryUnit::getRocksRecoveryUnit(OperationContext* opCtx) {
return checked_cast<RocksRecoveryUnit*>(opCtx->recoveryUnit());
}
-
- rocksdb::Slice RocksRecoveryUnit::encodeCounter(long long counter, int64_t* storage) {
- *storage = static_cast<int64_t>(endian::littleToNative(counter));
- return rocksdb::Slice(reinterpret_cast<const char*>(storage), sizeof(*storage));
- }
}
diff --git a/src/mongo/db/storage/rocks/rocks_recovery_unit.h b/src/mongo/db/storage/rocks/rocks_recovery_unit.h
index d2834dcd0d8..753870ff496 100644
--- a/src/mongo/db/storage/rocks/rocks_recovery_unit.h
+++ b/src/mongo/db/storage/rocks/rocks_recovery_unit.h
@@ -46,6 +46,7 @@
#include "mongo/db/storage/recovery_unit.h"
#include "rocks_transaction.h"
+#include "rocks_counter_manager.h"
namespace rocksdb {
class DB;
@@ -64,7 +65,8 @@ namespace mongo {
class RocksRecoveryUnit : public RecoveryUnit {
MONGO_DISALLOW_COPYING(RocksRecoveryUnit);
public:
- RocksRecoveryUnit(RocksTransactionEngine* transactionEngine, rocksdb::DB* db, bool durable);
+ RocksRecoveryUnit(RocksTransactionEngine* transactionEngine, rocksdb::DB* db,
+ RocksCounterManager* counterManager, bool durable);
virtual ~RocksRecoveryUnit();
virtual void beginUnitOfWork(OperationContext* opCtx);
@@ -104,13 +106,11 @@ namespace mongo {
long long getDeltaCounter(const rocksdb::Slice& counterKey);
- static long long getCounterValue(rocksdb::DB* db, const rocksdb::Slice counterKey);
-
void setOplogReadTill(const RecordId& loc);
RecordId getOplogReadTill() const { return _oplogReadTill; }
RocksRecoveryUnit* newRocksRecoveryUnit() {
- return new RocksRecoveryUnit(_transactionEngine, _db, _durable);
+ return new RocksRecoveryUnit(_transactionEngine, _db, _counterManager, _durable);
}
struct Counter {
@@ -126,8 +126,6 @@ namespace mongo {
static int getTotalLiveRecoveryUnits() { return _totalLiveRecoveryUnits.load(); }
- static rocksdb::Slice encodeCounter(long long counter, int64_t* storage);
-
private:
void _releaseSnapshot();
@@ -135,7 +133,8 @@ namespace mongo {
void _abort();
RocksTransactionEngine* _transactionEngine; // not owned
- rocksdb::DB* _db; // not owned
+ rocksdb::DB* _db; // not owned
+ RocksCounterManager* _counterManager; // not owned
const bool _durable;