diff options
Diffstat (limited to 'src/mongo/db/ttl.cpp')
-rw-r--r-- | src/mongo/db/ttl.cpp | 464 |
1 files changed, 224 insertions, 240 deletions
diff --git a/src/mongo/db/ttl.cpp b/src/mongo/db/ttl.cpp index e680d6bd671..383f276d5a3 100644 --- a/src/mongo/db/ttl.cpp +++ b/src/mongo/db/ttl.cpp @@ -60,298 +60,282 @@ namespace mongo { - using std::set; - using std::endl; - using std::list; - using std::string; - using std::vector; - using std::unique_ptr; - - Counter64 ttlPasses; - Counter64 ttlDeletedDocuments; - - ServerStatusMetricField<Counter64> ttlPassesDisplay("ttl.passes", &ttlPasses); - ServerStatusMetricField<Counter64> ttlDeletedDocumentsDisplay("ttl.deletedDocuments", &ttlDeletedDocuments); - - MONGO_EXPORT_SERVER_PARAMETER( ttlMonitorEnabled, bool, true ); - MONGO_EXPORT_SERVER_PARAMETER( ttlMonitorSleepSecs, int, 60 ); //used for testing - - class TTLMonitor : public BackgroundJob { - public: - TTLMonitor(){} - virtual ~TTLMonitor(){} - - virtual string name() const { return "TTLMonitor"; } - - static string secondsExpireField; +using std::set; +using std::endl; +using std::list; +using std::string; +using std::vector; +using std::unique_ptr; + +Counter64 ttlPasses; +Counter64 ttlDeletedDocuments; + +ServerStatusMetricField<Counter64> ttlPassesDisplay("ttl.passes", &ttlPasses); +ServerStatusMetricField<Counter64> ttlDeletedDocumentsDisplay("ttl.deletedDocuments", + &ttlDeletedDocuments); + +MONGO_EXPORT_SERVER_PARAMETER(ttlMonitorEnabled, bool, true); +MONGO_EXPORT_SERVER_PARAMETER(ttlMonitorSleepSecs, int, 60); // used for testing + +class TTLMonitor : public BackgroundJob { +public: + TTLMonitor() {} + virtual ~TTLMonitor() {} + + virtual string name() const { + return "TTLMonitor"; + } - virtual void run() { - Client::initThread( name().c_str() ); - AuthorizationSession::get(cc())->grantInternalAuthorization(); + static string secondsExpireField; - while ( ! inShutdown() ) { - sleepsecs( ttlMonitorSleepSecs ); + virtual void run() { + Client::initThread(name().c_str()); + AuthorizationSession::get(cc())->grantInternalAuthorization(); - LOG(3) << "TTLMonitor thread awake" << endl; + while (!inShutdown()) { + sleepsecs(ttlMonitorSleepSecs); - if ( !ttlMonitorEnabled ) { - LOG(1) << "TTLMonitor is disabled" << endl; - continue; - } + LOG(3) << "TTLMonitor thread awake" << endl; - if ( lockedForWriting() ) { - // note: this is not perfect as you can go into fsync+lock between - // this and actually doing the delete later - LOG(3) << " locked for writing" << endl; - continue; - } + if (!ttlMonitorEnabled) { + LOG(1) << "TTLMonitor is disabled" << endl; + continue; + } - try { - doTTLPass(); - } - catch ( const WriteConflictException& e ) { - LOG(1) << "Got WriteConflictException in TTL thread"; - } + if (lockedForWriting()) { + // note: this is not perfect as you can go into fsync+lock between + // this and actually doing the delete later + LOG(3) << " locked for writing" << endl; + continue; + } + try { + doTTLPass(); + } catch (const WriteConflictException& e) { + LOG(1) << "Got WriteConflictException in TTL thread"; } } + } - private: - - void doTTLPass() { - // Count it as active from the moment the TTL thread wakes up - OperationContextImpl txn; +private: + void doTTLPass() { + // Count it as active from the moment the TTL thread wakes up + OperationContextImpl txn; - // if part of replSet but not in a readable state (e.g. during initial sync), skip. - if (repl::getGlobalReplicationCoordinator()->getReplicationMode() == + // if part of replSet but not in a readable state (e.g. during initial sync), skip. + if (repl::getGlobalReplicationCoordinator()->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet && - !repl::getGlobalReplicationCoordinator()->getMemberState().readable()) - return; - - set<string> dbs; - dbHolder().getAllShortNames( dbs ); + !repl::getGlobalReplicationCoordinator()->getMemberState().readable()) + return; - ttlPasses.increment(); + set<string> dbs; + dbHolder().getAllShortNames(dbs); - for ( set<string>::const_iterator i=dbs.begin(); i!=dbs.end(); ++i ) { - string db = *i; + ttlPasses.increment(); - vector<BSONObj> indexes; - getTTLIndexesForDB(&txn, db, &indexes); + for (set<string>::const_iterator i = dbs.begin(); i != dbs.end(); ++i) { + string db = *i; - for ( vector<BSONObj>::const_iterator it = indexes.begin(); - it != indexes.end(); ++it ) { + vector<BSONObj> indexes; + getTTLIndexesForDB(&txn, db, &indexes); - BSONObj idx = *it; - try { - if ( !doTTLForIndex( &txn, db, idx ) ) { - break; // stop processing TTL indexes on this database - } - } catch (const DBException& dbex) { - error() << "Error processing ttl index: " << idx - << " -- " << dbex.toString(); - // continue on to the next index - continue; + for (vector<BSONObj>::const_iterator it = indexes.begin(); it != indexes.end(); ++it) { + BSONObj idx = *it; + try { + if (!doTTLForIndex(&txn, db, idx)) { + break; // stop processing TTL indexes on this database } + } catch (const DBException& dbex) { + error() << "Error processing ttl index: " << idx << " -- " << dbex.toString(); + // continue on to the next index + continue; } } } + } - /** - * Acquire an IS-mode lock on the specified database and for each - * collection in the database, append the specification of all - * TTL indexes on those collections to the supplied vector. - * - * The index specifications are grouped by the collection to which - * they belong. - */ - void getTTLIndexesForDB( OperationContext* txn, const string& dbName, - vector<BSONObj>* indexes ) { - - invariant( indexes && indexes->empty() ); - ScopedTransaction transaction( txn, MODE_IS ); - Lock::DBLock dbLock( txn->lockState(), dbName, MODE_IS ); - - Database* db = dbHolder().get( txn, dbName ); - if ( !db ) { - return; // skip since database no longer exists - } - - const DatabaseCatalogEntry* dbEntry = db->getDatabaseCatalogEntry(); + /** + * Acquire an IS-mode lock on the specified database and for each + * collection in the database, append the specification of all + * TTL indexes on those collections to the supplied vector. + * + * The index specifications are grouped by the collection to which + * they belong. + */ + void getTTLIndexesForDB(OperationContext* txn, const string& dbName, vector<BSONObj>* indexes) { + invariant(indexes && indexes->empty()); + ScopedTransaction transaction(txn, MODE_IS); + Lock::DBLock dbLock(txn->lockState(), dbName, MODE_IS); + + Database* db = dbHolder().get(txn, dbName); + if (!db) { + return; // skip since database no longer exists + } - list<string> namespaces; - dbEntry->getCollectionNamespaces( &namespaces ); + const DatabaseCatalogEntry* dbEntry = db->getDatabaseCatalogEntry(); - for ( list<string>::const_iterator it = namespaces.begin(); - it != namespaces.end(); ++it ) { + list<string> namespaces; + dbEntry->getCollectionNamespaces(&namespaces); - string ns = *it; - Lock::CollectionLock collLock( txn->lockState(), ns, MODE_IS ); - CollectionCatalogEntry* coll = dbEntry->getCollectionCatalogEntry( ns ); + for (list<string>::const_iterator it = namespaces.begin(); it != namespaces.end(); ++it) { + string ns = *it; + Lock::CollectionLock collLock(txn->lockState(), ns, MODE_IS); + CollectionCatalogEntry* coll = dbEntry->getCollectionCatalogEntry(ns); - if ( !coll ) { - continue; // skip since collection not found in catalog - } + if (!coll) { + continue; // skip since collection not found in catalog + } - vector<string> indexNames; - coll->getAllIndexes( txn, &indexNames ); - for ( size_t i = 0; i < indexNames.size(); i++ ) { - const string& name = indexNames[i]; - BSONObj spec = coll->getIndexSpec( txn, name ); + vector<string> indexNames; + coll->getAllIndexes(txn, &indexNames); + for (size_t i = 0; i < indexNames.size(); i++) { + const string& name = indexNames[i]; + BSONObj spec = coll->getIndexSpec(txn, name); - if ( spec.hasField( secondsExpireField ) ) { - indexes->push_back( spec.getOwned() ); - } + if (spec.hasField(secondsExpireField)) { + indexes->push_back(spec.getOwned()); } } } + } - /** - * Remove documents from the collection using the specified TTL index - * after a sufficient amount of time has passed according to its expiry - * specification. - * - * @return true if caller should continue processing TTL indexes of collections - * on the specified database, and false otherwise - */ - bool doTTLForIndex(OperationContext* txn, const string& dbName, BSONObj idx) { - const string ns = idx["ns"].String(); - NamespaceString nss(ns); - if (!userAllowedWriteNS(nss).isOK()) { - error() << "namespace '" << ns << "' doesn't allow deletes, skipping ttl job for: " - << idx; - return true; - } - - BSONObj key = idx["key"].Obj(); - if (key.nFields() != 1) { - error() << "key for ttl index can only have 1 field, skipping ttl job for: " << idx; - return true; - } + /** + * Remove documents from the collection using the specified TTL index + * after a sufficient amount of time has passed according to its expiry + * specification. + * + * @return true if caller should continue processing TTL indexes of collections + * on the specified database, and false otherwise + */ + bool doTTLForIndex(OperationContext* txn, const string& dbName, BSONObj idx) { + const string ns = idx["ns"].String(); + NamespaceString nss(ns); + if (!userAllowedWriteNS(nss).isOK()) { + error() << "namespace '" << ns + << "' doesn't allow deletes, skipping ttl job for: " << idx; + return true; + } - LOG(1) << "TTL -- ns: " << ns << " key: " << key; + BSONObj key = idx["key"].Obj(); + if (key.nFields() != 1) { + error() << "key for ttl index can only have 1 field, skipping ttl job for: " << idx; + return true; + } - // Read the current time outside of the while loop, so that we don't expand our index - // bounds after every WriteConflictException. - const Date_t now = Date_t::now(); + LOG(1) << "TTL -- ns: " << ns << " key: " << key; - long long numDeleted = 0; - int attempt = 1; - while (1) { - ScopedTransaction scopedXact(txn, MODE_IX); - AutoGetDb autoDb(txn, dbName, MODE_IX); - Database* db = autoDb.getDb(); - if (!db) { - return false; - } + // Read the current time outside of the while loop, so that we don't expand our index + // bounds after every WriteConflictException. + const Date_t now = Date_t::now(); - Lock::CollectionLock collLock(txn->lockState(), ns, MODE_IX); + long long numDeleted = 0; + int attempt = 1; + while (1) { + ScopedTransaction scopedXact(txn, MODE_IX); + AutoGetDb autoDb(txn, dbName, MODE_IX); + Database* db = autoDb.getDb(); + if (!db) { + return false; + } - Collection* collection = db->getCollection(ns); - if (!collection) { - // Collection was dropped. - return true; - } + Lock::CollectionLock collLock(txn->lockState(), ns, MODE_IX); - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) { - // We've stepped down since we started this function, so we should stop working - // as we only do deletes on the primary. - return false; - } + Collection* collection = db->getCollection(ns); + if (!collection) { + // Collection was dropped. + return true; + } - IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByKeyPattern(txn, - key); - if (!desc) { - LOG(1) << "index not found (index build in progress? index dropped?), skipping " - << "ttl job for: " << idx; - return true; - } + if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) { + // We've stepped down since we started this function, so we should stop working + // as we only do deletes on the primary. + return false; + } - // Re-read 'idx' from the descriptor, in case the collection or index definition - // changed before we re-acquired the collection lock. - idx = desc->infoObj(); + IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByKeyPattern(txn, key); + if (!desc) { + LOG(1) << "index not found (index build in progress? index dropped?), skipping " + << "ttl job for: " << idx; + return true; + } - if (IndexType::INDEX_BTREE != IndexNames::nameToType(desc->getAccessMethodName())) { - error() << "special index can't be used as a ttl index, skipping ttl job for: " - << idx; - return true; - } + // Re-read 'idx' from the descriptor, in case the collection or index definition + // changed before we re-acquired the collection lock. + idx = desc->infoObj(); - BSONElement secondsExpireElt = idx[secondsExpireField]; - if (!secondsExpireElt.isNumber()) { - error() << "ttl indexes require the " << secondsExpireField << " field to be " - << "numeric but received a type of " - << typeName(secondsExpireElt.type()) << ", skipping ttl job for: " - << idx; - return true; - } + if (IndexType::INDEX_BTREE != IndexNames::nameToType(desc->getAccessMethodName())) { + error() << "special index can't be used as a ttl index, skipping ttl job for: " + << idx; + return true; + } - const Date_t kDawnOfTime = - Date_t::fromMillisSinceEpoch(std::numeric_limits<long long>::min()); - const BSONObj startKey = BSON("" << kDawnOfTime); - const BSONObj endKey = - BSON("" << now - Seconds(secondsExpireElt.numberLong())); - const bool endKeyInclusive = true; - // The canonical check as to whether a key pattern element is "ascending" or - // "descending" is (elt.number() >= 0). This is defined by the Ordering class. - const InternalPlanner::Direction direction = - (key.firstElement().number() >= 0) ? InternalPlanner::Direction::FORWARD - : InternalPlanner::Direction::BACKWARD; - unique_ptr<PlanExecutor> exec(InternalPlanner::indexScan(txn, - collection, - desc, - startKey, - endKey, - endKeyInclusive, - direction)); - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); + BSONElement secondsExpireElt = idx[secondsExpireField]; + if (!secondsExpireElt.isNumber()) { + error() << "ttl indexes require the " << secondsExpireField << " field to be " + << "numeric but received a type of " << typeName(secondsExpireElt.type()) + << ", skipping ttl job for: " << idx; + return true; + } - try { - PlanExecutor::ExecState state; - BSONObj obj; - RecordId rid; - while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, &rid))) { - exec->saveState(); - { - WriteUnitOfWork wunit(txn); - collection->deleteDocument(txn, rid); - wunit.commit(); - } - ++numDeleted; - ttlDeletedDocuments.increment(); - if (!exec->restoreState(txn)) { - return true; - } + const Date_t kDawnOfTime = + Date_t::fromMillisSinceEpoch(std::numeric_limits<long long>::min()); + const BSONObj startKey = BSON("" << kDawnOfTime); + const BSONObj endKey = BSON("" << now - Seconds(secondsExpireElt.numberLong())); + const bool endKeyInclusive = true; + // The canonical check as to whether a key pattern element is "ascending" or + // "descending" is (elt.number() >= 0). This is defined by the Ordering class. + const InternalPlanner::Direction direction = (key.firstElement().number() >= 0) + ? InternalPlanner::Direction::FORWARD + : InternalPlanner::Direction::BACKWARD; + unique_ptr<PlanExecutor> exec(InternalPlanner::indexScan( + txn, collection, desc, startKey, endKey, endKeyInclusive, direction)); + exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); + + try { + PlanExecutor::ExecState state; + BSONObj obj; + RecordId rid; + while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, &rid))) { + exec->saveState(); + { + WriteUnitOfWork wunit(txn); + collection->deleteDocument(txn, rid); + wunit.commit(); } - - if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) { - if (WorkingSetCommon::isValidStatusMemberObject(obj)) { - error() << "ttl query execution for index " << idx << " failed with: " - << WorkingSetCommon::getMemberObjectStatus(obj); - return true; - } - error() << "ttl query execution for index " << idx << " failed with state: " - << PlanExecutor::statestr(state); + ++numDeleted; + ttlDeletedDocuments.increment(); + if (!exec->restoreState(txn)) { return true; } - - invariant(PlanExecutor::IS_EOF == state); - break; } - catch (const WriteConflictException& dle) { - WriteConflictException::logAndBackoff(attempt++, "ttl", ns); + + if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) { + if (WorkingSetCommon::isValidStatusMemberObject(obj)) { + error() << "ttl query execution for index " << idx + << " failed with: " << WorkingSetCommon::getMemberObjectStatus(obj); + return true; + } + error() << "ttl query execution for index " << idx + << " failed with state: " << PlanExecutor::statestr(state); + return true; } - } - LOG(1) << "\tTTL deleted: " << numDeleted << endl; - return true; + invariant(PlanExecutor::IS_EOF == state); + break; + } catch (const WriteConflictException& dle) { + WriteConflictException::logAndBackoff(attempt++, "ttl", ns); + } } - }; - void startTTLBackgroundJob() { - TTLMonitor* ttl = new TTLMonitor(); - ttl->go(); + LOG(1) << "\tTTL deleted: " << numDeleted << endl; + return true; } +}; + +void startTTLBackgroundJob() { + TTLMonitor* ttl = new TTLMonitor(); + ttl->go(); +} - string TTLMonitor::secondsExpireField = "expireAfterSeconds"; +string TTLMonitor::secondsExpireField = "expireAfterSeconds"; } |