summaryrefslogtreecommitdiff
path: root/src/mongo/db/ttl.cpp
diff options
context:
space:
mode:
authorQingyang Chen <qingyang.chen@10gen.com>2015-08-06 15:20:19 -0400
committerQingyang Chen <qingyang.chen@10gen.com>2015-08-13 14:18:55 -0400
commit7c808909a8c0a746ffd2d8153c90a77be8b85204 (patch)
tree988b67fed5c71f6906a6e7852688576a456fdf6d /src/mongo/db/ttl.cpp
parent9eadefd1f5e9f9a480f14fe5ad0b1b1838e96005 (diff)
downloadmongo-7c808909a8c0a746ffd2d8153c90a77be8b85204.tar.gz
SERVER-19466 TTLMonitor::doTTLForIndex() use IXSCAN => FETCH => DELETE
Diffstat (limited to 'src/mongo/db/ttl.cpp')
-rw-r--r--src/mongo/db/ttl.cpp175
1 files changed, 81 insertions, 94 deletions
diff --git a/src/mongo/db/ttl.cpp b/src/mongo/db/ttl.cpp
index c05e886468d..9e5652d2e42 100644
--- a/src/mongo/db/ttl.cpp
+++ b/src/mongo/db/ttl.cpp
@@ -46,7 +46,7 @@
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
-#include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/exec/delete.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/ops/insert.h"
@@ -224,110 +224,97 @@ private:
LOG(1) << "TTL -- ns: " << ns << " key: " << key;
- // Read the current time outside of the while loop, so that we don't expand our index
- // bounds after every WriteConflictException.
- const Date_t now = Date_t::now();
-
- long long numDeleted = 0;
- int attempt = 1;
- while (1) {
- ScopedTransaction scopedXact(txn, MODE_IX);
- AutoGetDb autoDb(txn, dbName, MODE_IX);
- Database* db = autoDb.getDb();
- if (!db) {
- return false;
- }
-
- Lock::CollectionLock collLock(txn->lockState(), ns, MODE_IX);
-
- Collection* collection = db->getCollection(ns);
- if (!collection) {
- // Collection was dropped.
- return true;
- }
+ ScopedTransaction scopedXact(txn, MODE_IX);
+ AutoGetDb autoDb(txn, dbName, MODE_IX);
+ Database* db = autoDb.getDb();
+ if (!db) {
+ return false;
+ }
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) {
- // We've stepped down since we started this function, so we should stop working
- // as we only do deletes on the primary.
- return false;
- }
+ Lock::CollectionLock collLock(txn->lockState(), ns, MODE_IX);
- IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByKeyPattern(txn, key);
- if (!desc) {
- LOG(1) << "index not found (index build in progress? index dropped?), skipping "
- << "ttl job for: " << idx;
- return true;
- }
-
- // Re-read 'idx' from the descriptor, in case the collection or index definition
- // changed before we re-acquired the collection lock.
- idx = desc->infoObj();
+ Collection* collection = db->getCollection(ns);
+ if (!collection) {
+ // Collection was dropped.
+ return true;
+ }
- if (IndexType::INDEX_BTREE != IndexNames::nameToType(desc->getAccessMethodName())) {
- error() << "special index can't be used as a ttl index, skipping ttl job for: "
- << idx;
- return true;
- }
+ if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) {
+ // We've stepped down since we started this function, so we should stop working
+ // as we only do deletes on the primary.
+ return false;
+ }
- BSONElement secondsExpireElt = idx[secondsExpireField];
- if (!secondsExpireElt.isNumber()) {
- error() << "ttl indexes require the " << secondsExpireField << " field to be "
- << "numeric but received a type of " << typeName(secondsExpireElt.type())
- << ", skipping ttl job for: " << idx;
- return true;
- }
+ IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByKeyPattern(txn, key);
+ if (!desc) {
+ LOG(1) << "index not found (index build in progress? index dropped?), skipping "
+ << "ttl job for: " << idx;
+ return true;
+ }
- const Date_t kDawnOfTime =
- Date_t::fromMillisSinceEpoch(std::numeric_limits<long long>::min());
- const BSONObj startKey = BSON("" << kDawnOfTime);
- const BSONObj endKey = BSON("" << now - Seconds(secondsExpireElt.numberLong()));
- const bool endKeyInclusive = true;
- // The canonical check as to whether a key pattern element is "ascending" or
- // "descending" is (elt.number() >= 0). This is defined by the Ordering class.
- const InternalPlanner::Direction direction = (key.firstElement().number() >= 0)
- ? InternalPlanner::Direction::FORWARD
- : InternalPlanner::Direction::BACKWARD;
- unique_ptr<PlanExecutor> exec(InternalPlanner::indexScan(
- txn, collection, desc, startKey, endKey, endKeyInclusive, direction));
- exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
+ // Re-read 'idx' from the descriptor, in case the collection or index definition
+ // changed before we re-acquired the collection lock.
+ idx = desc->infoObj();
- try {
- PlanExecutor::ExecState state;
- BSONObj obj;
- RecordId rid;
- while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, &rid))) {
- exec->saveState();
- {
- WriteUnitOfWork wunit(txn);
- collection->deleteDocument(txn, rid);
- wunit.commit();
- }
- ++numDeleted;
- ttlDeletedDocuments.increment();
- if (!exec->restoreState()) {
- return true;
- }
- }
+ if (IndexType::INDEX_BTREE != IndexNames::nameToType(desc->getAccessMethodName())) {
+ error() << "special index can't be used as a ttl index, skipping ttl job for: " << idx;
+ return true;
+ }
- if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) {
- if (WorkingSetCommon::isValidStatusMemberObject(obj)) {
- error() << "ttl query execution for index " << idx
- << " failed with: " << WorkingSetCommon::getMemberObjectStatus(obj);
- return true;
- }
- error() << "ttl query execution for index " << idx
- << " failed with state: " << PlanExecutor::statestr(state);
- return true;
- }
+ BSONElement secondsExpireElt = idx[secondsExpireField];
+ if (!secondsExpireElt.isNumber()) {
+ error() << "ttl indexes require the " << secondsExpireField << " field to be "
+ << "numeric but received a type of " << typeName(secondsExpireElt.type())
+ << ", skipping ttl job for: " << idx;
+ return true;
+ }
- invariant(PlanExecutor::IS_EOF == state);
- break;
- } catch (const WriteConflictException& dle) {
- WriteConflictException::logAndBackoff(attempt++, "ttl", ns);
- }
+ const Date_t kDawnOfTime =
+ Date_t::fromMillisSinceEpoch(std::numeric_limits<long long>::min());
+ const Date_t expirationTime = Date_t::now() - Seconds(secondsExpireElt.numberLong());
+ const BSONObj startKey = BSON("" << kDawnOfTime);
+ const BSONObj endKey = BSON("" << expirationTime);
+ const bool endKeyInclusive = true;
+ // The canonical check as to whether a key pattern element is "ascending" or
+ // "descending" is (elt.number() >= 0). This is defined by the Ordering class.
+ const InternalPlanner::Direction direction = (key.firstElement().number() >= 0)
+ ? InternalPlanner::Direction::FORWARD
+ : InternalPlanner::Direction::BACKWARD;
+
+ // We need to pass into the DeleteStageParams (below) a CanonicalQuery with a BSONObj that
+ // queries for the expired documents correctly so that we do not delete documents that are
+ // not actually expired when our snapshot changes during deletion.
+ const char* keyFieldName = key.firstElement().fieldName();
+ BSONObj query =
+ BSON(keyFieldName << BSON("$gte" << kDawnOfTime << "$lte" << expirationTime));
+ auto canonicalQuery = CanonicalQuery::canonicalize(nss, query);
+ invariantOK(canonicalQuery.getStatus());
+
+ DeleteStageParams params;
+ params.isMulti = true;
+ params.canonicalQuery = canonicalQuery.getValue().get();
+
+ unique_ptr<PlanExecutor> exec =
+ InternalPlanner::deleteWithIndexScan(txn,
+ collection,
+ params,
+ desc,
+ startKey,
+ endKey,
+ endKeyInclusive,
+ PlanExecutor::YIELD_AUTO,
+ direction);
+
+ Status result = exec->executePlan();
+ if (!result.isOK()) {
+ error() << "ttl query execution for index " << idx << " failed with status: " << result;
+ return true;
}
+ const long long numDeleted = DeleteStage::getNumDeleted(*exec);
+ ttlDeletedDocuments.increment(numDeleted);
LOG(1) << "\tTTL deleted: " << numDeleted << endl;
+
return true;
}
};