diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/storage/rocks/SConscript | 19 | ||||
-rw-r--r-- | src/mongo/db/storage/rocks/rocks_engine.h | 8 | ||||
-rw-r--r-- | src/mongo/db/storage/rocks/rocks_record_store.cpp | 155 | ||||
-rw-r--r-- | src/mongo/db/storage/rocks/rocks_record_store.h | 17 | ||||
-rw-r--r-- | src/mongo/db/storage/rocks/rocks_record_store_mock.cpp | 41 | ||||
-rw-r--r-- | src/mongo/db/storage/rocks/rocks_record_store_mongod.cpp | 172 |
6 files changed, 358 insertions, 54 deletions
diff --git a/src/mongo/db/storage/rocks/SConscript b/src/mongo/db/storage/rocks/SConscript index c171bd238d2..ec5f6fac153 100644 --- a/src/mongo/db/storage/rocks/SConscript +++ b/src/mongo/db/storage/rocks/SConscript @@ -33,7 +33,8 @@ if has_option("rocksdb"): env.Library( target= 'storage_rocks', source= [ - 'rocks_init.cpp' + 'rocks_init.cpp', + 'rocks_record_store_mongod.cpp', ], LIBDEPS= [ 'storage_rocks_base', @@ -41,13 +42,23 @@ if has_option("rocksdb"): ] ) + env.Library( + target= 'storage_rocks_mock', + source= [ + 'rocks_record_store_mock.cpp', + ], + LIBDEPS= [ + 'storage_rocks_base', + ] + ) + env.CppUnitTest( target='storage_rocks_index_test', source=['rocks_index_test.cpp' ], LIBDEPS=[ - 'storage_rocks_base', + 'storage_rocks_mock', '$BUILD_DIR/mongo/db/storage/sorted_data_interface_test_harness' ] ) @@ -58,7 +69,7 @@ if has_option("rocksdb"): source=['rocks_record_store_test.cpp' ], LIBDEPS=[ - 'storage_rocks_base', + 'storage_rocks_mock', '$BUILD_DIR/mongo/db/storage/record_store_test_harness' ] ) @@ -68,7 +79,7 @@ if has_option("rocksdb"): source=['rocks_engine_test.cpp' ], LIBDEPS=[ - 'storage_rocks_base', + 'storage_rocks_mock', '$BUILD_DIR/mongo/db/storage/kv/kv_engine_test_harness' ] ) diff --git a/src/mongo/db/storage/rocks/rocks_engine.h b/src/mongo/db/storage/rocks/rocks_engine.h index 4078c13414e..2d9a55a848c 100644 --- a/src/mongo/db/storage/rocks/rocks_engine.h +++ b/src/mongo/db/storage/rocks/rocks_engine.h @@ -116,6 +116,14 @@ namespace mongo { virtual void cleanShutdown() {} + /** + * Initializes a background job to remove excess documents in the oplog collections. + * This applies to the capped collections in the local.oplog.* namespaces (specifically + * local.oplog.rs for replica sets and local.oplog.$main for master/slave replication). + * Returns true if a background job is running for the namespace. + */ + static bool initRsOplogBackgroundThread(StringData ns); + // rocks specific api rocksdb::DB* getDB() { return _db.get(); } diff --git a/src/mongo/db/storage/rocks/rocks_record_store.cpp b/src/mongo/db/storage/rocks/rocks_record_store.cpp index 0fc42f6d484..e1b7b5a10fc 100644 --- a/src/mongo/db/storage/rocks/rocks_record_store.cpp +++ b/src/mongo/db/storage/rocks/rocks_record_store.cpp @@ -49,9 +49,11 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/db/storage/rocks/rocks_engine.h" #include "mongo/db/storage/rocks/rocks_recovery_unit.h" #include "mongo/db/storage/oplog_hack.h" #include "mongo/platform/endian.h" +#include "mongo/util/background.h" #include "mongo/util/log.h" #include "mongo/util/timer.h" @@ -145,6 +147,7 @@ namespace mongo { _prefix(std::move(prefix)), _isCapped(isCapped), _cappedMaxSize(cappedMaxSize), + _cappedMaxSizeSlack(std::min(cappedMaxSize / 10, int64_t(16 * 1024 * 1024))), _cappedMaxDocs(cappedMaxDocs), _cappedDeleteCallback(cappedDeleteCallback), _cappedDeleteCheckCount(0), @@ -154,7 +157,8 @@ namespace mongo { : nullptr), _ident(id.toString()), _dataSizeKey(std::string("\0\0\0\0", 4) + "datasize-" + id.toString()), - _numRecordsKey(std::string("\0\0\0\0", 4) + "numrecords-" + id.toString()) { + _numRecordsKey(std::string("\0\0\0\0", 4) + "numrecords-" + id.toString()), + _shuttingDown(false) { if (_isCapped) { invariant(_cappedMaxSize > 0); @@ -186,6 +190,15 @@ namespace mongo { _dataSize.store(RocksRecoveryUnit::getCounterValue(_db, _dataSizeKey)); invariant(_dataSize.load() >= 0); invariant(_numRecords.load() >= 0); + + _hasBackgroundThread = RocksEngine::initRsOplogBackgroundThread(ns); + } + + RocksRecordStore::~RocksRecordStore() { + { + boost::timed_mutex::scoped_lock lk(_cappedDeleterMutex); + _shuttingDown = true; + } } int64_t RocksRecordStore::storageSize(OperationContext* txn, BSONObjBuilder* extraInfo, @@ -214,7 +227,7 @@ namespace mongo { ru->writeBatch()->Delete(key); - _changeNumRecords(txn, false); + _changeNumRecords(txn, -1); _increaseDataSize(txn, -oldLength); } @@ -237,18 +250,15 @@ namespace mongo { return false; } - void RocksRecordStore::cappedDeleteAsNeeded(OperationContext* txn, - const RecordId& justInserted) { + int64_t RocksRecordStore::cappedDeleteAsNeeded(OperationContext* txn, + const RecordId& justInserted) { if (!_isCapped) { - return; + return 0; } // We only want to do the checks occasionally as they are expensive. // This variable isn't thread safe, but has loose semantics anyway. - dassert( !_isOplog || _cappedMaxDocs == -1 ); - if ( _cappedMaxDocs == -1 && // Max docs has to be exact, so have to check every time. - _cappedDeleteCheckCount++ % 100 > 0 ) - return; + dassert(!_isOplog || _cappedMaxDocs == -1); long long dataSizeDelta = 0, numRecordsDelta = 0; if (!_isOplog) { @@ -258,70 +268,130 @@ namespace mongo { } if (!cappedAndNeedDelete(dataSizeDelta, numRecordsDelta)) { - return; + return 0; } // ensure only one thread at a time can do deletes, otherwise they'll conflict. - boost::mutex::scoped_lock lock(_cappedDeleterMutex, boost::try_to_lock); - if (!lock) { - return; + boost::timed_mutex::scoped_lock lock(_cappedDeleterMutex, boost::defer_lock); + + if (_cappedMaxDocs != -1) { + lock.lock(); // Max docs has to be exact, so have to check every time. + } + else if(_hasBackgroundThread) { + // We are foreground, and there is a background thread, + + // Check if we need some back pressure. + if ((_dataSize.load() - _cappedMaxSize) < _cappedMaxSizeSlack) { + return 0; + } + + // Back pressure needed! + // We're not actually going to delete anything, but we're going to syncronize + // on the deleter thread. + (void)lock.timed_lock(boost::posix_time::millisec(200)); + return 0; + } else { + if (!lock.try_lock()) { + // Someone else is deleting old records. Apply back-pressure if too far behind, + // otherwise continue. + if ((_dataSize.load() - _cappedMaxSize) < _cappedMaxSizeSlack) + return 0; + + if (!lock.timed_lock(boost::posix_time::millisec(200))) + return 0; + + // If we already waited, let someone else do cleanup unless we are significantly + // over the limit. + if ((_dataSize.load() - _cappedMaxSize) < (2 * _cappedMaxSizeSlack)) + return 0; + } } + return cappedDeleteAsNeeded_inlock(txn, justInserted); + } + + int64_t RocksRecordStore::cappedDeleteAsNeeded_inlock(OperationContext* txn, + const RecordId& justInserted) { // we do this is a sub transaction in case it aborts RocksRecoveryUnit* realRecoveryUnit = checked_cast<RocksRecoveryUnit*>(txn->releaseRecoveryUnit()); invariant(realRecoveryUnit); txn->setRecoveryUnit(realRecoveryUnit->newRocksRecoveryUnit()); + int64_t dataSize = _dataSize.load() + realRecoveryUnit->getDeltaCounter(_dataSizeKey); + int64_t numRecords = _numRecords.load() + realRecoveryUnit->getDeltaCounter(_numRecordsKey); + + int64_t sizeOverCap = (dataSize > _cappedMaxSize) ? dataSize - _cappedMaxSize : 0; + int64_t sizeSaved = 0; + int64_t docsOverCap = 0, docsRemoved = 0; + if (_cappedMaxDocs != -1 && numRecords > _cappedMaxDocs) { + docsOverCap = numRecords - _cappedMaxDocs; + } + try { + WriteUnitOfWork wuow(txn); auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); boost::scoped_ptr<rocksdb::Iterator> iter(ru->NewIterator(_prefix)); iter->SeekToFirst(); - while (cappedAndNeedDelete(dataSizeDelta, numRecordsDelta) && iter->Valid()) { - WriteUnitOfWork wuow(txn); - - invariant(_numRecords.load() > 0); + RecordId newestOld; + while ((sizeSaved < sizeOverCap || docsRemoved < docsOverCap) && + (docsRemoved < 20000) && iter->Valid()) { rocksdb::Slice slice = iter->key(); - RecordId oldest = _makeRecordId(slice); + newestOld = _makeRecordId(slice); - if (oldest >= justInserted) { + // don't go past the record we just inserted + if (newestOld >= justInserted) { break; } - if (_cappedDeleteCallback) { - uassertStatusOK( - _cappedDeleteCallback->aboutToDeleteCapped( - txn, - oldest, - RecordData(iter->value().data(), iter->value().size()))); + if (_shuttingDown) { + break; + } + + std::string key(_makePrefixedKey(_prefix, newestOld)); + if (!ru->transaction()->registerWrite(key)) { + log() << "got conflict truncating capped, total docs removed " << docsRemoved; + break; } - deleteRecord(txn, oldest); + auto oldValue = iter->value(); + ++docsRemoved; + sizeSaved += oldValue.size(); + + if (_cappedDeleteCallback) { + uassertStatusOK(_cappedDeleteCallback->aboutToDeleteCapped( + txn, newestOld, + RecordData(static_cast<const char*>(oldValue.data()), oldValue.size()))); + } + ru->writeBatch()->Delete(key); iter->Next(); + } - // We need to commit here to reflect updates on _numRecords and _dataSize. - // TODO: investigate if we should reflect changes to _numRecords and _dataSize - // immediately (read uncommitted). + if (docsRemoved > 0) { + _changeNumRecords(txn, -docsRemoved); + _increaseDataSize(txn, -sizeSaved); wuow.commit(); } } - catch (const WriteConflictException& wce) { + catch ( const WriteConflictException& wce ) { delete txn->releaseRecoveryUnit(); - txn->setRecoveryUnit(realRecoveryUnit); + txn->setRecoveryUnit( realRecoveryUnit ); log() << "got conflict truncating capped, ignoring"; - return; + return 0; } - catch (...) { + catch ( ... ) { delete txn->releaseRecoveryUnit(); - txn->setRecoveryUnit(realRecoveryUnit); + txn->setRecoveryUnit( realRecoveryUnit ); throw; } delete txn->releaseRecoveryUnit(); - txn->setRecoveryUnit(realRecoveryUnit); + txn->setRecoveryUnit( realRecoveryUnit ); + return docsRemoved; + } StatusWith<RecordId> RocksRecordStore::insertRecord( OperationContext* txn, @@ -355,7 +425,7 @@ namespace mongo { // transaction can access this key before we commit ru->writeBatch()->Put(_makePrefixedKey(_prefix, loc), rocksdb::Slice(data, len)); - _changeNumRecords( txn, true ); + _changeNumRecords( txn, 1 ); _increaseDataSize( txn, len ); cappedDeleteAsNeeded(txn, loc); @@ -659,17 +729,12 @@ namespace mongo { return RecordData(data.moveFrom(), valueStorage.size()); } - void RocksRecordStore::_changeNumRecords( OperationContext* txn, bool insert ) { - RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn ); - if ( insert ) { - ru->incrementCounter(_numRecordsKey, &_numRecords, 1); - } - else { - ru->incrementCounter(_numRecordsKey, &_numRecords, -1); - } + void RocksRecordStore::_changeNumRecords(OperationContext* txn, int64_t amount) { + RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); + ru->incrementCounter(_numRecordsKey, &_numRecords, amount); } - void RocksRecordStore::_increaseDataSize( OperationContext* txn, int amount ) { + void RocksRecordStore::_increaseDataSize(OperationContext* txn, int64_t amount) { RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn ); ru->incrementCounter(_dataSizeKey, &_dataSize, amount); } diff --git a/src/mongo/db/storage/rocks/rocks_record_store.h b/src/mongo/db/storage/rocks/rocks_record_store.h index fdd2763a92f..8650cff763e 100644 --- a/src/mongo/db/storage/rocks/rocks_record_store.h +++ b/src/mongo/db/storage/rocks/rocks_record_store.h @@ -83,7 +83,7 @@ namespace mongo { int64_t cappedMaxDocs = -1, CappedDocumentDeleteCallback* cappedDeleteCallback = NULL); - virtual ~RocksRecordStore() { } + virtual ~RocksRecordStore(); // name of the RecordStore implementation virtual const char* name() const { return "rocks"; } @@ -184,6 +184,10 @@ namespace mongo { bool cappedMaxSize() const { invariant(_isCapped); return _cappedMaxSize; } bool isOplog() const { return _isOplog; } + int64_t cappedDeleteAsNeeded(OperationContext* txn, const RecordId& justInserted); + int64_t cappedDeleteAsNeeded_inlock(OperationContext* txn, const RecordId& justInserted); + boost::timed_mutex& cappedDeleterMutex() { return _cappedDeleterMutex; } + static rocksdb::Comparator* newRocksCollectionComparator(); private: @@ -234,23 +238,23 @@ namespace mongo { RecordId _nextId(); bool cappedAndNeedDelete(long long dataSizeDelta, long long numRecordsDelta) const; - void cappedDeleteAsNeeded(OperationContext* txn, const RecordId& justInserted); // The use of this function requires that the passed in storage outlives the returned Slice static rocksdb::Slice _makeKey(const RecordId& loc, int64_t* storage); static std::string _makePrefixedKey(const std::string& prefix, const RecordId& loc); - void _changeNumRecords(OperationContext* txn, bool insert); - void _increaseDataSize(OperationContext* txn, int amount); + void _changeNumRecords(OperationContext* txn, int64_t amount); + void _increaseDataSize(OperationContext* txn, int64_t amount); rocksdb::DB* _db; // not owned std::string _prefix; const bool _isCapped; const int64_t _cappedMaxSize; + const int64_t _cappedMaxSizeSlack; // when to start applying backpressure const int64_t _cappedMaxDocs; CappedDocumentDeleteCallback* _cappedDeleteCallback; - boost::mutex _cappedDeleterMutex; // see commend in ::cappedDeleteAsNeeded + mutable boost::timed_mutex _cappedDeleterMutex; // see comment in ::cappedDeleteAsNeeded int _cappedDeleteCheckCount; // see comment in ::cappedDeleteAsNeeded const bool _isOplog; @@ -265,5 +269,8 @@ namespace mongo { const std::string _dataSizeKey; const std::string _numRecordsKey; + + bool _shuttingDown; + bool _hasBackgroundThread; }; } diff --git a/src/mongo/db/storage/rocks/rocks_record_store_mock.cpp b/src/mongo/db/storage/rocks/rocks_record_store_mock.cpp new file mode 100644 index 00000000000..d60bf6cb738 --- /dev/null +++ b/src/mongo/db/storage/rocks/rocks_record_store_mock.cpp @@ -0,0 +1,41 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/storage/rocks/rocks_engine.h" + +namespace mongo { + + // static + bool RocksEngine::initRsOplogBackgroundThread(StringData ns) { + return false; + } + +} // namespace mongo diff --git a/src/mongo/db/storage/rocks/rocks_record_store_mongod.cpp b/src/mongo/db/storage/rocks/rocks_record_store_mongod.cpp new file mode 100644 index 00000000000..91d86698705 --- /dev/null +++ b/src/mongo/db/storage/rocks/rocks_record_store_mongod.cpp @@ -0,0 +1,172 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage + +#include "mongo/platform/basic.h" + +#include <boost/thread/mutex.hpp> +#include <set> + +#include "mongo/base/checked_cast.h" +#include "mongo/db/client.h" +#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/database.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/global_environment_experiment.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context_impl.h" +#include "mongo/db/storage/rocks/rocks_engine.h" +#include "mongo/db/storage/rocks/rocks_record_store.h" +#include "mongo/db/storage/rocks/rocks_recovery_unit.h" +#include "mongo/util/background.h" +#include "mongo/util/exit.h" +#include "mongo/util/log.h" + +namespace mongo { + + namespace { + + std::set<NamespaceString> _backgroundThreadNamespaces; + boost::mutex _backgroundThreadMutex; + + class RocksRecordStoreThread : public BackgroundJob { + public: + RocksRecordStoreThread(const NamespaceString& ns) + : _ns(ns) { + _name = std::string("RocksRecordStoreThread for ") + _ns.toString(); + } + + virtual std::string name() const { + return _name; + } + + /** + * @return Number of documents deleted. + */ + int64_t _deleteExcessDocuments() { + if (!getGlobalEnvironment()->getGlobalStorageEngine()) { + LOG(1) << "no global storage engine yet"; + return 0; + } + + OperationContextImpl txn; + + try { + ScopedTransaction transaction(&txn, MODE_IX); + + AutoGetDb autoDb(&txn, _ns.db(), MODE_IX); + Database* db = autoDb.getDb(); + if (!db) { + LOG(2) << "no local database yet"; + return 0; + } + + Lock::CollectionLock collectionLock(txn.lockState(), _ns.ns(), MODE_IX); + Collection* collection = db->getCollection(_ns); + if (!collection) { + LOG(2) << "no collection " << _ns; + return 0; + } + + Client::Context ctx(&txn, _ns, false); + RocksRecordStore* rs = + checked_cast<RocksRecordStore*>(collection->getRecordStore()); + WriteUnitOfWork wuow(&txn); + boost::timed_mutex::scoped_lock lock(rs->cappedDeleterMutex()); + int64_t removed = rs->cappedDeleteAsNeeded_inlock(&txn, RecordId::max()); + wuow.commit(); + return removed; + } + catch (const std::exception& e) { + severe() << "error in RocksRecordStoreThread: " << e.what(); + fassertFailedNoTrace(!"error in RocksRecordStoreThread"); + } + catch (...) { + fassertFailedNoTrace(!"unknown error in RocksRecordStoreThread"); + } + } + + virtual void run() { + Client::initThread(_name.c_str()); + + while (!inShutdown()) { + int64_t removed = _deleteExcessDocuments(); + LOG(2) << "RocksRecordStoreThread deleted " << removed; + if (removed == 0) { + // If we removed 0 documents, sleep a bit in case we're on a laptop + // or something to be nice. + sleepmillis(1000); + } + else if(removed < 1000) { + // 1000 is the batch size, so we didn't even do a full batch, + // which is the most efficient. + sleepmillis(10); + } + } + + cc().shutdown(); + + log() << "shutting down"; + } + + private: + NamespaceString _ns; + std::string _name; + }; + + } // namespace + + // static + bool RocksEngine::initRsOplogBackgroundThread(StringData ns) { + if (!NamespaceString::oplog(ns)) { + return false; + } + + if (storageGlobalParams.repair) { + LOG(1) << "not starting RocksRecordStoreThread for " << ns + << " because we are in repair"; + return false; + } + + boost::mutex::scoped_lock lock(_backgroundThreadMutex); + NamespaceString nss(ns); + if (_backgroundThreadNamespaces.count(nss)) { + log() << "RocksRecordStoreThread " << ns << " already started"; + } + else { + log() << "Starting RocksRecordStoreThread " << ns; + BackgroundJob* backgroundThread = new RocksRecordStoreThread(nss); + backgroundThread->go(); + _backgroundThreadNamespaces.insert(nss); + } + return true; + } + +} // namespace mongo |