diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2014-10-13 11:26:11 -0400 |
---|---|---|
committer | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2014-10-13 22:16:18 -0400 |
commit | 1a44dc6a150524de75dd46c8dc464c7182a37389 (patch) | |
tree | cc570f70678f39fc0be4f3ee9b51cf6c1fb5e9f4 /src/mongo/db/ttl.cpp | |
parent | aa31cae71bc5390fc02d48fc793eebb1991d0ff3 (diff) | |
download | mongo-1a44dc6a150524de75dd46c8dc464c7182a37389.tar.gz |
SERVER-15570 Avoid querying system.indexes in ttl.cpp.
Changed so that the TTL indexes are processed in multiple phases:
- Acquire an IS-mode lock on the database and find all of the
TTL indexes
For each TTL index,
- Acquire an IX-mode lock on the collection and delete all of the
expired documents
Also avoid checking `userFlags` of collection stats in TTL index
tests since it is not set by some storage engines.
Diffstat (limited to 'src/mongo/db/ttl.cpp')
-rw-r--r-- | src/mongo/db/ttl.cpp | 220 |
1 files changed, 126 insertions, 94 deletions
diff --git a/src/mongo/db/ttl.cpp b/src/mongo/db/ttl.cpp index 9d5ad64b787..861d9dee426 100644 --- a/src/mongo/db/ttl.cpp +++ b/src/mongo/db/ttl.cpp @@ -41,6 +41,8 @@ #include "mongo/db/client.h" #include "mongo/db/commands/fsync.h" #include "mongo/db/commands/server_status_metric.h" +#include "mongo/db/catalog/collection_catalog_entry.h" +#include "mongo/db/catalog/database_catalog_entry.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/operation_context_impl.h" @@ -59,97 +61,15 @@ namespace mongo { ServerStatusMetricField<Counter64> ttlDeletedDocumentsDisplay("ttl.deletedDocuments", &ttlDeletedDocuments); MONGO_EXPORT_SERVER_PARAMETER( ttlMonitorEnabled, bool, true ); - + class TTLMonitor : public BackgroundJob { public: TTLMonitor(){} virtual ~TTLMonitor(){} virtual string name() const { return "TTLMonitor"; } - - static string secondsExpireField; - - void doTTLForDB( OperationContext* txn, const string& dbName ) { - - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(dbName)) - return; - - DBDirectClient db(txn); - vector<BSONObj> indexes; - { - auto_ptr<DBClientCursor> cursor = - db.query( dbName + ".system.indexes" , - BSON( secondsExpireField << BSON( "$exists" << true ) ) , - 0 , /* default nToReturn */ - 0 , /* default nToSkip */ - 0 , /* default fieldsToReturn */ - QueryOption_SlaveOk ); /* perform on secondaries too */ - if ( cursor.get() ) { - while ( cursor->more() ) { - indexes.push_back( cursor->next().getOwned() ); - } - } - } - - for ( unsigned i=0; i<indexes.size(); i++ ) { - BSONObj idx = indexes[i]; - - - BSONObj key = idx["key"].Obj(); - if ( key.nFields() != 1 ) { - error() << "key for ttl index can only have 1 field" << endl; - continue; - } - if (!idx[secondsExpireField].isNumber()) { - log() << "ttl indexes require the " << secondsExpireField << " field to be " - << "numeric but received a type of: " - << typeName(idx[secondsExpireField].type()); - continue; - } - - BSONObj query; - { - BSONObjBuilder b; - b.appendDate( "$lt" , curTimeMillis64() - ( 1000 * idx[secondsExpireField].numberLong() ) ); - query = BSON( key.firstElement().fieldName() << b.obj() ); - } - - LOG(1) << "TTL: " << key << " \t " << query << endl; - - long long n = 0; - { - const string ns = idx["ns"].String(); - Client::WriteContext ctx(txn, ns ); - Collection* collection = ctx.getCollection(); - if ( !collection ) { - // collection was dropped - continue; - } - - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase( - dbName)) { - // we've stepped down since we started this function, - // so we should stop working as we only do deletes on the primary - break; - } - - if ( collection->getIndexCatalog()->findIndexByKeyPattern( txn, key ) == NULL ) { - // index not finished yet - LOG(1) << " skipping index because not finished"; - continue; - } - - n = deleteObjects(txn, ctx.ctx().db(), ns, query, false, true); - ttlDeletedDocuments.increment( n ); - ctx.commit(); - } - - LOG(1) << "\tTTL deleted: " << n << endl; - } - - - } + static string secondsExpireField; virtual void run() { Client::initThread( name().c_str() ); @@ -157,18 +77,16 @@ namespace mongo { while ( ! inShutdown() ) { sleepsecs( 60 ); - + LOG(3) << "TTLMonitor thread awake" << endl; if ( !ttlMonitorEnabled ) { LOG(1) << "TTLMonitor is disabled" << endl; continue; } - - OperationContextImpl txn; if ( lockedForWriting() ) { - // note: this is not perfect as you can go into fsync+lock between + // note: this is not perfect as you can go into fsync+lock between // this and actually doing the delete later LOG(3) << " locked for writing" << endl; continue; @@ -187,22 +105,136 @@ namespace mongo { for ( set<string>::const_iterator i=dbs.begin(); i!=dbs.end(); ++i ) { string db = *i; - try { - doTTLForDB( &txn, db ); + + vector<BSONObj> indexes; + { + OperationContextImpl txn; + getTTLIndexesForDB( &txn, db, &indexes ); + } + + for ( vector<BSONObj>::const_iterator it = indexes.begin(); + it != indexes.end(); ++it ) { + + OperationContextImpl txn; + if ( !doTTLForIndex( &txn, db, *it ) ) { + break; // stop processing TTL indexes on this database + } } - catch ( DBException& e ) { - error() << "error processing ttl for db: " << db << " " << e << endl; + } + } + } + + private: + /** + * Acquire an IS-mode lock on the specified database and for each + * collection in the database, append the specification of all + * TTL indexes on those collections to the supplied vector. + * + * The index specifications are grouped by the collection to which + * they belong. + */ + void getTTLIndexesForDB( OperationContext* txn, const string& dbName, + vector<BSONObj>* indexes ) { + + invariant( indexes && indexes->empty() ); + Lock::DBLock dbLock( txn->lockState(), dbName, MODE_IS ); + + Database* db = dbHolder().get( txn, dbName ); + if ( !db ) { + return; // skip since database no longer exists + } + + const DatabaseCatalogEntry* dbEntry = db->getDatabaseCatalogEntry(); + + list<string> namespaces; + dbEntry->getCollectionNamespaces( &namespaces ); + + for ( list<string>::const_iterator it = namespaces.begin(); + it != namespaces.end(); ++it ) { + + CollectionCatalogEntry* coll = dbEntry->getCollectionCatalogEntry( txn, *it ); + + vector<string> indexNames; + coll->getAllIndexes( txn, &indexNames ); + for ( size_t i = 0; i < indexNames.size(); i++ ) { + const string& name = indexNames[i]; + BSONObj spec = coll->getIndexSpec( txn, name ); + + if ( spec.hasField( secondsExpireField ) ) { + indexes->push_back( spec.getOwned() ); } } + } + } + /** + * Remove documents from the collection using the specified TTL index + * after a sufficient amount of time has passed according to its expiry + * specification. + * + * @return true if caller should continue processing TTL indexes of collections + * on the specified database, and false otherwise + */ + bool doTTLForIndex( OperationContext* txn, const string& dbName, const BSONObj& idx ) { + BSONObj key = idx["key"].Obj(); + if ( key.nFields() != 1 ) { + error() << "key for ttl index can only have 1 field" << endl; + return true; } + if ( !idx[secondsExpireField].isNumber() ) { + log() << "ttl indexes require the " << secondsExpireField << " field to be " + << "numeric but received a type of: " + << typeName( idx[secondsExpireField].type() ); + return true; + } + + BSONObj query; + { + BSONObjBuilder b; + long long expireMs = 1000 * idx[secondsExpireField].numberLong(); + b.appendDate( "$lt", curTimeMillis64() - expireMs ); + query = BSON( key.firstElement().fieldName() << b.obj() ); + } + + LOG(1) << "TTL: " << key << " \t " << query << endl; + + long long n = 0; + { + const string ns = idx["ns"].String(); + + Client::WriteContext ctx( txn, ns ); + Collection* collection = ctx.getCollection(); + if ( !collection ) { + // collection was dropped + return true; + } + + if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(dbName)) { + // we've stepped down since we started this function, + // so we should stop working as we only do deletes on the primary + return false; + } + + if ( collection->getIndexCatalog()->findIndexByKeyPattern( txn, key ) == NULL ) { + // index not finished yet + LOG(1) << " skipping index because not finished"; + return true; + } + + n = deleteObjects( txn, ctx.ctx().db(), ns, query, false, true ); + ttlDeletedDocuments.increment( n ); + ctx.commit(); + } + + LOG(1) << "\tTTL deleted: " << n << endl; + return true; } }; void startTTLBackgroundJob() { TTLMonitor* ttl = new TTLMonitor(); ttl->go(); - } - + } + string TTLMonitor::secondsExpireField = "expireAfterSeconds"; } |