diff options
Diffstat (limited to 'src')
9 files changed, 69 insertions, 33 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp index 43178b727db..1fceaad5300 100644 --- a/src/mongo/db/catalog/collection.cpp +++ b/src/mongo/db/catalog/collection.cpp @@ -270,13 +270,14 @@ namespace mongo { return loc; } - Status Collection::aboutToDeleteCapped( OperationContext* txn, const RecordId& loc ) { - - BSONObj doc = docFor( txn, loc ); + Status Collection::aboutToDeleteCapped( OperationContext* txn, + const RecordId& loc, + RecordData data ) { /* check if any cursors point to us. if so, advance them. */ _cursorManager.invalidateDocument(txn, loc, INVALIDATION_DELETION); + BSONObj doc = data.releaseToBson(); _indexCatalog.unindexRecord(txn, doc, loc, false); return Status::OK(); diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index c6fe8b26a74..4d99eff98fc 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -293,7 +293,7 @@ namespace mongo { Status recordStoreGoingToUpdateInPlace( OperationContext* txn, const RecordId& loc ); - Status aboutToDeleteCapped( OperationContext* txn, const RecordId& loc ); + Status aboutToDeleteCapped( OperationContext* txn, const RecordId& loc, RecordData data ); /** * same semantics as insertDocument, but doesn't do: diff --git a/src/mongo/db/storage/capped_callback.h b/src/mongo/db/storage/capped_callback.h index a86c9e9d2dc..0ee4511f66a 100644 --- a/src/mongo/db/storage/capped_callback.h +++ b/src/mongo/db/storage/capped_callback.h @@ -35,6 +35,7 @@ namespace mongo { class OperationContext; + class RecordData; /** * When a capped collection has to delete a document, it needs a way to tell the caller @@ -47,8 +48,12 @@ namespace mongo { /** * This will be called right before loc is deleted when wrapping. + * If data is unowned, it is only valid inside of this call. If implementations wish to + * stash a pointer, they must copy it. */ - virtual Status aboutToDeleteCapped( OperationContext* txn, const RecordId& loc ) = 0; + virtual Status aboutToDeleteCapped( OperationContext* txn, + const RecordId& loc, + RecordData data ) = 0; }; } diff --git a/src/mongo/db/storage/in_memory/in_memory_record_store.cpp b/src/mongo/db/storage/in_memory/in_memory_record_store.cpp index 581300a7df0..e4fb801032a 100644 --- a/src/mongo/db/storage/in_memory/in_memory_record_store.cpp +++ b/src/mongo/db/storage/in_memory/in_memory_record_store.cpp @@ -201,12 +201,14 @@ namespace mongo { while (cappedAndNeedDelete(txn)) { invariant(!_data->records.empty()); - RecordId oldest = _data->records.begin()->first; + Records::iterator oldest = _data->records.begin(); + RecordId id = oldest->first; + RecordData data = oldest->second.toRecordData(); if (_cappedDeleteCallback) - uassertStatusOK(_cappedDeleteCallback->aboutToDeleteCapped(txn, oldest)); + uassertStatusOK(_cappedDeleteCallback->aboutToDeleteCapped(txn, id, data)); - deleteRecord(txn, oldest); + deleteRecord(txn, id); } } diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_capped.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_capped.cpp index 3648deff9d0..edd6304f63b 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_capped.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_capped.cpp @@ -172,7 +172,7 @@ namespace mongo { } const RecordId fr = theCapExtent()->firstRecord.toRecordId(); - Status status = _deleteCallback->aboutToDeleteCapped( txn, fr ); + Status status = _deleteCallback->aboutToDeleteCapped( txn, fr, dataFor(txn, fr) ); if ( !status.isOK() ) return StatusWith<DiskLoc>( status ); deleteRecord( txn, fr ); @@ -487,7 +487,7 @@ namespace mongo { WriteUnitOfWork wunit(txn); // Delete the newest record, and coalesce the new deleted // record with existing deleted records. - Status status = _deleteCallback->aboutToDeleteCapped( txn, currId ); + Status status = _deleteCallback->aboutToDeleteCapped(txn, currId, dataFor(txn, currId)); uassertStatusOK( status ); deleteRecord( txn, currId ); _compact(txn); diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp index 951164b214a..468720793c3 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp @@ -51,7 +51,7 @@ namespace { class DummyCappedDocumentDeleteCallback : public CappedDocumentDeleteCallback { public: - Status aboutToDeleteCapped( OperationContext* txn, const RecordId& loc ) { + Status aboutToDeleteCapped( OperationContext* txn, const RecordId& loc, RecordData data) { deleted.push_back( DiskLoc::fromRecordId(loc) ); return Status::OK(); } diff --git a/src/mongo/db/storage/rocks/rocks_record_store.cpp b/src/mongo/db/storage/rocks/rocks_record_store.cpp index 6084e4d8ff0..08148e0c2ef 100644 --- a/src/mongo/db/storage/rocks/rocks_record_store.cpp +++ b/src/mongo/db/storage/rocks/rocks_record_store.cpp @@ -317,7 +317,11 @@ namespace mongo { } if (_cappedDeleteCallback) { - uassertStatusOK(_cappedDeleteCallback->aboutToDeleteCapped(txn, oldest)); + uassertStatusOK( + _cappedDeleteCallback->aboutToDeleteCapped( + txn, + oldest, + RecordData(iter->value().data(), iter->value().size()))); } deleteRecord(txn, oldest); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index beca7233b71..7064094fa07 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -169,6 +169,7 @@ namespace { _isCapped( isCapped ), _isOplog( NamespaceString::oplog( ns ) ), _cappedMaxSize( cappedMaxSize ), + _cappedMaxSizeSlack( std::min(cappedMaxSize/10, int64_t(16*1024*1024)) ), _cappedMaxDocs( cappedMaxDocs ), _cappedDeleteCallback( cappedDeleteCallback ), _cappedDeleteCheckCount(0), @@ -370,24 +371,39 @@ namespace { // We only want to do the checks occasionally as they are expensive. // This variable isn't thread safe, but has loose semantics anyway. dassert( !_isOplog || _cappedMaxDocs == -1 ); - if ( _cappedMaxDocs == -1 && // Max docs has to be exact, so have to check every time. - _cappedDeleteCheckCount++ % 100 > 0 ) - return; if (!cappedAndNeedDelete()) return; // ensure only one thread at a time can do deletes, otherwise they'll conflict. - boost::mutex::scoped_lock lock(_cappedDeleterMutex, boost::try_to_lock); - if ( !lock ) - return; + boost::mutex::scoped_lock lock(_cappedDeleterMutex, boost::defer_lock); - // we do this is a sub transaction in case it aborts + if (_cappedMaxDocs != -1) { + lock.lock(); // Max docs has to be exact, so have to check every time. + } + else { + if (!lock.try_lock()) { + // Someone else is deleting old records. Apply back-pressure if too far behind, + // otherwise continue. + if ((_dataSize.load() - _cappedMaxSize) < _cappedMaxSizeSlack) + return; + + lock.lock(); + + // If we already waited, let someone else do cleanup unless we are significantly + // over the limit. + if ((_dataSize.load() - _cappedMaxSize) < (2 * _cappedMaxSizeSlack)) + return; + } + } + + // we do this is a side transaction in case it aborts WiredTigerRecoveryUnit* realRecoveryUnit = checked_cast<WiredTigerRecoveryUnit*>( txn->releaseRecoveryUnit() ); invariant( realRecoveryUnit ); WiredTigerSessionCache* sc = realRecoveryUnit->getSessionCache(); txn->setRecoveryUnit( new WiredTigerRecoveryUnit( sc ) ); + WT_SESSION* session = WiredTigerRecoveryUnit::get(txn)->getSession()->getSession(); int64_t dataSize = _dataSize.load(); int64_t numRecords = _numRecords.load(); @@ -399,46 +415,53 @@ namespace { docsOverCap = numRecords - _cappedMaxDocs; try { + WriteUnitOfWork wuow(txn); + WiredTigerCursor curwrap( _uri, _instanceId, true, txn); WT_CURSOR *c = curwrap.get(); - RecordId oldest; + RecordId newestOld; int ret = 0; while (( sizeSaved < sizeOverCap || docsRemoved < docsOverCap ) && - docsRemoved < 250 && + docsRemoved < 1000 && (ret = c->next(c)) == 0 ) { int64_t key; ret = c->get_key(c, &key); invariantWTOK(ret); // don't go past the record we just inserted - oldest = _fromKey(key); - if ( oldest >= justInserted ) + newestOld = _fromKey(key); + if ( newestOld >= justInserted ) // TODO: use oldest uncommitted instead break; - if ( _cappedDeleteCallback ) { - uassertStatusOK(_cappedDeleteCallback->aboutToDeleteCapped(txn, oldest)); - } - WT_ITEM old_value; - ret = c->get_value(c, &old_value); - invariantWTOK(ret); + invariantWTOK(c->get_value(c, &old_value)); ++docsRemoved; sizeSaved += old_value.size; + + if ( _cappedDeleteCallback ) { + uassertStatusOK( + _cappedDeleteCallback->aboutToDeleteCapped( + txn, + newestOld, + RecordData(static_cast<const char*>(old_value.data), old_value.size))); + } } if (ret != WT_NOTFOUND) invariantWTOK(ret); if (docsRemoved > 0) { // if we scanned to the end of the collection or past our insert, go back one - if ( ret == WT_NOTFOUND || oldest >= justInserted ) { + if ( ret == WT_NOTFOUND || newestOld >= justInserted ) { ret = c->prev(c); } invariantWTOK(ret); - WriteUnitOfWork wuow( txn ); - ret = c->session->truncate(c->session, NULL, NULL, c, NULL); - invariantWTOK(ret); + WiredTigerCursor startWrap( _uri, _instanceId, true, txn); + WT_CURSOR* start = startWrap.get(); + start->next(start); + + invariantWTOK(session->truncate(session, NULL, start, c, NULL)); _changeNumRecords(txn, -docsRemoved); _increaseDataSize(txn, -sizeSaved); wuow.commit(); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h index e612bcb964f..ed5d3dffcb7 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h @@ -261,6 +261,7 @@ namespace mongo { const bool _isCapped; const bool _isOplog; const int64_t _cappedMaxSize; + const int64_t _cappedMaxSizeSlack; // when to start applying backpressure const int64_t _cappedMaxDocs; CappedDocumentDeleteCallback* _cappedDeleteCallback; int _cappedDeleteCheckCount; // see comment in ::cappedDeleteAsNeeded |