summaryrefslogtreecommitdiff
path: root/src/mongo/db/ttl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/ttl.cpp')
-rw-r--r--src/mongo/db/ttl.cpp464
1 files changed, 224 insertions, 240 deletions
diff --git a/src/mongo/db/ttl.cpp b/src/mongo/db/ttl.cpp
index e680d6bd671..383f276d5a3 100644
--- a/src/mongo/db/ttl.cpp
+++ b/src/mongo/db/ttl.cpp
@@ -60,298 +60,282 @@
namespace mongo {
- using std::set;
- using std::endl;
- using std::list;
- using std::string;
- using std::vector;
- using std::unique_ptr;
-
- Counter64 ttlPasses;
- Counter64 ttlDeletedDocuments;
-
- ServerStatusMetricField<Counter64> ttlPassesDisplay("ttl.passes", &ttlPasses);
- ServerStatusMetricField<Counter64> ttlDeletedDocumentsDisplay("ttl.deletedDocuments", &ttlDeletedDocuments);
-
- MONGO_EXPORT_SERVER_PARAMETER( ttlMonitorEnabled, bool, true );
- MONGO_EXPORT_SERVER_PARAMETER( ttlMonitorSleepSecs, int, 60 ); //used for testing
-
- class TTLMonitor : public BackgroundJob {
- public:
- TTLMonitor(){}
- virtual ~TTLMonitor(){}
-
- virtual string name() const { return "TTLMonitor"; }
-
- static string secondsExpireField;
+using std::set;
+using std::endl;
+using std::list;
+using std::string;
+using std::vector;
+using std::unique_ptr;
+
+Counter64 ttlPasses;
+Counter64 ttlDeletedDocuments;
+
+ServerStatusMetricField<Counter64> ttlPassesDisplay("ttl.passes", &ttlPasses);
+ServerStatusMetricField<Counter64> ttlDeletedDocumentsDisplay("ttl.deletedDocuments",
+ &ttlDeletedDocuments);
+
+MONGO_EXPORT_SERVER_PARAMETER(ttlMonitorEnabled, bool, true);
+MONGO_EXPORT_SERVER_PARAMETER(ttlMonitorSleepSecs, int, 60); // used for testing
+
+class TTLMonitor : public BackgroundJob {
+public:
+ TTLMonitor() {}
+ virtual ~TTLMonitor() {}
+
+ virtual string name() const {
+ return "TTLMonitor";
+ }
- virtual void run() {
- Client::initThread( name().c_str() );
- AuthorizationSession::get(cc())->grantInternalAuthorization();
+ static string secondsExpireField;
- while ( ! inShutdown() ) {
- sleepsecs( ttlMonitorSleepSecs );
+ virtual void run() {
+ Client::initThread(name().c_str());
+ AuthorizationSession::get(cc())->grantInternalAuthorization();
- LOG(3) << "TTLMonitor thread awake" << endl;
+ while (!inShutdown()) {
+ sleepsecs(ttlMonitorSleepSecs);
- if ( !ttlMonitorEnabled ) {
- LOG(1) << "TTLMonitor is disabled" << endl;
- continue;
- }
+ LOG(3) << "TTLMonitor thread awake" << endl;
- if ( lockedForWriting() ) {
- // note: this is not perfect as you can go into fsync+lock between
- // this and actually doing the delete later
- LOG(3) << " locked for writing" << endl;
- continue;
- }
+ if (!ttlMonitorEnabled) {
+ LOG(1) << "TTLMonitor is disabled" << endl;
+ continue;
+ }
- try {
- doTTLPass();
- }
- catch ( const WriteConflictException& e ) {
- LOG(1) << "Got WriteConflictException in TTL thread";
- }
+ if (lockedForWriting()) {
+ // note: this is not perfect as you can go into fsync+lock between
+ // this and actually doing the delete later
+ LOG(3) << " locked for writing" << endl;
+ continue;
+ }
+ try {
+ doTTLPass();
+ } catch (const WriteConflictException& e) {
+ LOG(1) << "Got WriteConflictException in TTL thread";
}
}
+ }
- private:
-
- void doTTLPass() {
- // Count it as active from the moment the TTL thread wakes up
- OperationContextImpl txn;
+private:
+ void doTTLPass() {
+ // Count it as active from the moment the TTL thread wakes up
+ OperationContextImpl txn;
- // if part of replSet but not in a readable state (e.g. during initial sync), skip.
- if (repl::getGlobalReplicationCoordinator()->getReplicationMode() ==
+ // if part of replSet but not in a readable state (e.g. during initial sync), skip.
+ if (repl::getGlobalReplicationCoordinator()->getReplicationMode() ==
repl::ReplicationCoordinator::modeReplSet &&
- !repl::getGlobalReplicationCoordinator()->getMemberState().readable())
- return;
-
- set<string> dbs;
- dbHolder().getAllShortNames( dbs );
+ !repl::getGlobalReplicationCoordinator()->getMemberState().readable())
+ return;
- ttlPasses.increment();
+ set<string> dbs;
+ dbHolder().getAllShortNames(dbs);
- for ( set<string>::const_iterator i=dbs.begin(); i!=dbs.end(); ++i ) {
- string db = *i;
+ ttlPasses.increment();
- vector<BSONObj> indexes;
- getTTLIndexesForDB(&txn, db, &indexes);
+ for (set<string>::const_iterator i = dbs.begin(); i != dbs.end(); ++i) {
+ string db = *i;
- for ( vector<BSONObj>::const_iterator it = indexes.begin();
- it != indexes.end(); ++it ) {
+ vector<BSONObj> indexes;
+ getTTLIndexesForDB(&txn, db, &indexes);
- BSONObj idx = *it;
- try {
- if ( !doTTLForIndex( &txn, db, idx ) ) {
- break; // stop processing TTL indexes on this database
- }
- } catch (const DBException& dbex) {
- error() << "Error processing ttl index: " << idx
- << " -- " << dbex.toString();
- // continue on to the next index
- continue;
+ for (vector<BSONObj>::const_iterator it = indexes.begin(); it != indexes.end(); ++it) {
+ BSONObj idx = *it;
+ try {
+ if (!doTTLForIndex(&txn, db, idx)) {
+ break; // stop processing TTL indexes on this database
}
+ } catch (const DBException& dbex) {
+ error() << "Error processing ttl index: " << idx << " -- " << dbex.toString();
+ // continue on to the next index
+ continue;
}
}
}
+ }
- /**
- * Acquire an IS-mode lock on the specified database and for each
- * collection in the database, append the specification of all
- * TTL indexes on those collections to the supplied vector.
- *
- * The index specifications are grouped by the collection to which
- * they belong.
- */
- void getTTLIndexesForDB( OperationContext* txn, const string& dbName,
- vector<BSONObj>* indexes ) {
-
- invariant( indexes && indexes->empty() );
- ScopedTransaction transaction( txn, MODE_IS );
- Lock::DBLock dbLock( txn->lockState(), dbName, MODE_IS );
-
- Database* db = dbHolder().get( txn, dbName );
- if ( !db ) {
- return; // skip since database no longer exists
- }
-
- const DatabaseCatalogEntry* dbEntry = db->getDatabaseCatalogEntry();
+ /**
+ * Acquire an IS-mode lock on the specified database and for each
+ * collection in the database, append the specification of all
+ * TTL indexes on those collections to the supplied vector.
+ *
+ * The index specifications are grouped by the collection to which
+ * they belong.
+ */
+ void getTTLIndexesForDB(OperationContext* txn, const string& dbName, vector<BSONObj>* indexes) {
+ invariant(indexes && indexes->empty());
+ ScopedTransaction transaction(txn, MODE_IS);
+ Lock::DBLock dbLock(txn->lockState(), dbName, MODE_IS);
+
+ Database* db = dbHolder().get(txn, dbName);
+ if (!db) {
+ return; // skip since database no longer exists
+ }
- list<string> namespaces;
- dbEntry->getCollectionNamespaces( &namespaces );
+ const DatabaseCatalogEntry* dbEntry = db->getDatabaseCatalogEntry();
- for ( list<string>::const_iterator it = namespaces.begin();
- it != namespaces.end(); ++it ) {
+ list<string> namespaces;
+ dbEntry->getCollectionNamespaces(&namespaces);
- string ns = *it;
- Lock::CollectionLock collLock( txn->lockState(), ns, MODE_IS );
- CollectionCatalogEntry* coll = dbEntry->getCollectionCatalogEntry( ns );
+ for (list<string>::const_iterator it = namespaces.begin(); it != namespaces.end(); ++it) {
+ string ns = *it;
+ Lock::CollectionLock collLock(txn->lockState(), ns, MODE_IS);
+ CollectionCatalogEntry* coll = dbEntry->getCollectionCatalogEntry(ns);
- if ( !coll ) {
- continue; // skip since collection not found in catalog
- }
+ if (!coll) {
+ continue; // skip since collection not found in catalog
+ }
- vector<string> indexNames;
- coll->getAllIndexes( txn, &indexNames );
- for ( size_t i = 0; i < indexNames.size(); i++ ) {
- const string& name = indexNames[i];
- BSONObj spec = coll->getIndexSpec( txn, name );
+ vector<string> indexNames;
+ coll->getAllIndexes(txn, &indexNames);
+ for (size_t i = 0; i < indexNames.size(); i++) {
+ const string& name = indexNames[i];
+ BSONObj spec = coll->getIndexSpec(txn, name);
- if ( spec.hasField( secondsExpireField ) ) {
- indexes->push_back( spec.getOwned() );
- }
+ if (spec.hasField(secondsExpireField)) {
+ indexes->push_back(spec.getOwned());
}
}
}
+ }
- /**
- * Remove documents from the collection using the specified TTL index
- * after a sufficient amount of time has passed according to its expiry
- * specification.
- *
- * @return true if caller should continue processing TTL indexes of collections
- * on the specified database, and false otherwise
- */
- bool doTTLForIndex(OperationContext* txn, const string& dbName, BSONObj idx) {
- const string ns = idx["ns"].String();
- NamespaceString nss(ns);
- if (!userAllowedWriteNS(nss).isOK()) {
- error() << "namespace '" << ns << "' doesn't allow deletes, skipping ttl job for: "
- << idx;
- return true;
- }
-
- BSONObj key = idx["key"].Obj();
- if (key.nFields() != 1) {
- error() << "key for ttl index can only have 1 field, skipping ttl job for: " << idx;
- return true;
- }
+ /**
+ * Remove documents from the collection using the specified TTL index
+ * after a sufficient amount of time has passed according to its expiry
+ * specification.
+ *
+ * @return true if caller should continue processing TTL indexes of collections
+ * on the specified database, and false otherwise
+ */
+ bool doTTLForIndex(OperationContext* txn, const string& dbName, BSONObj idx) {
+ const string ns = idx["ns"].String();
+ NamespaceString nss(ns);
+ if (!userAllowedWriteNS(nss).isOK()) {
+ error() << "namespace '" << ns
+ << "' doesn't allow deletes, skipping ttl job for: " << idx;
+ return true;
+ }
- LOG(1) << "TTL -- ns: " << ns << " key: " << key;
+ BSONObj key = idx["key"].Obj();
+ if (key.nFields() != 1) {
+ error() << "key for ttl index can only have 1 field, skipping ttl job for: " << idx;
+ return true;
+ }
- // Read the current time outside of the while loop, so that we don't expand our index
- // bounds after every WriteConflictException.
- const Date_t now = Date_t::now();
+ LOG(1) << "TTL -- ns: " << ns << " key: " << key;
- long long numDeleted = 0;
- int attempt = 1;
- while (1) {
- ScopedTransaction scopedXact(txn, MODE_IX);
- AutoGetDb autoDb(txn, dbName, MODE_IX);
- Database* db = autoDb.getDb();
- if (!db) {
- return false;
- }
+ // Read the current time outside of the while loop, so that we don't expand our index
+ // bounds after every WriteConflictException.
+ const Date_t now = Date_t::now();
- Lock::CollectionLock collLock(txn->lockState(), ns, MODE_IX);
+ long long numDeleted = 0;
+ int attempt = 1;
+ while (1) {
+ ScopedTransaction scopedXact(txn, MODE_IX);
+ AutoGetDb autoDb(txn, dbName, MODE_IX);
+ Database* db = autoDb.getDb();
+ if (!db) {
+ return false;
+ }
- Collection* collection = db->getCollection(ns);
- if (!collection) {
- // Collection was dropped.
- return true;
- }
+ Lock::CollectionLock collLock(txn->lockState(), ns, MODE_IX);
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) {
- // We've stepped down since we started this function, so we should stop working
- // as we only do deletes on the primary.
- return false;
- }
+ Collection* collection = db->getCollection(ns);
+ if (!collection) {
+ // Collection was dropped.
+ return true;
+ }
- IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByKeyPattern(txn,
- key);
- if (!desc) {
- LOG(1) << "index not found (index build in progress? index dropped?), skipping "
- << "ttl job for: " << idx;
- return true;
- }
+ if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) {
+ // We've stepped down since we started this function, so we should stop working
+ // as we only do deletes on the primary.
+ return false;
+ }
- // Re-read 'idx' from the descriptor, in case the collection or index definition
- // changed before we re-acquired the collection lock.
- idx = desc->infoObj();
+ IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByKeyPattern(txn, key);
+ if (!desc) {
+ LOG(1) << "index not found (index build in progress? index dropped?), skipping "
+ << "ttl job for: " << idx;
+ return true;
+ }
- if (IndexType::INDEX_BTREE != IndexNames::nameToType(desc->getAccessMethodName())) {
- error() << "special index can't be used as a ttl index, skipping ttl job for: "
- << idx;
- return true;
- }
+ // Re-read 'idx' from the descriptor, in case the collection or index definition
+ // changed before we re-acquired the collection lock.
+ idx = desc->infoObj();
- BSONElement secondsExpireElt = idx[secondsExpireField];
- if (!secondsExpireElt.isNumber()) {
- error() << "ttl indexes require the " << secondsExpireField << " field to be "
- << "numeric but received a type of "
- << typeName(secondsExpireElt.type()) << ", skipping ttl job for: "
- << idx;
- return true;
- }
+ if (IndexType::INDEX_BTREE != IndexNames::nameToType(desc->getAccessMethodName())) {
+ error() << "special index can't be used as a ttl index, skipping ttl job for: "
+ << idx;
+ return true;
+ }
- const Date_t kDawnOfTime =
- Date_t::fromMillisSinceEpoch(std::numeric_limits<long long>::min());
- const BSONObj startKey = BSON("" << kDawnOfTime);
- const BSONObj endKey =
- BSON("" << now - Seconds(secondsExpireElt.numberLong()));
- const bool endKeyInclusive = true;
- // The canonical check as to whether a key pattern element is "ascending" or
- // "descending" is (elt.number() >= 0). This is defined by the Ordering class.
- const InternalPlanner::Direction direction =
- (key.firstElement().number() >= 0) ? InternalPlanner::Direction::FORWARD
- : InternalPlanner::Direction::BACKWARD;
- unique_ptr<PlanExecutor> exec(InternalPlanner::indexScan(txn,
- collection,
- desc,
- startKey,
- endKey,
- endKeyInclusive,
- direction));
- exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
+ BSONElement secondsExpireElt = idx[secondsExpireField];
+ if (!secondsExpireElt.isNumber()) {
+ error() << "ttl indexes require the " << secondsExpireField << " field to be "
+ << "numeric but received a type of " << typeName(secondsExpireElt.type())
+ << ", skipping ttl job for: " << idx;
+ return true;
+ }
- try {
- PlanExecutor::ExecState state;
- BSONObj obj;
- RecordId rid;
- while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, &rid))) {
- exec->saveState();
- {
- WriteUnitOfWork wunit(txn);
- collection->deleteDocument(txn, rid);
- wunit.commit();
- }
- ++numDeleted;
- ttlDeletedDocuments.increment();
- if (!exec->restoreState(txn)) {
- return true;
- }
+ const Date_t kDawnOfTime =
+ Date_t::fromMillisSinceEpoch(std::numeric_limits<long long>::min());
+ const BSONObj startKey = BSON("" << kDawnOfTime);
+ const BSONObj endKey = BSON("" << now - Seconds(secondsExpireElt.numberLong()));
+ const bool endKeyInclusive = true;
+ // The canonical check as to whether a key pattern element is "ascending" or
+ // "descending" is (elt.number() >= 0). This is defined by the Ordering class.
+ const InternalPlanner::Direction direction = (key.firstElement().number() >= 0)
+ ? InternalPlanner::Direction::FORWARD
+ : InternalPlanner::Direction::BACKWARD;
+ unique_ptr<PlanExecutor> exec(InternalPlanner::indexScan(
+ txn, collection, desc, startKey, endKey, endKeyInclusive, direction));
+ exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
+
+ try {
+ PlanExecutor::ExecState state;
+ BSONObj obj;
+ RecordId rid;
+ while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, &rid))) {
+ exec->saveState();
+ {
+ WriteUnitOfWork wunit(txn);
+ collection->deleteDocument(txn, rid);
+ wunit.commit();
}
-
- if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) {
- if (WorkingSetCommon::isValidStatusMemberObject(obj)) {
- error() << "ttl query execution for index " << idx << " failed with: "
- << WorkingSetCommon::getMemberObjectStatus(obj);
- return true;
- }
- error() << "ttl query execution for index " << idx << " failed with state: "
- << PlanExecutor::statestr(state);
+ ++numDeleted;
+ ttlDeletedDocuments.increment();
+ if (!exec->restoreState(txn)) {
return true;
}
-
- invariant(PlanExecutor::IS_EOF == state);
- break;
}
- catch (const WriteConflictException& dle) {
- WriteConflictException::logAndBackoff(attempt++, "ttl", ns);
+
+ if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) {
+ if (WorkingSetCommon::isValidStatusMemberObject(obj)) {
+ error() << "ttl query execution for index " << idx
+ << " failed with: " << WorkingSetCommon::getMemberObjectStatus(obj);
+ return true;
+ }
+ error() << "ttl query execution for index " << idx
+ << " failed with state: " << PlanExecutor::statestr(state);
+ return true;
}
- }
- LOG(1) << "\tTTL deleted: " << numDeleted << endl;
- return true;
+ invariant(PlanExecutor::IS_EOF == state);
+ break;
+ } catch (const WriteConflictException& dle) {
+ WriteConflictException::logAndBackoff(attempt++, "ttl", ns);
+ }
}
- };
- void startTTLBackgroundJob() {
- TTLMonitor* ttl = new TTLMonitor();
- ttl->go();
+ LOG(1) << "\tTTL deleted: " << numDeleted << endl;
+ return true;
}
+};
+
+void startTTLBackgroundJob() {
+ TTLMonitor* ttl = new TTLMonitor();
+ ttl->go();
+}
- string TTLMonitor::secondsExpireField = "expireAfterSeconds";
+string TTLMonitor::secondsExpireField = "expireAfterSeconds";
}