summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIgor Canadi <icanadi@fb.com>2015-02-18 12:40:32 -0800
committerRamon Fernandez <ramon.fernandez@mongodb.com>2015-02-19 14:55:33 -0500
commitadc9db6a803fbe0e2696d994f0eb8e0ae296484d (patch)
treee3ed213ac39584eac5e56ecd4067e35665fe71f6
parent2a59d43f714ac4b4464dab4158165ea818850f5d (diff)
downloadmongo-adc9db6a803fbe0e2696d994f0eb8e0ae296484d.tar.gz
SERVER-17325 [Rocks] Clean up oplog in background thread
Signed-off-by: Ramon Fernandez <ramon.fernandez@mongodb.com> (cherry picked from commit 7d8adb6da8e9727c146acb33b717b57bc406481f)
-rw-r--r--src/mongo/db/storage/rocks/SConscript19
-rw-r--r--src/mongo/db/storage/rocks/rocks_engine.h8
-rw-r--r--src/mongo/db/storage/rocks/rocks_record_store.cpp155
-rw-r--r--src/mongo/db/storage/rocks/rocks_record_store.h17
-rw-r--r--src/mongo/db/storage/rocks/rocks_record_store_mock.cpp41
-rw-r--r--src/mongo/db/storage/rocks/rocks_record_store_mongod.cpp172
6 files changed, 358 insertions, 54 deletions
diff --git a/src/mongo/db/storage/rocks/SConscript b/src/mongo/db/storage/rocks/SConscript
index c171bd238d2..ec5f6fac153 100644
--- a/src/mongo/db/storage/rocks/SConscript
+++ b/src/mongo/db/storage/rocks/SConscript
@@ -33,7 +33,8 @@ if has_option("rocksdb"):
env.Library(
target= 'storage_rocks',
source= [
- 'rocks_init.cpp'
+ 'rocks_init.cpp',
+ 'rocks_record_store_mongod.cpp',
],
LIBDEPS= [
'storage_rocks_base',
@@ -41,13 +42,23 @@ if has_option("rocksdb"):
]
)
+ env.Library(
+ target= 'storage_rocks_mock',
+ source= [
+ 'rocks_record_store_mock.cpp',
+ ],
+ LIBDEPS= [
+ 'storage_rocks_base',
+ ]
+ )
+
env.CppUnitTest(
target='storage_rocks_index_test',
source=['rocks_index_test.cpp'
],
LIBDEPS=[
- 'storage_rocks_base',
+ 'storage_rocks_mock',
'$BUILD_DIR/mongo/db/storage/sorted_data_interface_test_harness'
]
)
@@ -58,7 +69,7 @@ if has_option("rocksdb"):
source=['rocks_record_store_test.cpp'
],
LIBDEPS=[
- 'storage_rocks_base',
+ 'storage_rocks_mock',
'$BUILD_DIR/mongo/db/storage/record_store_test_harness'
]
)
@@ -68,7 +79,7 @@ if has_option("rocksdb"):
source=['rocks_engine_test.cpp'
],
LIBDEPS=[
- 'storage_rocks_base',
+ 'storage_rocks_mock',
'$BUILD_DIR/mongo/db/storage/kv/kv_engine_test_harness'
]
)
diff --git a/src/mongo/db/storage/rocks/rocks_engine.h b/src/mongo/db/storage/rocks/rocks_engine.h
index 4078c13414e..2d9a55a848c 100644
--- a/src/mongo/db/storage/rocks/rocks_engine.h
+++ b/src/mongo/db/storage/rocks/rocks_engine.h
@@ -116,6 +116,14 @@ namespace mongo {
virtual void cleanShutdown() {}
+ /**
+ * Initializes a background job to remove excess documents in the oplog collections.
+ * This applies to the capped collections in the local.oplog.* namespaces (specifically
+ * local.oplog.rs for replica sets and local.oplog.$main for master/slave replication).
+ * Returns true if a background job is running for the namespace.
+ */
+ static bool initRsOplogBackgroundThread(StringData ns);
+
// rocks specific api
rocksdb::DB* getDB() { return _db.get(); }
diff --git a/src/mongo/db/storage/rocks/rocks_record_store.cpp b/src/mongo/db/storage/rocks/rocks_record_store.cpp
index 0fc42f6d484..e1b7b5a10fc 100644
--- a/src/mongo/db/storage/rocks/rocks_record_store.cpp
+++ b/src/mongo/db/storage/rocks/rocks_record_store.cpp
@@ -49,9 +49,11 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/storage/rocks/rocks_engine.h"
#include "mongo/db/storage/rocks/rocks_recovery_unit.h"
#include "mongo/db/storage/oplog_hack.h"
#include "mongo/platform/endian.h"
+#include "mongo/util/background.h"
#include "mongo/util/log.h"
#include "mongo/util/timer.h"
@@ -145,6 +147,7 @@ namespace mongo {
_prefix(std::move(prefix)),
_isCapped(isCapped),
_cappedMaxSize(cappedMaxSize),
+ _cappedMaxSizeSlack(std::min(cappedMaxSize / 10, int64_t(16 * 1024 * 1024))),
_cappedMaxDocs(cappedMaxDocs),
_cappedDeleteCallback(cappedDeleteCallback),
_cappedDeleteCheckCount(0),
@@ -154,7 +157,8 @@ namespace mongo {
: nullptr),
_ident(id.toString()),
_dataSizeKey(std::string("\0\0\0\0", 4) + "datasize-" + id.toString()),
- _numRecordsKey(std::string("\0\0\0\0", 4) + "numrecords-" + id.toString()) {
+ _numRecordsKey(std::string("\0\0\0\0", 4) + "numrecords-" + id.toString()),
+ _shuttingDown(false) {
if (_isCapped) {
invariant(_cappedMaxSize > 0);
@@ -186,6 +190,15 @@ namespace mongo {
_dataSize.store(RocksRecoveryUnit::getCounterValue(_db, _dataSizeKey));
invariant(_dataSize.load() >= 0);
invariant(_numRecords.load() >= 0);
+
+ _hasBackgroundThread = RocksEngine::initRsOplogBackgroundThread(ns);
+ }
+
+ RocksRecordStore::~RocksRecordStore() {
+ {
+ boost::timed_mutex::scoped_lock lk(_cappedDeleterMutex);
+ _shuttingDown = true;
+ }
}
int64_t RocksRecordStore::storageSize(OperationContext* txn, BSONObjBuilder* extraInfo,
@@ -214,7 +227,7 @@ namespace mongo {
ru->writeBatch()->Delete(key);
- _changeNumRecords(txn, false);
+ _changeNumRecords(txn, -1);
_increaseDataSize(txn, -oldLength);
}
@@ -237,18 +250,15 @@ namespace mongo {
return false;
}
- void RocksRecordStore::cappedDeleteAsNeeded(OperationContext* txn,
- const RecordId& justInserted) {
+ int64_t RocksRecordStore::cappedDeleteAsNeeded(OperationContext* txn,
+ const RecordId& justInserted) {
if (!_isCapped) {
- return;
+ return 0;
}
// We only want to do the checks occasionally as they are expensive.
// This variable isn't thread safe, but has loose semantics anyway.
- dassert( !_isOplog || _cappedMaxDocs == -1 );
- if ( _cappedMaxDocs == -1 && // Max docs has to be exact, so have to check every time.
- _cappedDeleteCheckCount++ % 100 > 0 )
- return;
+ dassert(!_isOplog || _cappedMaxDocs == -1);
long long dataSizeDelta = 0, numRecordsDelta = 0;
if (!_isOplog) {
@@ -258,70 +268,130 @@ namespace mongo {
}
if (!cappedAndNeedDelete(dataSizeDelta, numRecordsDelta)) {
- return;
+ return 0;
}
// ensure only one thread at a time can do deletes, otherwise they'll conflict.
- boost::mutex::scoped_lock lock(_cappedDeleterMutex, boost::try_to_lock);
- if (!lock) {
- return;
+ boost::timed_mutex::scoped_lock lock(_cappedDeleterMutex, boost::defer_lock);
+
+ if (_cappedMaxDocs != -1) {
+ lock.lock(); // Max docs has to be exact, so have to check every time.
+ }
+ else if(_hasBackgroundThread) {
+ // We are foreground, and there is a background thread.
+
+ // Check if we need some back pressure.
+ if ((_dataSize.load() - _cappedMaxSize) < _cappedMaxSizeSlack) {
+ return 0;
+ }
+
+ // Back pressure needed!
+ // We're not actually going to delete anything, but we're going to synchronize
+ // on the deleter thread.
+ (void)lock.timed_lock(boost::posix_time::millisec(200));
+ return 0;
+ } else {
+ if (!lock.try_lock()) {
+ // Someone else is deleting old records. Apply back-pressure if too far behind,
+ // otherwise continue.
+ if ((_dataSize.load() - _cappedMaxSize) < _cappedMaxSizeSlack)
+ return 0;
+
+ if (!lock.timed_lock(boost::posix_time::millisec(200)))
+ return 0;
+
+ // If we already waited, let someone else do cleanup unless we are significantly
+ // over the limit.
+ if ((_dataSize.load() - _cappedMaxSize) < (2 * _cappedMaxSizeSlack))
+ return 0;
+ }
}
+ return cappedDeleteAsNeeded_inlock(txn, justInserted);
+ }
+
+ int64_t RocksRecordStore::cappedDeleteAsNeeded_inlock(OperationContext* txn,
+ const RecordId& justInserted) {
// we do this is a sub transaction in case it aborts
RocksRecoveryUnit* realRecoveryUnit =
checked_cast<RocksRecoveryUnit*>(txn->releaseRecoveryUnit());
invariant(realRecoveryUnit);
txn->setRecoveryUnit(realRecoveryUnit->newRocksRecoveryUnit());
+ int64_t dataSize = _dataSize.load() + realRecoveryUnit->getDeltaCounter(_dataSizeKey);
+ int64_t numRecords = _numRecords.load() + realRecoveryUnit->getDeltaCounter(_numRecordsKey);
+
+ int64_t sizeOverCap = (dataSize > _cappedMaxSize) ? dataSize - _cappedMaxSize : 0;
+ int64_t sizeSaved = 0;
+ int64_t docsOverCap = 0, docsRemoved = 0;
+ if (_cappedMaxDocs != -1 && numRecords > _cappedMaxDocs) {
+ docsOverCap = numRecords - _cappedMaxDocs;
+ }
+
try {
+ WriteUnitOfWork wuow(txn);
auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
boost::scoped_ptr<rocksdb::Iterator> iter(ru->NewIterator(_prefix));
iter->SeekToFirst();
- while (cappedAndNeedDelete(dataSizeDelta, numRecordsDelta) && iter->Valid()) {
- WriteUnitOfWork wuow(txn);
-
- invariant(_numRecords.load() > 0);
+ RecordId newestOld;
+ while ((sizeSaved < sizeOverCap || docsRemoved < docsOverCap) &&
+ (docsRemoved < 20000) && iter->Valid()) {
rocksdb::Slice slice = iter->key();
- RecordId oldest = _makeRecordId(slice);
+ newestOld = _makeRecordId(slice);
- if (oldest >= justInserted) {
+ // don't go past the record we just inserted
+ if (newestOld >= justInserted) {
break;
}
- if (_cappedDeleteCallback) {
- uassertStatusOK(
- _cappedDeleteCallback->aboutToDeleteCapped(
- txn,
- oldest,
- RecordData(iter->value().data(), iter->value().size())));
+ if (_shuttingDown) {
+ break;
+ }
+
+ std::string key(_makePrefixedKey(_prefix, newestOld));
+ if (!ru->transaction()->registerWrite(key)) {
+ log() << "got conflict truncating capped, total docs removed " << docsRemoved;
+ break;
}
- deleteRecord(txn, oldest);
+ auto oldValue = iter->value();
+ ++docsRemoved;
+ sizeSaved += oldValue.size();
+
+ if (_cappedDeleteCallback) {
+ uassertStatusOK(_cappedDeleteCallback->aboutToDeleteCapped(
+ txn, newestOld,
+ RecordData(static_cast<const char*>(oldValue.data()), oldValue.size())));
+ }
+ ru->writeBatch()->Delete(key);
iter->Next();
+ }
- // We need to commit here to reflect updates on _numRecords and _dataSize.
- // TODO: investigate if we should reflect changes to _numRecords and _dataSize
- // immediately (read uncommitted).
+ if (docsRemoved > 0) {
+ _changeNumRecords(txn, -docsRemoved);
+ _increaseDataSize(txn, -sizeSaved);
wuow.commit();
}
}
- catch (const WriteConflictException& wce) {
+ catch ( const WriteConflictException& wce ) {
delete txn->releaseRecoveryUnit();
- txn->setRecoveryUnit(realRecoveryUnit);
+ txn->setRecoveryUnit( realRecoveryUnit );
log() << "got conflict truncating capped, ignoring";
- return;
+ return 0;
}
- catch (...) {
+ catch ( ... ) {
delete txn->releaseRecoveryUnit();
- txn->setRecoveryUnit(realRecoveryUnit);
+ txn->setRecoveryUnit( realRecoveryUnit );
throw;
}
delete txn->releaseRecoveryUnit();
- txn->setRecoveryUnit(realRecoveryUnit);
+ txn->setRecoveryUnit( realRecoveryUnit );
+ return docsRemoved;
+
}
StatusWith<RecordId> RocksRecordStore::insertRecord( OperationContext* txn,
@@ -355,7 +425,7 @@ namespace mongo {
// transaction can access this key before we commit
ru->writeBatch()->Put(_makePrefixedKey(_prefix, loc), rocksdb::Slice(data, len));
- _changeNumRecords( txn, true );
+ _changeNumRecords( txn, 1 );
_increaseDataSize( txn, len );
cappedDeleteAsNeeded(txn, loc);
@@ -659,17 +729,12 @@ namespace mongo {
return RecordData(data.moveFrom(), valueStorage.size());
}
- void RocksRecordStore::_changeNumRecords( OperationContext* txn, bool insert ) {
- RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn );
- if ( insert ) {
- ru->incrementCounter(_numRecordsKey, &_numRecords, 1);
- }
- else {
- ru->incrementCounter(_numRecordsKey, &_numRecords, -1);
- }
+ void RocksRecordStore::_changeNumRecords(OperationContext* txn, int64_t amount) {
+ RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn);
+ ru->incrementCounter(_numRecordsKey, &_numRecords, amount);
}
- void RocksRecordStore::_increaseDataSize( OperationContext* txn, int amount ) {
+ void RocksRecordStore::_increaseDataSize(OperationContext* txn, int64_t amount) {
RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn );
ru->incrementCounter(_dataSizeKey, &_dataSize, amount);
}
diff --git a/src/mongo/db/storage/rocks/rocks_record_store.h b/src/mongo/db/storage/rocks/rocks_record_store.h
index fdd2763a92f..8650cff763e 100644
--- a/src/mongo/db/storage/rocks/rocks_record_store.h
+++ b/src/mongo/db/storage/rocks/rocks_record_store.h
@@ -83,7 +83,7 @@ namespace mongo {
int64_t cappedMaxDocs = -1,
CappedDocumentDeleteCallback* cappedDeleteCallback = NULL);
- virtual ~RocksRecordStore() { }
+ virtual ~RocksRecordStore();
// name of the RecordStore implementation
virtual const char* name() const { return "rocks"; }
@@ -184,6 +184,10 @@ namespace mongo {
bool cappedMaxSize() const { invariant(_isCapped); return _cappedMaxSize; }
bool isOplog() const { return _isOplog; }
+ int64_t cappedDeleteAsNeeded(OperationContext* txn, const RecordId& justInserted);
+ int64_t cappedDeleteAsNeeded_inlock(OperationContext* txn, const RecordId& justInserted);
+ boost::timed_mutex& cappedDeleterMutex() { return _cappedDeleterMutex; }
+
static rocksdb::Comparator* newRocksCollectionComparator();
private:
@@ -234,23 +238,23 @@ namespace mongo {
RecordId _nextId();
bool cappedAndNeedDelete(long long dataSizeDelta, long long numRecordsDelta) const;
- void cappedDeleteAsNeeded(OperationContext* txn, const RecordId& justInserted);
// The use of this function requires that the passed in storage outlives the returned Slice
static rocksdb::Slice _makeKey(const RecordId& loc, int64_t* storage);
static std::string _makePrefixedKey(const std::string& prefix, const RecordId& loc);
- void _changeNumRecords(OperationContext* txn, bool insert);
- void _increaseDataSize(OperationContext* txn, int amount);
+ void _changeNumRecords(OperationContext* txn, int64_t amount);
+ void _increaseDataSize(OperationContext* txn, int64_t amount);
rocksdb::DB* _db; // not owned
std::string _prefix;
const bool _isCapped;
const int64_t _cappedMaxSize;
+ const int64_t _cappedMaxSizeSlack; // when to start applying backpressure
const int64_t _cappedMaxDocs;
CappedDocumentDeleteCallback* _cappedDeleteCallback;
- boost::mutex _cappedDeleterMutex; // see commend in ::cappedDeleteAsNeeded
+ mutable boost::timed_mutex _cappedDeleterMutex; // see comment in ::cappedDeleteAsNeeded
int _cappedDeleteCheckCount; // see comment in ::cappedDeleteAsNeeded
const bool _isOplog;
@@ -265,5 +269,8 @@ namespace mongo {
const std::string _dataSizeKey;
const std::string _numRecordsKey;
+
+ bool _shuttingDown;
+ bool _hasBackgroundThread;
};
}
diff --git a/src/mongo/db/storage/rocks/rocks_record_store_mock.cpp b/src/mongo/db/storage/rocks/rocks_record_store_mock.cpp
new file mode 100644
index 00000000000..d60bf6cb738
--- /dev/null
+++ b/src/mongo/db/storage/rocks/rocks_record_store_mock.cpp
@@ -0,0 +1,41 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/storage/rocks/rocks_engine.h"
+
+namespace mongo {
+
+ // static
+ bool RocksEngine::initRsOplogBackgroundThread(StringData ns) {
+ return false;
+ }
+
+} // namespace mongo
diff --git a/src/mongo/db/storage/rocks/rocks_record_store_mongod.cpp b/src/mongo/db/storage/rocks/rocks_record_store_mongod.cpp
new file mode 100644
index 00000000000..91d86698705
--- /dev/null
+++ b/src/mongo/db/storage/rocks/rocks_record_store_mongod.cpp
@@ -0,0 +1,172 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
+
+#include "mongo/platform/basic.h"
+
+#include <boost/thread/mutex.hpp>
+#include <set>
+
+#include "mongo/base/checked_cast.h"
+#include "mongo/db/client.h"
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/database.h"
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/global_environment_experiment.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context_impl.h"
+#include "mongo/db/storage/rocks/rocks_engine.h"
+#include "mongo/db/storage/rocks/rocks_record_store.h"
+#include "mongo/db/storage/rocks/rocks_recovery_unit.h"
+#include "mongo/util/background.h"
+#include "mongo/util/exit.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+ namespace {
+
+ std::set<NamespaceString> _backgroundThreadNamespaces;
+ boost::mutex _backgroundThreadMutex;
+
+ class RocksRecordStoreThread : public BackgroundJob {
+ public:
+ RocksRecordStoreThread(const NamespaceString& ns)
+ : _ns(ns) {
+ _name = std::string("RocksRecordStoreThread for ") + _ns.toString();
+ }
+
+ virtual std::string name() const {
+ return _name;
+ }
+
+ /**
+ * @return Number of documents deleted.
+ */
+ int64_t _deleteExcessDocuments() {
+ if (!getGlobalEnvironment()->getGlobalStorageEngine()) {
+ LOG(1) << "no global storage engine yet";
+ return 0;
+ }
+
+ OperationContextImpl txn;
+
+ try {
+ ScopedTransaction transaction(&txn, MODE_IX);
+
+ AutoGetDb autoDb(&txn, _ns.db(), MODE_IX);
+ Database* db = autoDb.getDb();
+ if (!db) {
+ LOG(2) << "no local database yet";
+ return 0;
+ }
+
+ Lock::CollectionLock collectionLock(txn.lockState(), _ns.ns(), MODE_IX);
+ Collection* collection = db->getCollection(_ns);
+ if (!collection) {
+ LOG(2) << "no collection " << _ns;
+ return 0;
+ }
+
+ Client::Context ctx(&txn, _ns, false);
+ RocksRecordStore* rs =
+ checked_cast<RocksRecordStore*>(collection->getRecordStore());
+ WriteUnitOfWork wuow(&txn);
+ boost::timed_mutex::scoped_lock lock(rs->cappedDeleterMutex());
+ int64_t removed = rs->cappedDeleteAsNeeded_inlock(&txn, RecordId::max());
+ wuow.commit();
+ return removed;
+ }
+ catch (const std::exception& e) {
+ severe() << "error in RocksRecordStoreThread: " << e.what();
+ fassertFailedNoTrace(!"error in RocksRecordStoreThread");
+ }
+ catch (...) {
+ fassertFailedNoTrace(!"unknown error in RocksRecordStoreThread");
+ }
+ }
+
+ virtual void run() {
+ Client::initThread(_name.c_str());
+
+ while (!inShutdown()) {
+ int64_t removed = _deleteExcessDocuments();
+ LOG(2) << "RocksRecordStoreThread deleted " << removed;
+ if (removed == 0) {
+ // If we removed 0 documents, sleep a bit in case we're on a laptop
+ // or something to be nice.
+ sleepmillis(1000);
+ }
+ else if(removed < 1000) {
+ // 1000 is the batch size, so we didn't even do a full batch,
+ // which is the most efficient.
+ sleepmillis(10);
+ }
+ }
+
+ cc().shutdown();
+
+ log() << "shutting down";
+ }
+
+ private:
+ NamespaceString _ns;
+ std::string _name;
+ };
+
+ } // namespace
+
+ // static
+ bool RocksEngine::initRsOplogBackgroundThread(StringData ns) {
+ if (!NamespaceString::oplog(ns)) {
+ return false;
+ }
+
+ if (storageGlobalParams.repair) {
+ LOG(1) << "not starting RocksRecordStoreThread for " << ns
+ << " because we are in repair";
+ return false;
+ }
+
+ boost::mutex::scoped_lock lock(_backgroundThreadMutex);
+ NamespaceString nss(ns);
+ if (_backgroundThreadNamespaces.count(nss)) {
+ log() << "RocksRecordStoreThread " << ns << " already started";
+ }
+ else {
+ log() << "Starting RocksRecordStoreThread " << ns;
+ BackgroundJob* backgroundThread = new RocksRecordStoreThread(nss);
+ backgroundThread->go();
+ _backgroundThreadNamespaces.insert(nss);
+ }
+ return true;
+ }
+
+} // namespace mongo