/** * Copyright (C) 2013 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding #include "mongo/platform/basic.h" #include "mongo/db/range_deleter.h" #include #include #include "mongo/db/global_environment_experiment.h" #include "mongo/db/repl/repl_coordinator_global.h" #include "mongo/db/write_concern_options.h" #include "mongo/util/concurrency/synchronization.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/time_support.h" #include "mongo/util/timer.h" using std::auto_ptr; using std::set; using std::pair; using std::string; namespace { const long int kNotEmptyTimeoutMillis = 200; const long long int kMaxCursorCheckIntervalMillis = 500; const size_t kDeleteJobsHistory = 10; // entries /** * Removes an element from the container that holds a pointer type, and deletes the * pointer as well. Returns true if the element was found. */ template bool deletePtrElement(ContainerType* container, ContainerElementType elem) { typename ContainerType::iterator iter = container->find(elem); if (iter == container->end()) { return false; } delete *iter; container->erase(iter); return true; } } namespace mongo { namespace duration = boost::posix_time; static void logCursorsWaiting(RangeDeleteEntry* entry) { // We always log the first cursors waiting message (so we have cursor ids in the logs). // After 15 minutes (the cursor timeout period), we start logging additional messages at // a 1 minute interval. static const long long kLogCursorsThresholdMillis = 15 * 60 * 1000; static const long long kLogCursorsIntervalMillis = 1 * 60 * 1000; Date_t currentTime = jsTime(); long long elapsedMillisSinceQueued = 0; // We always log the first message when lastLoggedTime == 0 if (entry->lastLoggedTS != 0) { if (currentTime > entry->stats.queueStartTS) elapsedMillisSinceQueued = currentTime - entry->stats.queueStartTS; // Not logging, threshold not passed if (elapsedMillisSinceQueued < kLogCursorsThresholdMillis) return; long long elapsedMillisSinceLog = 0; if (currentTime > entry->lastLoggedTS) elapsedMillisSinceLog = currentTime - entry->lastLoggedTS; // Not logging, logged a short time ago if (elapsedMillisSinceLog < kLogCursorsIntervalMillis) return; } str::stream cursorList; for (std::set::const_iterator it = entry->cursorsToWait.begin(); it != entry->cursorsToWait.end(); ++it) { if (it != entry->cursorsToWait.begin()) cursorList << ", "; cursorList << *it; } log() << "waiting for open cursors before removing range " << "[" << entry->options.range.minKey << ", " << entry->options.range.maxKey << ") " << "in " << entry->options.range.ns << (entry->lastLoggedTS == 0 ? string("") : string(str::stream() << ", elapsed secs: " << elapsedMillisSinceQueued / 1000)) << ", cursor ids: [" << string(cursorList) << "]"; entry->lastLoggedTS = currentTime; } struct RangeDeleter::NSMinMax { NSMinMax(std::string ns, const BSONObj min, const BSONObj max): ns(ns), min(min), max(max) { } std::string ns; // Inclusive lower range. BSONObj min; // Exclusive upper range. BSONObj max; }; bool RangeDeleter::NSMinMaxCmp::operator()( const NSMinMax* lhs, const NSMinMax* rhs) const { const int nsComp = lhs->ns.compare(rhs->ns); if (nsComp < 0) { return true; } if (nsComp > 0) { return false; } return compareRanges(lhs->min, lhs->max, rhs->min, rhs->max) < 0; } RangeDeleter::RangeDeleter(RangeDeleterEnv* env): _env(env), // ownership xfer _stopMutex("stopRangeDeleter"), _stopRequested(false), _queueMutex("RangeDeleter"), _deletesInProgress(0), _statsHistoryMutex("RangeDeleterStatsHistory") { } RangeDeleter::~RangeDeleter() { for(TaskList::iterator it = _notReadyQueue.begin(); it != _notReadyQueue.end(); ++it) { delete (*it); } for(TaskList::iterator it = _taskQueue.begin(); it != _taskQueue.end(); ++it) { delete (*it); } for(NSMinMaxSet::iterator it = _deleteSet.begin(); it != _deleteSet.end(); ++it) { delete (*it); } for(std::deque::iterator it = _statsHistory.begin(); it != _statsHistory.end(); ++it) { delete (*it); } } void RangeDeleter::startWorkers() { if (!_worker) { _worker.reset(new boost::thread(stdx::bind(&RangeDeleter::doWork, this))); } } void RangeDeleter::stopWorkers() { { scoped_lock sl(_stopMutex); _stopRequested = true; } if (_worker) { _worker->join(); } scoped_lock sl(_queueMutex); while (_deletesInProgress > 0) { _nothingInProgressCV.wait(sl.boost()); } } bool RangeDeleter::queueDelete(OperationContext* txn, const RangeDeleterOptions& options, Notification* notifyDone, std::string* errMsg) { string dummy; if (errMsg == NULL) errMsg = &dummy; const string& ns(options.range.ns); const BSONObj& min(options.range.minKey); const BSONObj& max(options.range.maxKey); auto_ptr toDelete( new RangeDeleteEntry(options)); toDelete->notifyDone = notifyDone; { scoped_lock sl(_queueMutex); if (_stopRequested) { *errMsg = "deleter is already stopped."; return false; } if (!canEnqueue_inlock(ns, min, max, errMsg)) { return false; } _deleteSet.insert(new NSMinMax(ns, min.getOwned(), max.getOwned())); } if (options.waitForOpenCursors) { _env->getCursorIds(txn, ns, &toDelete->cursorsToWait); } toDelete->stats.queueStartTS = jsTime(); if (!toDelete->cursorsToWait.empty()) logCursorsWaiting(toDelete.get()); { scoped_lock sl(_queueMutex); if (toDelete->cursorsToWait.empty()) { toDelete->stats.queueEndTS = jsTime(); _taskQueue.push_back(toDelete.release()); _taskQueueNotEmptyCV.notify_one(); } else { _notReadyQueue.push_back(toDelete.release()); } } return true; } namespace { const int kWTimeoutMillis = 60 * 60 * 1000; bool _waitForMajority(OperationContext* txn, std::string* errMsg) { const WriteConcernOptions writeConcern("majority", WriteConcernOptions::NONE, kWTimeoutMillis); repl::ReplicationCoordinator::StatusAndDuration replStatus = repl::getGlobalReplicationCoordinator()->awaitReplicationOfLastOpForClient( txn, writeConcern); repl::ReplicationCoordinator::Milliseconds elapsedTime = replStatus.duration; if (replStatus.status.code() == ErrorCodes::ExceededTimeLimit) { *errMsg = str::stream() << "rangeDeleter timed out after " << elapsedTime.seconds() << " seconds while waiting" << " for deletions to be replicated to majority nodes"; log() << *errMsg; } else if (replStatus.status.code() == ErrorCodes::NotMaster) { *errMsg = str::stream() << "rangeDeleter no longer PRIMARY after " << elapsedTime.seconds() << " seconds while waiting" << " for deletions to be replicated to majority nodes"; } else { LOG(elapsedTime.seconds() < 30 ? 1 : 0) << "rangeDeleter took " << elapsedTime.seconds() << " seconds " << " waiting for deletes to be replicated to majority nodes"; fassert(18512, replStatus.status); } return replStatus.status.isOK(); } } bool RangeDeleter::deleteNow(OperationContext* txn, const RangeDeleterOptions& options, string* errMsg) { if (stopRequested()) { *errMsg = "deleter is already stopped."; return false; } string dummy; if (errMsg == NULL) errMsg = &dummy; const string& ns(options.range.ns); const BSONObj& min(options.range.minKey); const BSONObj& max(options.range.maxKey); NSMinMax deleteRange(ns, min, max); { scoped_lock sl(_queueMutex); if (!canEnqueue_inlock(ns, min, max, errMsg)) { return false; } _deleteSet.insert(&deleteRange); // Note: count for pending deletes is an integral part of the shutdown story. // Therefore, to simplify things, there is no "pending" state for deletes in // deleteNow, the state transition is simply inProgress -> done. _deletesInProgress++; } set cursorsToWait; if (options.waitForOpenCursors) { _env->getCursorIds(txn, ns, &cursorsToWait); } long long checkIntervalMillis = 5; RangeDeleteEntry taskDetails(options); taskDetails.stats.queueStartTS = jsTime(); for (; !cursorsToWait.empty(); sleepmillis(checkIntervalMillis)) { logCursorsWaiting(&taskDetails); set cursorsNow; _env->getCursorIds(txn, ns, &cursorsNow); set cursorsLeft; std::set_intersection(cursorsToWait.begin(), cursorsToWait.end(), cursorsNow.begin(), cursorsNow.end(), std::inserter(cursorsLeft, cursorsLeft.end())); cursorsToWait.swap(cursorsLeft); if (stopRequested()) { *errMsg = "deleter was stopped."; scoped_lock sl(_queueMutex); _deleteSet.erase(&deleteRange); _deletesInProgress--; if (_deletesInProgress == 0) { _nothingInProgressCV.notify_one(); } return false; } if (checkIntervalMillis < kMaxCursorCheckIntervalMillis) { checkIntervalMillis *= 2; } } taskDetails.stats.queueEndTS = jsTime(); taskDetails.stats.deleteStartTS = jsTime(); bool result = _env->deleteRange(txn, taskDetails, &taskDetails.stats.deletedDocCount, errMsg); taskDetails.stats.deleteEndTS = jsTime(); if (result) { taskDetails.stats.waitForReplStartTS = jsTime(); result = _waitForMajority(txn, errMsg); taskDetails.stats.waitForReplEndTS = jsTime(); } { scoped_lock sl(_queueMutex); _deleteSet.erase(&deleteRange); _deletesInProgress--; if (_deletesInProgress == 0) { _nothingInProgressCV.notify_one(); } } recordDelStats(new DeleteJobStats(taskDetails.stats)); return result; } void RangeDeleter::getStatsHistory(std::vector* stats) const { stats->clear(); stats->reserve(kDeleteJobsHistory); scoped_lock sl(_statsHistoryMutex); for (deque::const_iterator it = _statsHistory.begin(); it != _statsHistory.end(); ++it) { stats->push_back(new DeleteJobStats(**it)); } } BSONObj RangeDeleter::toBSON() const { scoped_lock sl(_queueMutex); BSONObjBuilder builder; BSONArrayBuilder notReadyBuilder(builder.subarrayStart("notReady")); for (TaskList::const_iterator iter = _notReadyQueue.begin(); iter != _notReadyQueue.end(); ++iter) { notReadyBuilder.append((*iter)->toBSON()); } notReadyBuilder.doneFast(); BSONArrayBuilder readyBuilder(builder.subarrayStart("ready")); for (TaskList::const_iterator iter = _taskQueue.begin(); iter != _taskQueue.end(); ++iter) { readyBuilder.append((*iter)->toBSON()); } readyBuilder.doneFast(); return builder.obj(); } void RangeDeleter::doWork() { _env->initThread(); while (!inShutdown() && !stopRequested()) { string errMsg; boost::scoped_ptr txn(getGlobalEnvironment()->newOpCtx()); RangeDeleteEntry* nextTask = NULL; { scoped_lock sl(_queueMutex); while (_taskQueue.empty()) { _taskQueueNotEmptyCV.timed_wait( sl.boost(), duration::milliseconds(kNotEmptyTimeoutMillis)); if (stopRequested()) { log() << "stopping range deleter worker" << endl; return; } if (_taskQueue.empty()) { // Try to check if some deletes are ready and move them to the // ready queue. TaskList::iterator iter = _notReadyQueue.begin(); while (iter != _notReadyQueue.end()) { RangeDeleteEntry* entry = *iter; set cursorsNow; { if (entry->options.waitForOpenCursors) { _env->getCursorIds(txn.get(), entry->options.range.ns, &cursorsNow); } } set cursorsLeft; std::set_intersection(entry->cursorsToWait.begin(), entry->cursorsToWait.end(), cursorsNow.begin(), cursorsNow.end(), std::inserter(cursorsLeft, cursorsLeft.end())); entry->cursorsToWait.swap(cursorsLeft); if (entry->cursorsToWait.empty()) { (*iter)->stats.queueEndTS = jsTime(); _taskQueue.push_back(*iter); _taskQueueNotEmptyCV.notify_one(); iter = _notReadyQueue.erase(iter); } else { logCursorsWaiting(entry); ++iter; } } } } if (stopRequested()) { log() << "stopping range deleter worker" << endl; return; } nextTask = _taskQueue.front(); _taskQueue.pop_front(); _deletesInProgress++; } { nextTask->stats.deleteStartTS = jsTime(); bool delResult = _env->deleteRange(txn.get(), *nextTask, &nextTask->stats.deletedDocCount, &errMsg); nextTask->stats.deleteEndTS = jsTime(); if (delResult) { nextTask->stats.waitForReplStartTS = jsTime(); if (!_waitForMajority(txn.get(), &errMsg)) { warning() << "Error encountered while waiting for replication: " << errMsg; } nextTask->stats.waitForReplEndTS = jsTime(); } else { warning() << "Error encountered while trying to delete range: " << errMsg << endl; } } { scoped_lock sl(_queueMutex); NSMinMax setEntry(nextTask->options.range.ns, nextTask->options.range.minKey, nextTask->options.range.maxKey); deletePtrElement(&_deleteSet, &setEntry); _deletesInProgress--; if (nextTask->notifyDone) { nextTask->notifyDone->notifyOne(); } } recordDelStats(new DeleteJobStats(nextTask->stats)); delete nextTask; nextTask = NULL; } } bool RangeDeleter::canEnqueue_inlock(const StringData& ns, const BSONObj& min, const BSONObj& max, string* errMsg) const { NSMinMax toDelete(ns.toString(), min, max); if (_deleteSet.count(&toDelete) > 0) { *errMsg = str::stream() << "ns: " << ns << ", min: " << min << ", max: " << max << " is already being processed for deletion."; return false; } return true; } bool RangeDeleter::stopRequested() const { scoped_lock sl(_stopMutex); return _stopRequested; } size_t RangeDeleter::getTotalDeletes() const { scoped_lock sl(_queueMutex); return _deleteSet.size(); } size_t RangeDeleter::getPendingDeletes() const { scoped_lock sl(_queueMutex); return _notReadyQueue.size() + _taskQueue.size(); } size_t RangeDeleter::getDeletesInProgress() const { scoped_lock sl(_queueMutex); return _deletesInProgress; } void RangeDeleter::recordDelStats(DeleteJobStats* newStat) { scoped_lock sl(_statsHistoryMutex); if (_statsHistory.size() == kDeleteJobsHistory) { delete _statsHistory.front(); _statsHistory.pop_front(); } _statsHistory.push_back(newStat); } RangeDeleteEntry::RangeDeleteEntry(const RangeDeleterOptions& options) : options(options), notifyDone(NULL), lastLoggedTS(0) { } BSONObj RangeDeleteEntry::toBSON() const { BSONObjBuilder builder; builder.append("ns", options.range.ns); builder.append("min", options.range.minKey); builder.append("max", options.range.maxKey); BSONArrayBuilder cursorBuilder(builder.subarrayStart("cursors")); for (std::set::const_iterator it = cursorsToWait.begin(); it != cursorsToWait.end(); ++it) { cursorBuilder.append((long long)*it); } cursorBuilder.doneFast(); return builder.done().copy(); } RangeDeleterOptions::RangeDeleterOptions(const KeyRange& range) : range(range), fromMigrate(false), onlyRemoveOrphanedDocs(false), waitForOpenCursors(false) { } }