summaryrefslogtreecommitdiff
path: root/src/mongo/db/ttl.cpp
diff options
context:
space:
mode:
authorJason Rassi <rassi@10gen.com>2015-04-30 21:16:12 -0400
committerJason Rassi <rassi@10gen.com>2015-05-07 22:21:58 -0400
commit3a8bcdfb23ab85b57e5da308fb4d3c607ce77a49 (patch)
tree7bd82dfb4c40502403170dd9416949a165b1251f /src/mongo/db/ttl.cpp
parentb2434192437dffe842338d2cb551453c0276ee4f (diff)
downloadmongo-3a8bcdfb23ab85b57e5da308fb4d3c607ce77a49.tar.gz
SERVER-17984 doTTLForIndex() delete with manual index scan
Diffstat (limited to 'src/mongo/db/ttl.cpp')
-rw-r--r--src/mongo/db/ttl.cpp127
1 files changed, 93 insertions, 34 deletions
diff --git a/src/mongo/db/ttl.cpp b/src/mongo/db/ttl.cpp
index eeb17a984a8..43845ffaa34 100644
--- a/src/mongo/db/ttl.cpp
+++ b/src/mongo/db/ttl.cpp
@@ -46,9 +46,11 @@
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
-#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/operation_context_impl.h"
-#include "mongo/db/ops/delete.h"
+#include "mongo/db/ops/insert.h"
+#include "mongo/db/query/internal_plans.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/server_parameters.h"
#include "mongo/util/background.h"
@@ -62,6 +64,7 @@ namespace mongo {
using std::list;
using std::string;
using std::vector;
+ using std::unique_ptr;
Counter64 ttlPasses;
Counter64 ttlDeletedDocuments;
@@ -210,29 +213,25 @@ namespace mongo {
* @return true if caller should continue processing TTL indexes of collections
* on the specified database, and false otherwise
*/
- bool doTTLForIndex( OperationContext* txn, const string& dbName, const BSONObj& idx ) {
- BSONObj key = idx["key"].Obj();
+ bool doTTLForIndex(OperationContext* txn, const string& dbName, BSONObj idx) {
const string ns = idx["ns"].String();
- if ( key.nFields() != 1 ) {
- error() << "key for ttl index can only have 1 field" << endl;
+ if (!userAllowedWriteNS(ns).isOK()) {
+ error() << "namespace '" << ns << "' doesn't allow deletes, skipping ttl job for: "
+ << idx;
return true;
}
- if ( !idx[secondsExpireField].isNumber() ) {
- log() << "ttl indexes require the " << secondsExpireField << " field to be "
- << "numeric but received a type of: "
- << typeName( idx[secondsExpireField].type() );
+
+ BSONObj key = idx["key"].Obj();
+ if (key.nFields() != 1) {
+ error() << "key for ttl index can only have 1 field, skipping ttl job for: " << idx;
return true;
}
- BSONObj query;
- {
- BSONObjBuilder b;
- long long expireMs = 1000 * idx[secondsExpireField].numberLong();
- b.appendDate( "$lt", curTimeMillis64() - expireMs );
- query = BSON( key.firstElement().fieldName() << b.obj() );
- }
+ LOG(1) << "TTL -- ns: " << ns << " key: " << key;
- LOG(1) << "TTL -- ns: " << ns << "key:" << key << " query: " << query << endl;
+ // Read the current time outside of the while loop, so that we don't expand our index
+ // bounds after every WriteConflictException.
+ unsigned long long now = curTimeMillis64();
long long numDeleted = 0;
int attempt = 1;
@@ -244,33 +243,94 @@ namespace mongo {
return false;
}
- Lock::CollectionLock collLock( txn->lockState(), ns, MODE_IX );
+ Lock::CollectionLock collLock(txn->lockState(), ns, MODE_IX);
- Collection* collection = db->getCollection( ns );
- if ( !collection ) {
- // collection was dropped
+ Collection* collection = db->getCollection(ns);
+ if (!collection) {
+ // Collection was dropped.
return true;
}
if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(dbName)) {
- // we've stepped down since we started this function,
- // so we should stop working as we only do deletes on the primary
+ // We've stepped down since we started this function, so we should stop working
+ // as we only do deletes on the primary.
return false;
}
- if ( collection->getIndexCatalog()->findIndexByKeyPattern( txn, key ) == NULL ) {
- // index not finished yet
- LOG(1) << " skipping index because not finished";
+ IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByKeyPattern(txn,
+ key);
+ if (!desc) {
+ LOG(1) << "index not found (index build in progress? index dropped?), skipping "
+ << "ttl job for: " << idx;
return true;
}
+ // Re-read 'idx' from the descriptor, in case the collection or index definition
+ // changed before we re-acquired the collection lock.
+ idx = desc->infoObj();
+
+ if (IndexType::INDEX_BTREE != IndexNames::nameToType(desc->getAccessMethodName())) {
+ error() << "special index can't be used as a ttl index, skipping ttl job for: "
+ << idx;
+ return true;
+ }
+
+ BSONElement secondsExpireElt = idx[secondsExpireField];
+ if (!secondsExpireElt.isNumber()) {
+ error() << "ttl indexes require the " << secondsExpireField << " field to be "
+ << "numeric but received a type of "
+ << typeName(secondsExpireElt.type()) << ", skipping ttl job for: "
+ << idx;
+ return true;
+ }
+
+ const Date_t kDawnOfTime(std::numeric_limits<long long>::min());
+ const BSONObj startKey = BSON("" << kDawnOfTime);
+ const BSONObj endKey =
+ BSON("" << Date_t(now - (1000 * secondsExpireElt.numberLong())));
+ const bool endKeyInclusive = true;
+ // The canonical check as to whether a key pattern element is "ascending" or
+ // "descending" is (elt.number() >= 0). This is defined by the Ordering class.
+ const InternalPlanner::Direction direction =
+ (key.firstElement().number() >= 0) ? InternalPlanner::Direction::FORWARD
+ : InternalPlanner::Direction::BACKWARD;
+ unique_ptr<PlanExecutor> exec(InternalPlanner::indexScan(txn,
+ collection,
+ desc,
+ startKey,
+ endKey,
+ endKeyInclusive,
+ direction));
+ exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
+
try {
- numDeleted = deleteObjects(txn,
- db,
- ns,
- query,
- PlanExecutor::YIELD_AUTO,
- false);
+ PlanExecutor::ExecState state;
+ BSONObj obj;
+ RecordId rid;
+ while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, &rid))) {
+ exec->saveState();
+ {
+ WriteUnitOfWork wunit(txn);
+ collection->deleteDocument(txn, rid);
+ wunit.commit();
+ }
+ ++numDeleted;
+ ttlDeletedDocuments.increment();
+ if (!exec->restoreState(txn)) {
+ return true;
+ }
+ }
+ if (PlanExecutor::IS_EOF != state) {
+ if (PlanExecutor::FAILURE == state &&
+ WorkingSetCommon::isValidStatusMemberObject(obj)) {
+ error() << "ttl query execution for index " << idx << " failed with: "
+ << WorkingSetCommon::getMemberObjectStatus(obj);
+ return true;
+ }
+ error() << "ttl query execution for index " << idx << " failed with state: "
+ << PlanExecutor::statestr(state);
+ return true;
+ }
break;
}
catch (const WriteConflictException& dle) {
@@ -278,7 +338,6 @@ namespace mongo {
}
}
- ttlDeletedDocuments.increment(numDeleted);
LOG(1) << "\tTTL deleted: " << numDeleted << endl;
return true;
}