summaryrefslogtreecommitdiff
path: root/src/mongo/db/ttl.cpp
diff options
context:
space:
mode:
author: Max Hirschhorn <max.hirschhorn@mongodb.com> 2014-10-13 11:26:11 -0400
committer: Max Hirschhorn <max.hirschhorn@mongodb.com> 2014-10-13 22:16:18 -0400
commit: 1a44dc6a150524de75dd46c8dc464c7182a37389 (patch)
tree: cc570f70678f39fc0be4f3ee9b51cf6c1fb5e9f4 /src/mongo/db/ttl.cpp
parent: aa31cae71bc5390fc02d48fc793eebb1991d0ff3 (diff)
download: mongo-1a44dc6a150524de75dd46c8dc464c7182a37389.tar.gz
SERVER-15570 Avoid querying system.indexes in ttl.cpp.
Changed so that the TTL indexes are processed in multiple phases:
  - Acquire an IS-mode lock on the database and find all of the TTL indexes.
  - For each TTL index, acquire an IX-mode lock on the collection and delete
    all of the expired documents.

Also avoid checking `userFlags` of collection stats in TTL index tests, since it is not set by some storage engines.
Diffstat (limited to 'src/mongo/db/ttl.cpp')
-rw-r--r-- src/mongo/db/ttl.cpp 220
1 files changed, 126 insertions, 94 deletions
diff --git a/src/mongo/db/ttl.cpp b/src/mongo/db/ttl.cpp
index 9d5ad64b787..861d9dee426 100644
--- a/src/mongo/db/ttl.cpp
+++ b/src/mongo/db/ttl.cpp
@@ -41,6 +41,8 @@
#include "mongo/db/client.h"
#include "mongo/db/commands/fsync.h"
#include "mongo/db/commands/server_status_metric.h"
+#include "mongo/db/catalog/collection_catalog_entry.h"
+#include "mongo/db/catalog/database_catalog_entry.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/operation_context_impl.h"
@@ -59,97 +61,15 @@ namespace mongo {
ServerStatusMetricField<Counter64> ttlDeletedDocumentsDisplay("ttl.deletedDocuments", &ttlDeletedDocuments);
MONGO_EXPORT_SERVER_PARAMETER( ttlMonitorEnabled, bool, true );
-
+
class TTLMonitor : public BackgroundJob {
public:
TTLMonitor(){}
virtual ~TTLMonitor(){}
virtual string name() const { return "TTLMonitor"; }
-
- static string secondsExpireField;
-
- void doTTLForDB( OperationContext* txn, const string& dbName ) {
-
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(dbName))
- return;
-
- DBDirectClient db(txn);
- vector<BSONObj> indexes;
- {
- auto_ptr<DBClientCursor> cursor =
- db.query( dbName + ".system.indexes" ,
- BSON( secondsExpireField << BSON( "$exists" << true ) ) ,
- 0 , /* default nToReturn */
- 0 , /* default nToSkip */
- 0 , /* default fieldsToReturn */
- QueryOption_SlaveOk ); /* perform on secondaries too */
- if ( cursor.get() ) {
- while ( cursor->more() ) {
- indexes.push_back( cursor->next().getOwned() );
- }
- }
- }
-
- for ( unsigned i=0; i<indexes.size(); i++ ) {
- BSONObj idx = indexes[i];
-
-
- BSONObj key = idx["key"].Obj();
- if ( key.nFields() != 1 ) {
- error() << "key for ttl index can only have 1 field" << endl;
- continue;
- }
- if (!idx[secondsExpireField].isNumber()) {
- log() << "ttl indexes require the " << secondsExpireField << " field to be "
- << "numeric but received a type of: "
- << typeName(idx[secondsExpireField].type());
- continue;
- }
-
- BSONObj query;
- {
- BSONObjBuilder b;
- b.appendDate( "$lt" , curTimeMillis64() - ( 1000 * idx[secondsExpireField].numberLong() ) );
- query = BSON( key.firstElement().fieldName() << b.obj() );
- }
-
- LOG(1) << "TTL: " << key << " \t " << query << endl;
-
- long long n = 0;
- {
- const string ns = idx["ns"].String();
- Client::WriteContext ctx(txn, ns );
- Collection* collection = ctx.getCollection();
- if ( !collection ) {
- // collection was dropped
- continue;
- }
-
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(
- dbName)) {
- // we've stepped down since we started this function,
- // so we should stop working as we only do deletes on the primary
- break;
- }
-
- if ( collection->getIndexCatalog()->findIndexByKeyPattern( txn, key ) == NULL ) {
- // index not finished yet
- LOG(1) << " skipping index because not finished";
- continue;
- }
-
- n = deleteObjects(txn, ctx.ctx().db(), ns, query, false, true);
- ttlDeletedDocuments.increment( n );
- ctx.commit();
- }
-
- LOG(1) << "\tTTL deleted: " << n << endl;
- }
-
-
- }
+ static string secondsExpireField;
virtual void run() {
Client::initThread( name().c_str() );
@@ -157,18 +77,16 @@ namespace mongo {
while ( ! inShutdown() ) {
sleepsecs( 60 );
-
+
LOG(3) << "TTLMonitor thread awake" << endl;
if ( !ttlMonitorEnabled ) {
LOG(1) << "TTLMonitor is disabled" << endl;
continue;
}
-
- OperationContextImpl txn;
if ( lockedForWriting() ) {
- // note: this is not perfect as you can go into fsync+lock between
+ // note: this is not perfect as you can go into fsync+lock between
// this and actually doing the delete later
LOG(3) << " locked for writing" << endl;
continue;
@@ -187,22 +105,136 @@ namespace mongo {
for ( set<string>::const_iterator i=dbs.begin(); i!=dbs.end(); ++i ) {
string db = *i;
- try {
- doTTLForDB( &txn, db );
+
+ vector<BSONObj> indexes;
+ {
+ OperationContextImpl txn;
+ getTTLIndexesForDB( &txn, db, &indexes );
+ }
+
+ for ( vector<BSONObj>::const_iterator it = indexes.begin();
+ it != indexes.end(); ++it ) {
+
+ OperationContextImpl txn;
+ if ( !doTTLForIndex( &txn, db, *it ) ) {
+ break; // stop processing TTL indexes on this database
+ }
}
- catch ( DBException& e ) {
- error() << "error processing ttl for db: " << db << " " << e << endl;
+ }
+ }
+ }
+
+ private:
+ /**
+ * Collect the spec of every TTL index in database `dbName` into `*indexes`.
+ * Acquires an IS-mode lock on the database, then visits each collection namespace
+ * reported by the database catalog entry and appends a copy of every index spec
+ * that has a `secondsExpireField` (expireAfterSeconds) field, so the specs end up
+ * grouped by collection. `indexes` must be non-null and empty on entry; nothing
+ * is appended if the database no longer exists.
+ */
+ void getTTLIndexesForDB( OperationContext* txn, const string& dbName,
+ vector<BSONObj>* indexes ) {
+
+ invariant( indexes && indexes->empty() ); // output vector must be non-null and start out empty
+ Lock::DBLock dbLock( txn->lockState(), dbName, MODE_IS ); // IS-mode lock on the database
+
+ Database* db = dbHolder().get( txn, dbName );
+ if ( !db ) {
+ return; // skip since database no longer exists
+ }
+
+ const DatabaseCatalogEntry* dbEntry = db->getDatabaseCatalogEntry();
+
+ list<string> namespaces;
+ dbEntry->getCollectionNamespaces( &namespaces );
+
+ for ( list<string>::const_iterator it = namespaces.begin();
+ it != namespaces.end(); ++it ) {
+
+ CollectionCatalogEntry* coll = dbEntry->getCollectionCatalogEntry( txn, *it ); // NOTE(review): dereferenced below with no null check -- confirm this cannot return NULL here
+
+ vector<string> indexNames;
+ coll->getAllIndexes( txn, &indexNames );
+ for ( size_t i = 0; i < indexNames.size(); i++ ) {
+ const string& name = indexNames[i];
+ BSONObj spec = coll->getIndexSpec( txn, name );
+
+ if ( spec.hasField( secondsExpireField ) ) { // only specs carrying the expiry field are collected
+ indexes->push_back( spec.getOwned() ); // presumably an owned copy so the spec stays valid after the lock is released -- confirm
+ }
+ }
+ }
+ }
+ /**
+ * Delete the documents that have expired under the TTL index described by `idx`.
+ * A document matches when the indexed field is $lt (now - expireAfterSeconds * 1000 ms).
+ * Specs with a multi-field key or a non-numeric expiry are logged and skipped.
+ *
+ * @return false only when this node can no longer accept writes for `dbName`, in which
+ * case the caller should stop processing TTL indexes on that database; true otherwise
+ */
+ bool doTTLForIndex( OperationContext* txn, const string& dbName, const BSONObj& idx ) {
+ BSONObj key = idx["key"].Obj();
+ if ( key.nFields() != 1 ) {
+ error() << "key for ttl index can only have 1 field" << endl;
+ return true; // skip this index, keep processing the database
+ }
+ if ( !idx[secondsExpireField].isNumber() ) {
+ log() << "ttl indexes require the " << secondsExpireField << " field to be "
+ << "numeric but received a type of: "
+ << typeName( idx[secondsExpireField].type() );
+ return true; // skip this index, keep processing the database
+ }
+
+ BSONObj query;
+ {
+ BSONObjBuilder b;
+ long long expireMs = 1000 * idx[secondsExpireField].numberLong(); // NOTE(review): unchecked multiply -- a very large expireAfterSeconds could overflow long long
+ b.appendDate( "$lt", curTimeMillis64() - expireMs ); // cutoff: anything older than this has expired
+ query = BSON( key.firstElement().fieldName() << b.obj() ); // { <indexed field>: { $lt: <cutoff> } }
+ }
+
+ LOG(1) << "TTL: " << key << " \t " << query << endl;
+
+ long long n = 0;
+ {
+ const string ns = idx["ns"].String();
+
+ Client::WriteContext ctx( txn, ns ); // NOTE(review): commit message says an IX-mode collection lock is taken for the delete -- presumably here; confirm
+ Collection* collection = ctx.getCollection();
+ if ( !collection ) {
+ // collection was dropped
+ return true;
+ }
+
+ if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(dbName)) {
+ // we've stepped down since we started this function,
+ // so we should stop working as we only do deletes on the primary
+ return false;
+ }
+
+ if ( collection->getIndexCatalog()->findIndexByKeyPattern( txn, key ) == NULL ) {
+ // index not finished yet
+ LOG(1) << " skipping index because not finished";
+ return true;
+ }
+
+ n = deleteObjects( txn, ctx.ctx().db(), ns, query, false, true ); // NOTE(review): meaning of the two positional bools is not visible here -- see deleteObjects declaration
+ ttlDeletedDocuments.increment( n ); // counter exposed as the ttl.deletedDocuments server status metric
+ ctx.commit();
+ }
+
+ LOG(1) << "\tTTL deleted: " << n << endl;
+ return true;
+ }
};
void startTTLBackgroundJob() { // creates the TTLMonitor job and calls go() on it
TTLMonitor* ttl = new TTLMonitor(); // NOTE(review): never deleted in the visible code -- presumably meant to live for the whole process; confirm
ttl->go(); // inherited from BackgroundJob -- presumably starts run() on a background thread; confirm
- }
-
+ }
+
string TTLMonitor::secondsExpireField = "expireAfterSeconds";
}