diff options
author | Qingyang Chen <qingyang.chen@10gen.com> | 2015-08-06 15:20:19 -0400 |
---|---|---|
committer | Qingyang Chen <qingyang.chen@10gen.com> | 2015-08-13 14:18:55 -0400 |
commit | 7c808909a8c0a746ffd2d8153c90a77be8b85204 (patch) | |
tree | 988b67fed5c71f6906a6e7852688576a456fdf6d /src/mongo/db/ttl.cpp | |
parent | 9eadefd1f5e9f9a480f14fe5ad0b1b1838e96005 (diff) | |
download | mongo-7c808909a8c0a746ffd2d8153c90a77be8b85204.tar.gz |
SERVER-19466 TTLMonitor::doTTLForIndex() use IXSCAN => FETCH => DELETE
Diffstat (limited to 'src/mongo/db/ttl.cpp')
-rw-r--r-- | src/mongo/db/ttl.cpp | 175 |
1 files changed, 81 insertions, 94 deletions
diff --git a/src/mongo/db/ttl.cpp b/src/mongo/db/ttl.cpp index c05e886468d..9e5652d2e42 100644 --- a/src/mongo/db/ttl.cpp +++ b/src/mongo/db/ttl.cpp @@ -46,7 +46,7 @@ #include "mongo/db/commands/server_status_metric.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" -#include "mongo/db/exec/working_set_common.h" +#include "mongo/db/exec/delete.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/ops/insert.h" @@ -224,110 +224,97 @@ private: LOG(1) << "TTL -- ns: " << ns << " key: " << key; - // Read the current time outside of the while loop, so that we don't expand our index - // bounds after every WriteConflictException. - const Date_t now = Date_t::now(); - - long long numDeleted = 0; - int attempt = 1; - while (1) { - ScopedTransaction scopedXact(txn, MODE_IX); - AutoGetDb autoDb(txn, dbName, MODE_IX); - Database* db = autoDb.getDb(); - if (!db) { - return false; - } - - Lock::CollectionLock collLock(txn->lockState(), ns, MODE_IX); - - Collection* collection = db->getCollection(ns); - if (!collection) { - // Collection was dropped. - return true; - } + ScopedTransaction scopedXact(txn, MODE_IX); + AutoGetDb autoDb(txn, dbName, MODE_IX); + Database* db = autoDb.getDb(); + if (!db) { + return false; + } - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) { - // We've stepped down since we started this function, so we should stop working - // as we only do deletes on the primary. - return false; - } + Lock::CollectionLock collLock(txn->lockState(), ns, MODE_IX); - IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByKeyPattern(txn, key); - if (!desc) { - LOG(1) << "index not found (index build in progress? index dropped?), skipping " - << "ttl job for: " << idx; - return true; - } - - // Re-read 'idx' from the descriptor, in case the collection or index definition - // changed before we re-acquired the collection lock. - idx = desc->infoObj(); + Collection* collection = db->getCollection(ns); + if (!collection) { + // Collection was dropped. + return true; + } - if (IndexType::INDEX_BTREE != IndexNames::nameToType(desc->getAccessMethodName())) { - error() << "special index can't be used as a ttl index, skipping ttl job for: " - << idx; - return true; - } + if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) { + // We've stepped down since we started this function, so we should stop working + // as we only do deletes on the primary. + return false; + } - BSONElement secondsExpireElt = idx[secondsExpireField]; - if (!secondsExpireElt.isNumber()) { - error() << "ttl indexes require the " << secondsExpireField << " field to be " - << "numeric but received a type of " << typeName(secondsExpireElt.type()) - << ", skipping ttl job for: " << idx; - return true; - } + IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByKeyPattern(txn, key); + if (!desc) { + LOG(1) << "index not found (index build in progress? index dropped?), skipping " + << "ttl job for: " << idx; + return true; + } - const Date_t kDawnOfTime = - Date_t::fromMillisSinceEpoch(std::numeric_limits<long long>::min()); - const BSONObj startKey = BSON("" << kDawnOfTime); - const BSONObj endKey = BSON("" << now - Seconds(secondsExpireElt.numberLong())); - const bool endKeyInclusive = true; - // The canonical check as to whether a key pattern element is "ascending" or - // "descending" is (elt.number() >= 0). This is defined by the Ordering class. - const InternalPlanner::Direction direction = (key.firstElement().number() >= 0) - ? InternalPlanner::Direction::FORWARD - : InternalPlanner::Direction::BACKWARD; - unique_ptr<PlanExecutor> exec(InternalPlanner::indexScan( - txn, collection, desc, startKey, endKey, endKeyInclusive, direction)); - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); + // Re-read 'idx' from the descriptor, in case the collection or index definition + // changed before we re-acquired the collection lock. + idx = desc->infoObj(); - try { - PlanExecutor::ExecState state; - BSONObj obj; - RecordId rid; - while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, &rid))) { - exec->saveState(); - { - WriteUnitOfWork wunit(txn); - collection->deleteDocument(txn, rid); - wunit.commit(); - } - ++numDeleted; - ttlDeletedDocuments.increment(); - if (!exec->restoreState()) { - return true; - } - } + if (IndexType::INDEX_BTREE != IndexNames::nameToType(desc->getAccessMethodName())) { + error() << "special index can't be used as a ttl index, skipping ttl job for: " << idx; + return true; + } - if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) { - if (WorkingSetCommon::isValidStatusMemberObject(obj)) { - error() << "ttl query execution for index " << idx - << " failed with: " << WorkingSetCommon::getMemberObjectStatus(obj); - return true; - } - error() << "ttl query execution for index " << idx - << " failed with state: " << PlanExecutor::statestr(state); - return true; - } + BSONElement secondsExpireElt = idx[secondsExpireField]; + if (!secondsExpireElt.isNumber()) { + error() << "ttl indexes require the " << secondsExpireField << " field to be " + << "numeric but received a type of " << typeName(secondsExpireElt.type()) + << ", skipping ttl job for: " << idx; + return true; + } - invariant(PlanExecutor::IS_EOF == state); - break; - } catch (const WriteConflictException& dle) { - WriteConflictException::logAndBackoff(attempt++, "ttl", ns); - } + const Date_t kDawnOfTime = + Date_t::fromMillisSinceEpoch(std::numeric_limits<long long>::min()); + const Date_t expirationTime = Date_t::now() - Seconds(secondsExpireElt.numberLong()); + const BSONObj startKey = BSON("" << kDawnOfTime); + const BSONObj endKey = BSON("" << expirationTime); + const bool endKeyInclusive = true; + // The canonical check as to whether a key pattern element is "ascending" or + // "descending" is (elt.number() >= 0). This is defined by the Ordering class. + const InternalPlanner::Direction direction = (key.firstElement().number() >= 0) + ? InternalPlanner::Direction::FORWARD + : InternalPlanner::Direction::BACKWARD; + + // We need to pass into the DeleteStageParams (below) a CanonicalQuery with a BSONObj that + // queries for the expired documents correctly so that we do not delete documents that are + // not actually expired when our snapshot changes during deletion. + const char* keyFieldName = key.firstElement().fieldName(); + BSONObj query = + BSON(keyFieldName << BSON("$gte" << kDawnOfTime << "$lte" << expirationTime)); + auto canonicalQuery = CanonicalQuery::canonicalize(nss, query); + invariantOK(canonicalQuery.getStatus()); + + DeleteStageParams params; + params.isMulti = true; + params.canonicalQuery = canonicalQuery.getValue().get(); + + unique_ptr<PlanExecutor> exec = + InternalPlanner::deleteWithIndexScan(txn, + collection, + params, + desc, + startKey, + endKey, + endKeyInclusive, + PlanExecutor::YIELD_AUTO, + direction); + + Status result = exec->executePlan(); + if (!result.isOK()) { + error() << "ttl query execution for index " << idx << " failed with status: " << result; + return true; } + const long long numDeleted = DeleteStage::getNumDeleted(*exec); + ttlDeletedDocuments.increment(numDeleted); LOG(1) << "\tTTL deleted: " << numDeleted << endl; + return true; } }; |