/**
 *    Copyright (C) 2013 10gen Inc.
 *
 *    This program is free software: you can redistribute it and/or  modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#include "mongo/db/range_deleter.h"

// NOTE(review): the angle-bracket header names were missing from the reviewed text.
// The list below is reconstructed from what this file uses (std::min,
// std::set_intersection, std::inserter, std::auto_ptr, boost::posix_time::milliseconds)
// -- confirm against the tree.
#include <algorithm>
#include <boost/date_time/posix_time/posix_time_duration.hpp>
#include <iterator>
#include <memory>

#include "mongo/s/range_arithmetic.h"
#include "mongo/db/range_deleter_stats.h"
#include "mongo/util/concurrency/synchronization.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/time_support.h"

using std::auto_ptr;
using std::set;
using std::pair;
using std::string;

using mongoutils::str::stream;

namespace {

// Upper bound on how long the worker blocks on the "task queue not empty" condition
// variable before it re-checks the stop flag and the not-ready queue.
const long int NotEmptyTimeoutMillis = 200;

// Upper bound on the back-off between two open-cursor polls in deleteNow().
const long long int MaxCursorCheckIntervalMillis = 500;

/**
 * Removes an element from the container that holds a pointer type, and deletes the
 * pointer as well. Returns true if the element was found.
 */
// NOTE(review): the template parameter list was missing from the reviewed text; it is
// reconstructed from the two type names used in the signature.
template <typename ContainerType, typename ContainerElementType>
bool deletePtrElement(ContainerType* container, ContainerElementType elem) {
    typename ContainerType::iterator iter = container->find(elem);

    if (iter == container->end()) {
        return false;
    }

    delete *iter;
    container->erase(iter);
    return true;
}

}  // namespace

namespace mongo {

namespace duration = boost::posix_time;

/**
 * A single queued range deletion, together with everything needed to run it and to
 * tell the requester when it is finished.
 */
struct RangeDeleter::RangeDeleteEntry {
    RangeDeleteEntry():
        secondaryThrottle(true),
        notifyDone(NULL),
        transactionFactory(OperationContext::factoryNULL) { // XXX SERVER-13931
    }

    std::string ns;

    // Inclusive lower range.
    BSONObj min;

    // Exclusive upper range.
    BSONObj max;

    // The key pattern of the index the range refers to.
    // This is relevant especially with special indexes types
    // like hash indexes.
    BSONObj shardKeyPattern;

    bool secondaryThrottle;

    // Sets of cursors to wait to close until this can be ready
    // for deletion.
    // NOTE(review): element type reconstructed (missing from the reviewed text) --
    // confirm it matches RangeDeleterEnv::getCursorIds.
    std::set<CursorId> cursorsToWait;

    // Not owned here.
    // Important invariant: Can only be set and used by one thread.
    Notification* notifyDone;

    OperationContext::Factory transactionFactory;

    // For debugging only
    BSONObj toBSON() const {
        // NOTE(review): the cast target type was missing from the reviewed text;
        // long long is assumed so the address can be appended as a number -- confirm.
        return BSON("ns" << ns
                    << "min" << min
                    << "max" << max
                    << "notifyDoneAddr" << reinterpret_cast<long long>(notifyDone));
    }
};

/**
 * Key type of the delete set and the black list: a namespace plus a [min, max) range.
 */
struct RangeDeleter::NSMinMax {
    NSMinMax(std::string ns, const BSONObj min, const BSONObj max):
        ns(ns), min(min), max(max) {
    }

    std::string ns;

    // Inclusive lower range.
    BSONObj min;

