summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/catalog/collection.cpp7
-rw-r--r--src/mongo/db/catalog/collection.h2
-rw-r--r--src/mongo/db/storage/capped_callback.h7
-rw-r--r--src/mongo/db/storage/in_memory/in_memory_record_store.cpp8
-rw-r--r--src/mongo/db/storage/mmap_v1/record_store_v1_capped.cpp4
-rw-r--r--src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp2
-rw-r--r--src/mongo/db/storage/rocks/rocks_record_store.cpp6
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp65
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h1
9 files changed, 69 insertions, 33 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp
index 43178b727db..1fceaad5300 100644
--- a/src/mongo/db/catalog/collection.cpp
+++ b/src/mongo/db/catalog/collection.cpp
@@ -270,13 +270,14 @@ namespace mongo {
return loc;
}
- Status Collection::aboutToDeleteCapped( OperationContext* txn, const RecordId& loc ) {
-
- BSONObj doc = docFor( txn, loc );
+ Status Collection::aboutToDeleteCapped( OperationContext* txn,
+ const RecordId& loc,
+ RecordData data ) {
/* check if any cursors point to us. if so, advance them. */
_cursorManager.invalidateDocument(txn, loc, INVALIDATION_DELETION);
+ BSONObj doc = data.releaseToBson();
_indexCatalog.unindexRecord(txn, doc, loc, false);
return Status::OK();
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h
index c6fe8b26a74..4d99eff98fc 100644
--- a/src/mongo/db/catalog/collection.h
+++ b/src/mongo/db/catalog/collection.h
@@ -293,7 +293,7 @@ namespace mongo {
Status recordStoreGoingToUpdateInPlace( OperationContext* txn,
const RecordId& loc );
- Status aboutToDeleteCapped( OperationContext* txn, const RecordId& loc );
+ Status aboutToDeleteCapped( OperationContext* txn, const RecordId& loc, RecordData data );
/**
* same semantics as insertDocument, but doesn't do:
diff --git a/src/mongo/db/storage/capped_callback.h b/src/mongo/db/storage/capped_callback.h
index a86c9e9d2dc..0ee4511f66a 100644
--- a/src/mongo/db/storage/capped_callback.h
+++ b/src/mongo/db/storage/capped_callback.h
@@ -35,6 +35,7 @@
namespace mongo {
class OperationContext;
+ class RecordData;
/**
* When a capped collection has to delete a document, it needs a way to tell the caller
@@ -47,8 +48,12 @@ namespace mongo {
/**
* This will be called right before loc is deleted when wrapping.
+ * If data is unowned, it is only valid inside of this call. If implementations wish to
+ * stash a pointer, they must copy it.
*/
- virtual Status aboutToDeleteCapped( OperationContext* txn, const RecordId& loc ) = 0;
+ virtual Status aboutToDeleteCapped( OperationContext* txn,
+ const RecordId& loc,
+ RecordData data ) = 0;
};
}
diff --git a/src/mongo/db/storage/in_memory/in_memory_record_store.cpp b/src/mongo/db/storage/in_memory/in_memory_record_store.cpp
index 581300a7df0..e4fb801032a 100644
--- a/src/mongo/db/storage/in_memory/in_memory_record_store.cpp
+++ b/src/mongo/db/storage/in_memory/in_memory_record_store.cpp
@@ -201,12 +201,14 @@ namespace mongo {
while (cappedAndNeedDelete(txn)) {
invariant(!_data->records.empty());
- RecordId oldest = _data->records.begin()->first;
+ Records::iterator oldest = _data->records.begin();
+ RecordId id = oldest->first;
+ RecordData data = oldest->second.toRecordData();
if (_cappedDeleteCallback)
- uassertStatusOK(_cappedDeleteCallback->aboutToDeleteCapped(txn, oldest));
+ uassertStatusOK(_cappedDeleteCallback->aboutToDeleteCapped(txn, id, data));
- deleteRecord(txn, oldest);
+ deleteRecord(txn, id);
}
}
diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_capped.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_capped.cpp
index 3648deff9d0..edd6304f63b 100644
--- a/src/mongo/db/storage/mmap_v1/record_store_v1_capped.cpp
+++ b/src/mongo/db/storage/mmap_v1/record_store_v1_capped.cpp
@@ -172,7 +172,7 @@ namespace mongo {
}
const RecordId fr = theCapExtent()->firstRecord.toRecordId();
- Status status = _deleteCallback->aboutToDeleteCapped( txn, fr );
+ Status status = _deleteCallback->aboutToDeleteCapped( txn, fr, dataFor(txn, fr) );
if ( !status.isOK() )
return StatusWith<DiskLoc>( status );
deleteRecord( txn, fr );
@@ -487,7 +487,7 @@ namespace mongo {
WriteUnitOfWork wunit(txn);
// Delete the newest record, and coalesce the new deleted
// record with existing deleted records.
- Status status = _deleteCallback->aboutToDeleteCapped( txn, currId );
+ Status status = _deleteCallback->aboutToDeleteCapped(txn, currId, dataFor(txn, currId));
uassertStatusOK( status );
deleteRecord( txn, currId );
_compact(txn);
diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp
index 951164b214a..468720793c3 100644
--- a/src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp
+++ b/src/mongo/db/storage/mmap_v1/record_store_v1_capped_test.cpp
@@ -51,7 +51,7 @@ namespace {
class DummyCappedDocumentDeleteCallback : public CappedDocumentDeleteCallback {
public:
- Status aboutToDeleteCapped( OperationContext* txn, const RecordId& loc ) {
+ Status aboutToDeleteCapped( OperationContext* txn, const RecordId& loc, RecordData data) {
deleted.push_back( DiskLoc::fromRecordId(loc) );
return Status::OK();
}
diff --git a/src/mongo/db/storage/rocks/rocks_record_store.cpp b/src/mongo/db/storage/rocks/rocks_record_store.cpp
index 6084e4d8ff0..08148e0c2ef 100644
--- a/src/mongo/db/storage/rocks/rocks_record_store.cpp
+++ b/src/mongo/db/storage/rocks/rocks_record_store.cpp
@@ -317,7 +317,11 @@ namespace mongo {
}
if (_cappedDeleteCallback) {
- uassertStatusOK(_cappedDeleteCallback->aboutToDeleteCapped(txn, oldest));
+ uassertStatusOK(
+ _cappedDeleteCallback->aboutToDeleteCapped(
+ txn,
+ oldest,
+ RecordData(iter->value().data(), iter->value().size())));
}
deleteRecord(txn, oldest);
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index beca7233b71..7064094fa07 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -169,6 +169,7 @@ namespace {
_isCapped( isCapped ),
_isOplog( NamespaceString::oplog( ns ) ),
_cappedMaxSize( cappedMaxSize ),
+ _cappedMaxSizeSlack( std::min(cappedMaxSize/10, int64_t(16*1024*1024)) ),
_cappedMaxDocs( cappedMaxDocs ),
_cappedDeleteCallback( cappedDeleteCallback ),
_cappedDeleteCheckCount(0),
@@ -370,24 +371,39 @@ namespace {
// We only want to do the checks occasionally as they are expensive.
// This variable isn't thread safe, but has loose semantics anyway.
dassert( !_isOplog || _cappedMaxDocs == -1 );
- if ( _cappedMaxDocs == -1 && // Max docs has to be exact, so have to check every time.
- _cappedDeleteCheckCount++ % 100 > 0 )
- return;
if (!cappedAndNeedDelete())
return;
// ensure only one thread at a time can do deletes, otherwise they'll conflict.
- boost::mutex::scoped_lock lock(_cappedDeleterMutex, boost::try_to_lock);
- if ( !lock )
- return;
+ boost::mutex::scoped_lock lock(_cappedDeleterMutex, boost::defer_lock);
- // we do this is a sub transaction in case it aborts
+ if (_cappedMaxDocs != -1) {
+ lock.lock(); // Max docs has to be exact, so have to check every time.
+ }
+ else {
+ if (!lock.try_lock()) {
+ // Someone else is deleting old records. Apply back-pressure if too far behind,
+ // otherwise continue.
+ if ((_dataSize.load() - _cappedMaxSize) < _cappedMaxSizeSlack)
+ return;
+
+ lock.lock();
+
+ // If we already waited, let someone else do cleanup unless we are significantly
+ // over the limit.
+ if ((_dataSize.load() - _cappedMaxSize) < (2 * _cappedMaxSizeSlack))
+ return;
+ }
+ }
+
+ // we do this is a side transaction in case it aborts
WiredTigerRecoveryUnit* realRecoveryUnit =
checked_cast<WiredTigerRecoveryUnit*>( txn->releaseRecoveryUnit() );
invariant( realRecoveryUnit );
WiredTigerSessionCache* sc = realRecoveryUnit->getSessionCache();
txn->setRecoveryUnit( new WiredTigerRecoveryUnit( sc ) );
+ WT_SESSION* session = WiredTigerRecoveryUnit::get(txn)->getSession()->getSession();
int64_t dataSize = _dataSize.load();
int64_t numRecords = _numRecords.load();
@@ -399,46 +415,53 @@ namespace {
docsOverCap = numRecords - _cappedMaxDocs;
try {
+ WriteUnitOfWork wuow(txn);
+
WiredTigerCursor curwrap( _uri, _instanceId, true, txn);
WT_CURSOR *c = curwrap.get();
- RecordId oldest;
+ RecordId newestOld;
int ret = 0;
while (( sizeSaved < sizeOverCap || docsRemoved < docsOverCap ) &&
- docsRemoved < 250 &&
+ docsRemoved < 1000 &&
(ret = c->next(c)) == 0 ) {
int64_t key;
ret = c->get_key(c, &key);
invariantWTOK(ret);
// don't go past the record we just inserted
- oldest = _fromKey(key);
- if ( oldest >= justInserted )
+ newestOld = _fromKey(key);
+ if ( newestOld >= justInserted ) // TODO: use oldest uncommitted instead
break;
- if ( _cappedDeleteCallback ) {
- uassertStatusOK(_cappedDeleteCallback->aboutToDeleteCapped(txn, oldest));
- }
-
WT_ITEM old_value;
- ret = c->get_value(c, &old_value);
- invariantWTOK(ret);
+ invariantWTOK(c->get_value(c, &old_value));
++docsRemoved;
sizeSaved += old_value.size;
+
+ if ( _cappedDeleteCallback ) {
+ uassertStatusOK(
+ _cappedDeleteCallback->aboutToDeleteCapped(
+ txn,
+ newestOld,
+ RecordData(static_cast<const char*>(old_value.data), old_value.size)));
+ }
}
if (ret != WT_NOTFOUND) invariantWTOK(ret);
if (docsRemoved > 0) {
// if we scanned to the end of the collection or past our insert, go back one
- if ( ret == WT_NOTFOUND || oldest >= justInserted ) {
+ if ( ret == WT_NOTFOUND || newestOld >= justInserted ) {
ret = c->prev(c);
}
invariantWTOK(ret);
- WriteUnitOfWork wuow( txn );
- ret = c->session->truncate(c->session, NULL, NULL, c, NULL);
- invariantWTOK(ret);
+ WiredTigerCursor startWrap( _uri, _instanceId, true, txn);
+ WT_CURSOR* start = startWrap.get();
+ start->next(start);
+
+ invariantWTOK(session->truncate(session, NULL, start, c, NULL));
_changeNumRecords(txn, -docsRemoved);
_increaseDataSize(txn, -sizeSaved);
wuow.commit();
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
index e612bcb964f..ed5d3dffcb7 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
@@ -261,6 +261,7 @@ namespace mongo {
const bool _isCapped;
const bool _isOplog;
const int64_t _cappedMaxSize;
+ const int64_t _cappedMaxSizeSlack; // when to start applying backpressure
const int64_t _cappedMaxDocs;
CappedDocumentDeleteCallback* _cappedDeleteCallback;
int _cappedDeleteCheckCount; // see comment in ::cappedDeleteAsNeeded