/** * Copyright (C) 2013 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/platform/basic.h" #include "mongo/db/range_deleter.h" #include #include #include "mongo/db/global_environment_experiment.h" #include "mongo/db/repl/repl_coordinator_global.h" #include "mongo/db/write_concern_options.h" #include "mongo/s/range_arithmetic.h" #include "mongo/util/concurrency/synchronization.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/time_support.h" #include "mongo/util/timer.h" using std::auto_ptr; using std::set; using std::pair; using std::string; using mongoutils::str::stream; namespace { const long int NotEmptyTimeoutMillis = 200; const long long int MaxCurorCheckIntervalMillis = 500; const unsigned long long LogCursorsThresholdMillis = 60 * 1000; const unsigned long long LogCursorsIntervalMillis = 10 * 1000; const size_t DeleteJobsHistory = 10; // entries /** * Removes an element from the container that holds a pointer type, and deletes the * pointer as well. Returns true if the element was found. */ template bool deletePtrElement(ContainerType* container, ContainerElementType elem) { typename ContainerType::iterator iter = container->find(elem); if (iter == container->end()) { return false; } delete *iter; container->erase(iter); return true; } void logCursorsWaiting(const std::string& ns, const mongo::BSONObj& min, const mongo::BSONObj& max, unsigned long long int elapsedMS, const std::set& cursorsToWait) { mongo::StringBuilder cursorList; for (std::set::const_iterator it = cursorsToWait.begin(); it != cursorsToWait.end(); ++it) { cursorList << *it << " "; } mongo::log() << "Waiting for open cursors before deleting ns: " << ns << ", min: " << min << ", max: " << max << ", elapsedMS: " << elapsedMS << ", cursors: [ " << cursorList.str() << "]" << std::endl; } } namespace mongo { namespace duration = boost::posix_time; struct RangeDeleter::NSMinMax { NSMinMax(std::string ns, const BSONObj min, const BSONObj max): ns(ns), min(min), max(max) { } std::string ns; // Inclusive lower range. BSONObj min; // Exclusive upper range. BSONObj max; }; bool RangeDeleter::NSMinMaxCmp::operator()( const NSMinMax* lhs, const NSMinMax* rhs) const { const int nsComp = lhs->ns.compare(rhs->ns); if (nsComp < 0) { return true; } if (nsComp > 0) { return false; } return compareRanges(lhs->min, lhs->max, rhs->min, rhs->max) < 0; } RangeDeleter::RangeDeleter(RangeDeleterEnv* env): _env(env), // ownership xfer _stopMutex("stopRangeDeleter"), _stopRequested(false), _queueMutex("RangeDeleter"), _deletesInProgress(0), _statsHistoryMutex("RangeDeleterStatsHistory") { } RangeDeleter::~RangeDeleter() { for(TaskList::iterator it = _notReadyQueue.begin(); it != _notReadyQueue.end(); ++it) { delete (*it); } for(TaskList::iterator it = _taskQueue.begin(); it != _taskQueue.end(); ++it) { delete (*it); } for(NSMinMaxSet::iterator it = _deleteSet.begin(); it != _deleteSet.end(); ++it) { delete (*it); } for(NSMinMaxSet::iterator it = _blackList.begin(); it != _blackList.end(); ++it) { delete (*it); } } void RangeDeleter::startWorkers() { if (!_worker) { _worker.reset(new boost::thread(stdx::bind(&RangeDeleter::doWork, this))); } } void RangeDeleter::stopWorkers() { { scoped_lock sl(_stopMutex); _stopRequested = true; } if (_worker) { _worker->join(); } scoped_lock sl(_queueMutex); while (_deletesInProgress > 0) { _nothingInProgressCV.wait(sl.boost()); } } bool RangeDeleter::queueDelete(const std::string& ns, const BSONObj& min, const BSONObj& max, const BSONObj& shardKeyPattern, bool secondaryThrottle, Notification* notifyDone, std::string* errMsg) { string dummy; if (errMsg == NULL) errMsg = &dummy; auto_ptr toDelete(new RangeDeleteEntry(ns, min.getOwned(), max.getOwned(), shardKeyPattern.getOwned(), secondaryThrottle)); toDelete->notifyDone = notifyDone; { scoped_lock sl(_queueMutex); if (_stopRequested) { *errMsg = "deleter is already stopped."; return false; } if (!canEnqueue_inlock(ns, min, max, errMsg)) { return false; } _deleteSet.insert(new NSMinMax(ns, min, max)); } { boost::scoped_ptr txn(getGlobalEnvironment()->newOpCtx()); _env->getCursorIds(txn.get(), ns, &toDelete->cursorsToWait); } toDelete->stats.queueStartTS = jsTime(); { scoped_lock sl(_queueMutex); if (toDelete->cursorsToWait.empty()) { toDelete->stats.queueEndTS = jsTime(); _taskQueue.push_back(toDelete.release()); _taskQueueNotEmptyCV.notify_one(); } else { log() << "rangeDeleter waiting for " << toDelete->cursorsToWait.size() << " cursors in " << ns << " to finish" << endl; _notReadyQueue.push_back(toDelete.release()); } } return true; } namespace { bool _waitForReplication(OperationContext* txn, std::string* errMsg) { WriteConcernOptions writeConcern; writeConcern.wMode = "majority"; writeConcern.wTimeout = 60 * 60 * 1000; repl::ReplicationCoordinator::StatusAndDuration replStatus = repl::getGlobalReplicationCoordinator()->awaitReplicationOfLastOp(txn, writeConcern); repl::ReplicationCoordinator::Milliseconds elapsedTime = replStatus.duration; if (replStatus.status.code() == ErrorCodes::ExceededTimeLimit) { *errMsg = str::stream() << "rangeDeleter timed out after " << elapsedTime.seconds() << " seconds while waiting" << " for deletions to be replicated to majority nodes"; log() << *errMsg; } else if (replStatus.status.code() == ErrorCodes::NotMaster) { *errMsg = str::stream() << "rangeDeleter no longer PRIMARY after " << elapsedTime.seconds() << " seconds while waiting" << " for deletions to be replicated to majority nodes"; } else { LOG(elapsedTime.seconds() < 30 ? 1 : 0) << "rangeDeleter took " << elapsedTime.seconds() << " seconds " << " waiting for deletes to be replicated to majority nodes"; fassert(18512, replStatus.status); } return replStatus.status.isOK(); } } bool RangeDeleter::deleteNow(OperationContext* txn, const std::string& ns, const BSONObj& min, const BSONObj& max, const BSONObj& shardKeyPattern, bool secondaryThrottle, string* errMsg) { if (stopRequested()) { *errMsg = "deleter is already stopped."; return false; } string dummy; if (errMsg == NULL) errMsg = &dummy; NSMinMax deleteRange(ns, min, max); { scoped_lock sl(_queueMutex); if (!canEnqueue_inlock(ns, min, max, errMsg)) { return false; } _deleteSet.insert(&deleteRange); // Note: count for pending deletes is an integral part of the shutdown story. // Therefore, to simplify things, there is no "pending" state for deletes in // deleteNow, the state transition is simply inProgress -> done. _deletesInProgress++; } set cursorsToWait; _env->getCursorIds(txn, ns, &cursorsToWait); long long checkIntervalMillis = 5; if (!cursorsToWait.empty()) { log() << "rangeDeleter waiting for " << cursorsToWait.size() << " cursors in " << ns << " to finish" << endl; } RangeDeleteEntry taskDetails(ns, min, max, shardKeyPattern, secondaryThrottle); taskDetails.stats.queueStartTS = jsTime(); Date_t timeSinceLastLog; for (; !cursorsToWait.empty(); sleepmillis(checkIntervalMillis)) { const unsigned long long timeNow = curTimeMillis64(); const unsigned long long elapsedTimeMillis = timeNow - taskDetails.stats.queueStartTS.millis; const unsigned long long lastLogMillis = timeNow - timeSinceLastLog.millis; if (elapsedTimeMillis > LogCursorsThresholdMillis && lastLogMillis > LogCursorsIntervalMillis) { timeSinceLastLog = jsTime(); logCursorsWaiting(ns, min, max, elapsedTimeMillis, cursorsToWait); } set cursorsNow; _env->getCursorIds(txn, ns, &cursorsNow); set cursorsLeft; std::set_intersection(cursorsToWait.begin(), cursorsToWait.end(), cursorsNow.begin(), cursorsNow.end(), std::inserter(cursorsLeft, cursorsLeft.end())); cursorsToWait.swap(cursorsLeft); if (stopRequested()) { *errMsg = "deleter was stopped."; scoped_lock sl(_queueMutex); _deleteSet.erase(&deleteRange); _deletesInProgress--; if (_deletesInProgress == 0) { _nothingInProgressCV.notify_one(); } return false; } if (checkIntervalMillis < MaxCurorCheckIntervalMillis) { checkIntervalMillis *= 2; } } taskDetails.stats.queueEndTS = jsTime(); taskDetails.stats.deleteStartTS = jsTime(); bool result = _env->deleteRange(txn, taskDetails, &taskDetails.stats.deletedDocCount, errMsg); taskDetails.stats.deleteEndTS = jsTime(); if (result) { taskDetails.stats.waitForReplStartTS = jsTime(); result = _waitForReplication(txn, errMsg); taskDetails.stats.waitForReplEndTS = jsTime(); } { scoped_lock sl(_queueMutex); _deleteSet.erase(&deleteRange); _deletesInProgress--; if (_deletesInProgress == 0) { _nothingInProgressCV.notify_one(); } } recordDelStats(new DeleteJobStats(taskDetails.stats)); return result; } bool RangeDeleter::addToBlackList(const StringData& ns, const BSONObj& min, const BSONObj& max, std::string* errMsg) { string dummy; if (errMsg == NULL) errMsg = &dummy; scoped_lock sl(_queueMutex); if (isBlacklisted_inlock(ns, min, max, errMsg)) { return false; } for (NSMinMaxSet::const_iterator iter = _deleteSet.begin(); iter != _deleteSet.end(); ++iter) { const NSMinMax* const entry = *iter; if (entry->ns == ns && rangeOverlaps(entry->min, entry->max, min, max)) { *errMsg = stream() << "Cannot black list ns: " << ns << ", min: " << min << ", max: " << max << " since it is already queued for deletion."; return false; } } _blackList.insert(new NSMinMax(ns.toString(), min, max)); return true; } bool RangeDeleter::removeFromBlackList(const StringData& ns, const BSONObj& min, const BSONObj& max) { scoped_lock sl(_queueMutex); NSMinMax entry(ns.toString(), min, max); return deletePtrElement(&_blackList, &entry); } void RangeDeleter::getStatsHistory(std::vector* stats) const { stats->clear(); stats->reserve(DeleteJobsHistory); scoped_lock sl(_statsHistoryMutex); for (deque::const_iterator it = _statsHistory.begin(); it != _statsHistory.end(); ++it) { stats->push_back(new DeleteJobStats(**it)); } } BSONObj RangeDeleter::toBSON() const { scoped_lock sl(_queueMutex); BSONObjBuilder builder; BSONArrayBuilder notReadyBuilder(builder.subarrayStart("notReady")); for (TaskList::const_iterator iter = _notReadyQueue.begin(); iter != _notReadyQueue.end(); ++iter) { notReadyBuilder.append((*iter)->toBSON()); } notReadyBuilder.doneFast(); BSONArrayBuilder readyBuilder(builder.subarrayStart("ready")); for (TaskList::const_iterator iter = _taskQueue.begin(); iter != _taskQueue.end(); ++iter) { readyBuilder.append((*iter)->toBSON()); } readyBuilder.doneFast(); return builder.obj(); } void RangeDeleter::doWork() { _env->initThread(); while (!inShutdown() && !stopRequested()) { string errMsg; RangeDeleteEntry* nextTask = NULL; { scoped_lock sl(_queueMutex); while (_taskQueue.empty()) { _taskQueueNotEmptyCV.timed_wait( sl.boost(), duration::milliseconds(NotEmptyTimeoutMillis)); if (stopRequested()) { log() << "stopping range deleter worker" << endl; return; } if (_taskQueue.empty()) { // Try to check if some deletes are ready and move them to the // ready queue. TaskList::iterator iter = _notReadyQueue.begin(); while (iter != _notReadyQueue.end()) { RangeDeleteEntry* entry = *iter; set cursorsNow; { boost::scoped_ptr txn(getGlobalEnvironment()->newOpCtx()); _env->getCursorIds(txn.get(), entry->ns, &cursorsNow); } set cursorsLeft; std::set_intersection(entry->cursorsToWait.begin(), entry->cursorsToWait.end(), cursorsNow.begin(), cursorsNow.end(), std::inserter(cursorsLeft, cursorsLeft.end())); entry->cursorsToWait.swap(cursorsLeft); if (entry->cursorsToWait.empty()) { (*iter)->stats.queueEndTS = jsTime(); _taskQueue.push_back(*iter); _taskQueueNotEmptyCV.notify_one(); iter = _notReadyQueue.erase(iter); } else { const unsigned long long int elapsedMillis = entry->stats.queueStartTS.millis - curTimeMillis64(); if ( elapsedMillis > LogCursorsThresholdMillis && entry->timeSinceLastLog.millis > LogCursorsIntervalMillis) { entry->timeSinceLastLog = jsTime(); logCursorsWaiting(entry->ns, entry->min, entry->max, elapsedMillis, entry->cursorsToWait); } ++iter; } } } } if (stopRequested()) { log() << "stopping range deleter worker" << endl; return; } nextTask = _taskQueue.front(); _taskQueue.pop_front(); _deletesInProgress++; } { boost::scoped_ptr txn(getGlobalEnvironment()->newOpCtx()); nextTask->stats.deleteStartTS = jsTime(); bool delResult = _env->deleteRange(txn.get(), *nextTask, &nextTask->stats.deletedDocCount, &errMsg); nextTask->stats.deleteEndTS = jsTime(); if (delResult) { nextTask->stats.waitForReplStartTS = jsTime(); if (!_waitForReplication(txn.get(), &errMsg)) { warning() << "Error encountered while waiting for replication: " << errMsg; } nextTask->stats.waitForReplEndTS = jsTime(); } else { warning() << "Error encountered while trying to delete range: " << errMsg << endl; } } { scoped_lock sl(_queueMutex); NSMinMax setEntry(nextTask->ns, nextTask->min, nextTask->max); deletePtrElement(&_deleteSet, &setEntry); _deletesInProgress--; if (nextTask->notifyDone) { nextTask->notifyDone->notifyOne(); } } recordDelStats(new DeleteJobStats(nextTask->stats)); delete nextTask; nextTask = NULL; } } bool RangeDeleter::isBlacklisted_inlock(const StringData& ns, const BSONObj& min, const BSONObj& max, std::string* errMsg) const { for (NSMinMaxSet::const_iterator iter = _blackList.begin(); iter != _blackList.end(); ++iter) { const NSMinMax* const entry = *iter; if (ns != entry->ns) continue; if (rangeOverlaps(min, max, entry->min, entry->max)) { *errMsg = stream() << "ns: " << ns << ", min: " << min << ", max: " << max << " intersects with black list" << " min: " << entry->min << ", max: " << entry->max; return true; } } return false; } bool RangeDeleter::canEnqueue_inlock(const StringData& ns, const BSONObj& min, const BSONObj& max, string* errMsg) const { if (isBlacklisted_inlock(ns, min, max, errMsg)) { return false; } NSMinMax toDelete(ns.toString(), min, max); if (_deleteSet.count(&toDelete) > 0) { *errMsg = stream() << "ns: " << ns << ", min: " << min << ", max: " << max << " is already being processed for deletion."; return false; } return true; } bool RangeDeleter::stopRequested() const { scoped_lock sl(_stopMutex); return _stopRequested; } size_t RangeDeleter::getTotalDeletes() const { scoped_lock sl(_queueMutex); return _deleteSet.size(); } size_t RangeDeleter::getPendingDeletes() const { scoped_lock sl(_queueMutex); return _notReadyQueue.size() + _taskQueue.size(); } size_t RangeDeleter::getDeletesInProgress() const { scoped_lock sl(_queueMutex); return _deletesInProgress; } void RangeDeleter::recordDelStats(DeleteJobStats* newStat) { scoped_lock sl(_statsHistoryMutex); if (_statsHistory.size() == DeleteJobsHistory) { delete _statsHistory.front(); _statsHistory.pop_front(); } _statsHistory.push_back(newStat); } RangeDeleteEntry::RangeDeleteEntry(const std::string& ns, const BSONObj& min, const BSONObj& max, const BSONObj& shardKey, bool secondaryThrottle): ns(ns), min(min), max(max), shardKeyPattern(shardKey), secondaryThrottle(secondaryThrottle), notifyDone(NULL) { } BSONObj RangeDeleteEntry::toBSON() const { BSONObjBuilder builder; builder.append("ns", ns); builder.append("min", min); builder.append("max", max); BSONArrayBuilder cursorBuilder(builder.subarrayStart("cursors")); for (std::set::const_iterator it = cursorsToWait.begin(); it != cursorsToWait.end(); ++it) { cursorBuilder.append((long long)*it); } cursorBuilder.doneFast(); return builder.done().copy(); } }