    // Exclusive upper range.
    BSONObj max;
};

/**
 * Orders entries by namespace first; entries in the same namespace are ordered by
 * compareRanges() on their bounds.
 */
bool RangeDeleter::NSMinMaxCmp::operator()(
        const NSMinMax* lhs, const NSMinMax* rhs) const {
    const int nsComp = lhs->ns.compare(rhs->ns);

    if (nsComp < 0) {
        return true;
    }

    if (nsComp > 0) {
        return false;
    }

    return compareRanges(lhs->min, lhs->max, rhs->min, rhs->max) < 0;
}

RangeDeleter::RangeDeleter(RangeDeleterEnv* env):
    _env(env), // ownership xfer
    _stopMutex("stopRangeDeleter"),
    _stopRequested(false),
    _queueMutex("RangeDeleter"),
    _stats(new RangeDeleterStats(&_queueMutex)) {
}

/**
 * Frees every entry still held by the two task queues, the delete set and the
 * black list.
 */
RangeDeleter::~RangeDeleter() {
    for (TaskList::iterator it = _notReadyQueue.begin();
         it != _notReadyQueue.end();
         ++it) {
        delete (*it);
    }

    for (TaskList::iterator it = _taskQueue.begin();
         it != _taskQueue.end();
         ++it) {
        delete (*it);
    }

    for (NSMinMaxSet::iterator it = _deleteSet.begin();
         it != _deleteSet.end();
         ++it) {
        delete (*it);
    }

    for (NSMinMaxSet::iterator it = _blackList.begin();
         it != _blackList.end();
         ++it) {
        delete (*it);
    }
}

/**
 * Starts the background worker thread running doWork(). Does nothing if a worker
 * already exists.
 */
void RangeDeleter::startWorkers() {
    if (!_worker) {
        _worker.reset(new boost::thread(stdx::bind(&RangeDeleter::doWork, this)));
    }
}

/**
 * Raises the stop flag, joins the worker thread (if any) and then blocks until the
 * stats report no delete in progress.
 */
void RangeDeleter::stopWorkers() {
    {
        scoped_lock sl(_stopMutex);
        _stopRequested = true;
    }

    if (_worker) {
        _worker->join();
    }

    // deleteNow() callers signal _nothingInProgressCV when the last in-progress
    // delete finishes.
    scoped_lock sl(_queueMutex);
    while (_stats->hasInProgress_inlock()) {
        _nothingInProgressCV.wait(sl.boost());
    }
}

/**
 * Registers the range [min, max) of 'ns' for deletion by the worker thread.
 *
 * The task goes straight to the ready queue when no cursors are reported for 'ns';
 * otherwise it is parked on the not-ready queue.
 *
 * @param notifyDone not owned; may be NULL.
 * @param errMsg receives the reason on failure; may be NULL.
 * @return false if the deleter was stopped or canEnqueue_inlock() rejected the range.
 */
bool RangeDeleter::queueDelete(OperationContext::Factory transactionFactory,
                               const std::string& ns,
                               const BSONObj& min,
                               const BSONObj& max,
                               const BSONObj& shardKeyPattern,
                               bool secondaryThrottle,
                               Notification* notifyDone,
                               std::string* errMsg) {
    string dummy;
    if (errMsg == NULL) errMsg = &dummy;

    // NOTE(review): template argument reconstructed (missing from the reviewed text).
    auto_ptr<RangeDeleteEntry> toDelete(new RangeDeleteEntry);
    toDelete->transactionFactory = transactionFactory;
    toDelete->ns = ns;
    toDelete->min = min.getOwned();
    toDelete->max = max.getOwned();
    toDelete->shardKeyPattern = shardKeyPattern.getOwned();
    toDelete->secondaryThrottle = secondaryThrottle;
    toDelete->notifyDone = notifyDone;

    {
        scoped_lock sl(_queueMutex);

        // The flag is written under _stopMutex, so read it through stopRequested()
        // rather than directly. doWork() already nests the two mutexes in this order.
        if (stopRequested()) {
            *errMsg = "deleter is already stopped.";
            return false;
        }

        if (!canEnqueue_inlock(ns, min, max, errMsg)) {
            return false;
        }

        // The set entry lives until the task completes, so it stores the owned
        // copies made above instead of the caller's (possibly unowned) objects.
        _deleteSet.insert(new NSMinMax(ns, toDelete->min, toDelete->max));

        _stats->incTotalDeletes_inlock();
        _stats->incPendingDeletes_inlock();
    }

    // Queried outside the lock.
    _env->getCursorIds(ns, &toDelete->cursorsToWait);

    {
        scoped_lock sl(_queueMutex);

        if (toDelete->cursorsToWait.empty()) {
            _taskQueue.push_back(toDelete.release());
            _taskQueueNotEmptyCV.notify_one();
        }
        else {
            log() << "rangeDeleter waiting for " << toDelete->cursorsToWait.size()
                  << " cursors in " << ns << " to finish" << endl;

            _notReadyQueue.push_back(toDelete.release());
        }
    }

    return true;
}

/**
 * Deletes the range [min, max) of 'ns' on the calling thread. First polls, with an
 * increasing sleep between polls, until the cursors reported when the call began are
 * no longer reported.
 *
 * @param errMsg receives the reason on failure; may be NULL.
 * @return false if the deleter is (or becomes) stopped, if canEnqueue_inlock()
 *     rejects the range, or if the environment's deleteRange() fails.
 */
bool RangeDeleter::deleteNow(OperationContext* txn,
                             const std::string& ns,
                             const BSONObj& min,
                             const BSONObj& max,
                             const BSONObj& shardKeyPattern,
                             bool secondaryThrottle,
                             string* errMsg) {
    // Must come before the first write through errMsg: callers may pass NULL.
    string dummy;
    if (errMsg == NULL) errMsg = &dummy;

    if (stopRequested()) {
        *errMsg = "deleter is already stopped.";
        return false;
    }

    // Lives on this stack frame; every path that inserts it into _deleteSet below
    // also erases it before returning.
    NSMinMax deleteRange(ns, min, max);
    {
        scoped_lock sl(_queueMutex);
        if (!canEnqueue_inlock(ns, min, max, errMsg)) {
            return false;
        }

        _deleteSet.insert(&deleteRange);

        _stats->incTotalDeletes_inlock();

        // Note: count for pending deletes is an integral part of the shutdown story.
        // Therefore, to simplify things, there is no "pending" state for deletes in
        // deleteNow, the state transition is simply inProgress -> done.
        _stats->incInProgressDeletes_inlock();
    }

    // NOTE(review): element type reconstructed (missing from the reviewed text).
    set<CursorId> cursorsToWait;
    _env->getCursorIds(ns, &cursorsToWait);

    long long checkIntervalMillis = 5;

    if (!cursorsToWait.empty()) {
        log() << "rangeDeleter waiting for " << cursorsToWait.size()
              << " cursors in " << ns << " to finish" << endl;
    }

    bool stopped = false;
    while (!cursorsToWait.empty()) {
        set<CursorId> cursorsNow;
        _env->getCursorIds(ns, &cursorsNow);

        // Keep only the cursors from the initial snapshot that are still reported.
        set<CursorId> cursorsLeft;
        std::set_intersection(cursorsToWait.begin(),
                              cursorsToWait.end(),
                              cursorsNow.begin(),
                              cursorsNow.end(),
                              std::inserter(cursorsLeft, cursorsLeft.end()));

        cursorsToWait.swap(cursorsLeft);

        if (stopRequested()) {
            *errMsg = "deleter was stopped.";
            stopped = true;
            break;
        }

        if (cursorsToWait.empty()) {
            // Nothing left to wait for; do not sleep before deleting.
            break;
        }

        // Exponential back-off, clamped so the sleep never exceeds the maximum.
        checkIntervalMillis = std::min(checkIntervalMillis * 2,
                                       MaxCursorCheckIntervalMillis);
        sleepmillis(checkIntervalMillis);
    }

    // When stopped, the delete is skipped and the call fails with the message above.
    const bool result = !stopped && _env->deleteRange(txn,
                                                      ns,
                                                      min,
                                                      max,
                                                      shardKeyPattern,
                                                      secondaryThrottle,
                                                      errMsg);

    {
        // Single bookkeeping rollback shared by the stopped and completed paths.
        scoped_lock sl(_queueMutex);
        _deleteSet.erase(&deleteRange);

        _stats->decInProgressDeletes_inlock();
        _stats->decTotalDeletes_inlock();

        if (!_stats->hasInProgress_inlock()) {
            _nothingInProgressCV.notify_one();
        }
    }

    return result;
}

/**
 * Black lists the range [min, max) of 'ns'.
 *
 * @param errMsg receives the reason on failure; may be NULL.
 * @return false if the range intersects an existing black list entry or overlaps a
 *     range currently in the delete set.
 */
bool RangeDeleter::addToBlackList(const StringData& ns,
                                  const BSONObj& min,
                                  const BSONObj& max,
                                  std::string* errMsg) {
    string dummy;
    if (errMsg == NULL) errMsg = &dummy;

    scoped_lock sl(_queueMutex);

    if (isBlacklisted_inlock(ns, min, max, errMsg)) {
        return false;
    }

    for (NSMinMaxSet::const_iterator iter = _deleteSet.begin();
         iter != _deleteSet.end();
         ++iter) {
        const NSMinMax* const entry = *iter;
        if (entry->ns == ns && rangeOverlaps(entry->min, entry->max, min, max)) {
            *errMsg = stream() << "Cannot black list ns: " << ns
                               << ", min: " << min
                               << ", max: " << max
                               << " since it is already queued for deletion.";
            return false;
        }
    }

    // The entry outlives this call, so store owned copies of the bounds.
    _blackList.insert(new NSMinMax(ns.toString(), min.getOwned(), max.getOwned()));
    return true;
}

bool RangeDeleter::removeFromBlackList(const StringData& ns, const BSONObj& min, const BSONObj& max) { scoped_lock sl(_queueMutex); NSMinMax entry(ns.toString(), min, max); return deletePtrElement(&_blackList, &entry); } const RangeDeleterStats* RangeDeleter::getStats() const { return _stats.get(); } BSONObj RangeDeleter::toBSON() const { scoped_lock sl(_queueMutex); BSONObjBuilder builder; BSONArrayBuilder notReadyBuilder(builder.subarrayStart("notReady")); for (TaskList::const_iterator iter = _notReadyQueue.begin(); iter != _notReadyQueue.end(); ++iter) { notReadyBuilder.append((*iter)->toBSON()); } notReadyBuilder.doneFast(); BSONArrayBuilder readyBuilder(builder.subarrayStart("ready")); for (TaskList::const_iterator iter = _taskQueue.begin(); iter != _taskQueue.end(); ++iter) { readyBuilder.append((*iter)->toBSON()); } readyBuilder.doneFast(); return builder.obj(); } void RangeDeleter::doWork() { _env->initThread(); while (!inShutdown() && !stopRequested()) { string errMsg; RangeDeleteEntry* nextTask = NULL; { scoped_lock sl(_queueMutex); while (_taskQueue.empty()) { _taskQueueNotEmptyCV.timed_wait( sl.boost(), duration::milliseconds(NotEmptyTimeoutMillis)); if (stopRequested()) { log() << "stopping range deleter worker" << endl; return; } if (_taskQueue.empty()) { // Try to check if some deletes are ready and move them to the // ready queue. 
TaskList::iterator iter = _notReadyQueue.begin(); while (iter != _notReadyQueue.end()) { RangeDeleteEntry* entry = *iter; set cursorsNow; _env->getCursorIds(entry->ns, &cursorsNow); set cursorsLeft; std::set_intersection(entry->cursorsToWait.begin(), entry->cursorsToWait.end(), cursorsNow.begin(), cursorsNow.end(), std::inserter(cursorsLeft, cursorsLeft.end())); entry->cursorsToWait.swap(cursorsLeft); if (entry->cursorsToWait.empty()) { _taskQueue.push_back(*iter); _taskQueueNotEmptyCV.notify_one(); iter = _notReadyQueue.erase(iter); } else { ++iter; } } } } if (stopRequested()) { log() << "stopping range deleter worker" << endl; return; } nextTask = _taskQueue.front(); _taskQueue.pop_front(); _stats->decPendingDeletes_inlock(); _stats->incInProgressDeletes_inlock(); } { boost::scoped_ptr txn(nextTask->transactionFactory()); // XXX SERVER-13931 if (!_env->deleteRange(txn.get(), nextTask->ns, nextTask->min, nextTask->max, nextTask->shardKeyPattern, nextTask->secondaryThrottle, &errMsg)) { warning() << "Error encountered while trying to delete range: " << errMsg << endl; } } { scoped_lock sl(_queueMutex); NSMinMax setEntry(nextTask->ns, nextTask->min, nextTask->max); deletePtrElement(&_deleteSet, &setEntry); _stats->decInProgressDeletes_inlock(); _stats->decTotalDeletes_inlock(); if (nextTask->notifyDone) { nextTask->notifyDone->notifyOne(); } delete nextTask; nextTask = NULL; } } } bool RangeDeleter::isBlacklisted_inlock(const StringData& ns, const BSONObj& min, const BSONObj& max, std::string* errMsg) const { for (NSMinMaxSet::const_iterator iter = _blackList.begin(); iter != _blackList.end(); ++iter) { const NSMinMax* const entry = *iter; if (ns != entry->ns) continue; if (rangeOverlaps(min, max, entry->min, entry->max)) { *errMsg = stream() << "ns: " << ns << ", min: " << min << ", max: " << max << " intersects with black list" << " min: " << entry->min << ", max: " << entry->max; return true; } } return false; } bool RangeDeleter::canEnqueue_inlock(const 
StringData& ns, const BSONObj& min, const BSONObj& max, string* errMsg) const { if (isBlacklisted_inlock(ns, min, max, errMsg)) { return false; } NSMinMax toDelete(ns.toString(), min, max); if (_deleteSet.count(&toDelete) > 0) { *errMsg = stream() << "ns: " << ns << ", min: " << min << ", max: " << max << " is already being processed for deletion."; return false; } return true; } bool RangeDeleter::stopRequested() const { scoped_lock sl(_stopMutex); return _stopRequested; } }