diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 00:22:50 -0400 |
---|---|---|
committer | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 10:56:02 -0400 |
commit | 9c2ed42daa8fbbef4a919c21ec564e2db55e8d60 (patch) | |
tree | 3814f79c10d7b490948d8cb7b112ac1dd41ceff1 /src/mongo/db/range_deleter.cpp | |
parent | 01965cf52bce6976637ecb8f4a622aeb05ab256a (diff) | |
download | mongo-9c2ed42daa8fbbef4a919c21ec564e2db55e8d60.tar.gz |
SERVER-18579: Clang-Format - reformat code, no comment reflow
Diffstat (limited to 'src/mongo/db/range_deleter.cpp')
-rw-r--r-- | src/mongo/db/range_deleter.cpp | 842 |
1 files changed, 404 insertions, 438 deletions
diff --git a/src/mongo/db/range_deleter.cpp b/src/mongo/db/range_deleter.cpp index 771aa9074f7..036f8e7c4d4 100644 --- a/src/mongo/db/range_deleter.cpp +++ b/src/mongo/db/range_deleter.cpp @@ -53,349 +53,300 @@ using std::pair; using std::string; namespace { - const long int kNotEmptyTimeoutMillis = 200; - const long long int kMaxCursorCheckIntervalMillis = 500; - const size_t kDeleteJobsHistory = 10; // entries - - /** - * Removes an element from the container that holds a pointer type, and deletes the - * pointer as well. Returns true if the element was found. - */ - template <typename ContainerType, typename ContainerElementType> - bool deletePtrElement(ContainerType* container, ContainerElementType elem) { - typename ContainerType::iterator iter = container->find(elem); - - if (iter == container->end()) { - return false; - } +const long int kNotEmptyTimeoutMillis = 200; +const long long int kMaxCursorCheckIntervalMillis = 500; +const size_t kDeleteJobsHistory = 10; // entries - delete *iter; - container->erase(iter); - return true; +/** + * Removes an element from the container that holds a pointer type, and deletes the + * pointer as well. Returns true if the element was found. + */ +template <typename ContainerType, typename ContainerElementType> +bool deletePtrElement(ContainerType* container, ContainerElementType elem) { + typename ContainerType::iterator iter = container->find(elem); + + if (iter == container->end()) { + return false; } + + delete *iter; + container->erase(iter); + return true; +} } namespace mongo { - namespace duration = boost::posix_time; - - static void logCursorsWaiting(RangeDeleteEntry* entry) { +namespace duration = boost::posix_time; - // We always log the first cursors waiting message (so we have cursor ids in the logs). - // After 15 minutes (the cursor timeout period), we start logging additional messages at - // a 1 minute interval. - static const auto kLogCursorsThreshold = stdx::chrono::minutes{15}; - static const auto kLogCursorsInterval = stdx::chrono::minutes{1}; +static void logCursorsWaiting(RangeDeleteEntry* entry) { + // We always log the first cursors waiting message (so we have cursor ids in the logs). + // After 15 minutes (the cursor timeout period), we start logging additional messages at + // a 1 minute interval. + static const auto kLogCursorsThreshold = stdx::chrono::minutes{15}; + static const auto kLogCursorsInterval = stdx::chrono::minutes{1}; - Date_t currentTime = jsTime(); - Milliseconds elapsedMillisSinceQueued{0}; + Date_t currentTime = jsTime(); + Milliseconds elapsedMillisSinceQueued{0}; - // We always log the first message when lastLoggedTime == 0 - if (entry->lastLoggedTS != Date_t()) { + // We always log the first message when lastLoggedTime == 0 + if (entry->lastLoggedTS != Date_t()) { + if (currentTime > entry->stats.queueStartTS) + elapsedMillisSinceQueued = currentTime - entry->stats.queueStartTS; - if (currentTime > entry->stats.queueStartTS) - elapsedMillisSinceQueued = currentTime - entry->stats.queueStartTS; - - // Not logging, threshold not passed - if (elapsedMillisSinceQueued < kLogCursorsThreshold) - return; + // Not logging, threshold not passed + if (elapsedMillisSinceQueued < kLogCursorsThreshold) + return; - Milliseconds elapsedMillisSinceLog{0}; - if (currentTime > entry->lastLoggedTS) - elapsedMillisSinceLog = currentTime - entry->lastLoggedTS; + Milliseconds elapsedMillisSinceLog{0}; + if (currentTime > entry->lastLoggedTS) + elapsedMillisSinceLog = currentTime - entry->lastLoggedTS; - // Not logging, logged a short time ago - if (elapsedMillisSinceLog < kLogCursorsInterval) - return; - } - - str::stream cursorList; - for (std::set<CursorId>::const_iterator it = entry->cursorsToWait.begin(); - it != entry->cursorsToWait.end(); ++it) { - if (it != entry->cursorsToWait.begin()) - cursorList << ", "; - cursorList << *it; - } - - log() << "waiting for open cursors before removing range " - << "[" << entry->options.range.minKey << ", " << entry->options.range.maxKey << ") " - << "in " << entry->options.range.ns - << (entry->lastLoggedTS == Date_t() ? - string("") : - string(str::stream() << ", elapsed secs: " << - durationCount<Seconds>(elapsedMillisSinceQueued))) - << ", cursor ids: [" << string(cursorList) << "]"; + // Not logging, logged a short time ago + if (elapsedMillisSinceLog < kLogCursorsInterval) + return; + } - entry->lastLoggedTS = currentTime; + str::stream cursorList; + for (std::set<CursorId>::const_iterator it = entry->cursorsToWait.begin(); + it != entry->cursorsToWait.end(); + ++it) { + if (it != entry->cursorsToWait.begin()) + cursorList << ", "; + cursorList << *it; } - struct RangeDeleter::NSMinMax { - NSMinMax(std::string ns, const BSONObj min, const BSONObj max): - ns(ns), min(min), max(max) { - } + log() << "waiting for open cursors before removing range " + << "[" << entry->options.range.minKey << ", " << entry->options.range.maxKey << ") " + << "in " << entry->options.range.ns + << (entry->lastLoggedTS == Date_t() + ? string("") + : string(str::stream() << ", elapsed secs: " + << durationCount<Seconds>(elapsedMillisSinceQueued))) + << ", cursor ids: [" << string(cursorList) << "]"; - std::string ns; + entry->lastLoggedTS = currentTime; +} - // Inclusive lower range. - BSONObj min; +struct RangeDeleter::NSMinMax { + NSMinMax(std::string ns, const BSONObj min, const BSONObj max) : ns(ns), min(min), max(max) {} - // Exclusive upper range. - BSONObj max; - }; + std::string ns; - bool RangeDeleter::NSMinMaxCmp::operator()( - const NSMinMax* lhs, const NSMinMax* rhs) const { - const int nsComp = lhs->ns.compare(rhs->ns); + // Inclusive lower range. + BSONObj min; - if (nsComp < 0) { - return true; - } + // Exclusive upper range. + BSONObj max; +}; - if (nsComp > 0) { - return false; - } +bool RangeDeleter::NSMinMaxCmp::operator()(const NSMinMax* lhs, const NSMinMax* rhs) const { + const int nsComp = lhs->ns.compare(rhs->ns); - return compareRanges(lhs->min, lhs->max, rhs->min, rhs->max) < 0; + if (nsComp < 0) { + return true; } - RangeDeleter::RangeDeleter(RangeDeleterEnv* env): - _env(env), // ownership xfer - _stopRequested(false), - _deletesInProgress(0) { + if (nsComp > 0) { + return false; } - RangeDeleter::~RangeDeleter() { - for(TaskList::iterator it = _notReadyQueue.begin(); - it != _notReadyQueue.end(); - ++it) { - delete (*it); - } - - for(TaskList::iterator it = _taskQueue.begin(); - it != _taskQueue.end(); - ++it) { - delete (*it); - } - - for(NSMinMaxSet::iterator it = _deleteSet.begin(); - it != _deleteSet.end(); - ++it) { - delete (*it); - } + return compareRanges(lhs->min, lhs->max, rhs->min, rhs->max) < 0; +} - for(std::deque<DeleteJobStats*>::iterator it = _statsHistory.begin(); - it != _statsHistory.end(); - ++it) { - delete (*it); - } +RangeDeleter::RangeDeleter(RangeDeleterEnv* env) + : _env(env), // ownership xfer + _stopRequested(false), + _deletesInProgress(0) {} +RangeDeleter::~RangeDeleter() { + for (TaskList::iterator it = _notReadyQueue.begin(); it != _notReadyQueue.end(); ++it) { + delete (*it); } - void RangeDeleter::startWorkers() { - if (!_worker) { - _worker.reset(new stdx::thread(stdx::bind(&RangeDeleter::doWork, this))); - } + for (TaskList::iterator it = _taskQueue.begin(); it != _taskQueue.end(); ++it) { + delete (*it); } - void RangeDeleter::stopWorkers() { - { - stdx::lock_guard<stdx::mutex> sl(_stopMutex); - _stopRequested = true; - } + for (NSMinMaxSet::iterator it = _deleteSet.begin(); it != _deleteSet.end(); ++it) { + delete (*it); + } - if (_worker) { - _worker->join(); - } + for (std::deque<DeleteJobStats*>::iterator it = _statsHistory.begin(); + it != _statsHistory.end(); + ++it) { + delete (*it); + } +} - stdx::unique_lock<stdx::mutex> sl(_queueMutex); - while (_deletesInProgress > 0) { - _nothingInProgressCV.wait(sl); - } +void RangeDeleter::startWorkers() { + if (!_worker) { + _worker.reset(new stdx::thread(stdx::bind(&RangeDeleter::doWork, this))); } +} - bool RangeDeleter::queueDelete(OperationContext* txn, - const RangeDeleterOptions& options, - Notification* notifyDone, - std::string* errMsg) { - string dummy; - if (errMsg == NULL) errMsg = &dummy; +void RangeDeleter::stopWorkers() { + { + stdx::lock_guard<stdx::mutex> sl(_stopMutex); + _stopRequested = true; + } - const string& ns(options.range.ns); - const BSONObj& min(options.range.minKey); - const BSONObj& max(options.range.maxKey); + if (_worker) { + _worker->join(); + } - unique_ptr<RangeDeleteEntry> toDelete( - new RangeDeleteEntry(options)); - toDelete->notifyDone = notifyDone; + stdx::unique_lock<stdx::mutex> sl(_queueMutex); + while (_deletesInProgress > 0) { + _nothingInProgressCV.wait(sl); + } +} - { - stdx::lock_guard<stdx::mutex> sl(_queueMutex); - if (_stopRequested) { - *errMsg = "deleter is already stopped."; - return false; - } +bool RangeDeleter::queueDelete(OperationContext* txn, + const RangeDeleterOptions& options, + Notification* notifyDone, + std::string* errMsg) { + string dummy; + if (errMsg == NULL) + errMsg = &dummy; - if (!canEnqueue_inlock(ns, min, max, errMsg)) { - return false; - } + const string& ns(options.range.ns); + const BSONObj& min(options.range.minKey); + const BSONObj& max(options.range.maxKey); - _deleteSet.insert(new NSMinMax(ns, min.getOwned(), max.getOwned())); - } + unique_ptr<RangeDeleteEntry> toDelete(new RangeDeleteEntry(options)); + toDelete->notifyDone = notifyDone; - if (options.waitForOpenCursors) { - _env->getCursorIds(txn, ns, &toDelete->cursorsToWait); + { + stdx::lock_guard<stdx::mutex> sl(_queueMutex); + if (_stopRequested) { + *errMsg = "deleter is already stopped."; + return false; } - toDelete->stats.queueStartTS = jsTime(); - - if (!toDelete->cursorsToWait.empty()) - logCursorsWaiting(toDelete.get()); - - { - stdx::lock_guard<stdx::mutex> sl(_queueMutex); - - if (toDelete->cursorsToWait.empty()) { - toDelete->stats.queueEndTS = jsTime(); - _taskQueue.push_back(toDelete.release()); - _taskQueueNotEmptyCV.notify_one(); - } - else { - _notReadyQueue.push_back(toDelete.release()); - } + if (!canEnqueue_inlock(ns, min, max, errMsg)) { + return false; } - return true; + _deleteSet.insert(new NSMinMax(ns, min.getOwned(), max.getOwned())); } -namespace { - const int kWTimeoutMillis = 60 * 60 * 1000; - - bool _waitForMajority(OperationContext* txn, std::string* errMsg) { - const WriteConcernOptions writeConcern(WriteConcernOptions::kMajority, - WriteConcernOptions::NONE, - kWTimeoutMillis); - - repl::ReplicationCoordinator::StatusAndDuration replStatus = - repl::getGlobalReplicationCoordinator()->awaitReplicationOfLastOpForClient( - txn, writeConcern); - Milliseconds elapsedTime = replStatus.duration; - if (replStatus.status.code() == ErrorCodes::ExceededTimeLimit) { - *errMsg = str::stream() << "rangeDeleter timed out after " - << durationCount<Seconds>(elapsedTime) - << " seconds while waiting" - " for deletions to be replicated to majority nodes"; - log() << *errMsg; - } - else if (replStatus.status.code() == ErrorCodes::NotMaster) { - *errMsg = str::stream() << "rangeDeleter no longer PRIMARY after " - << durationCount<Seconds>(elapsedTime) - << " seconds while waiting" - " for deletions to be replicated to majority nodes"; - } - else { - LOG(elapsedTime < Seconds(30) ? 1 : 0) - << "rangeDeleter took " << durationCount<Seconds>(elapsedTime) << " seconds " - << " waiting for deletes to be replicated to majority nodes"; - - fassert(18512, replStatus.status); - } - - return replStatus.status.isOK(); + if (options.waitForOpenCursors) { + _env->getCursorIds(txn, ns, &toDelete->cursorsToWait); } -} - - bool RangeDeleter::deleteNow(OperationContext* txn, - const RangeDeleterOptions& options, - string* errMsg) { - if (stopRequested()) { - *errMsg = "deleter is already stopped."; - return false; - } - string dummy; - if (errMsg == NULL) errMsg = &dummy; + toDelete->stats.queueStartTS = jsTime(); - const string& ns(options.range.ns); - const BSONObj& min(options.range.minKey); - const BSONObj& max(options.range.maxKey); + if (!toDelete->cursorsToWait.empty()) + logCursorsWaiting(toDelete.get()); - NSMinMax deleteRange(ns, min, max); - { - stdx::lock_guard<stdx::mutex> sl(_queueMutex); - if (!canEnqueue_inlock(ns, min, max, errMsg)) { - return false; - } - - _deleteSet.insert(&deleteRange); + { + stdx::lock_guard<stdx::mutex> sl(_queueMutex); - // Note: count for pending deletes is an integral part of the shutdown story. - // Therefore, to simplify things, there is no "pending" state for deletes in - // deleteNow, the state transition is simply inProgress -> done. - _deletesInProgress++; + if (toDelete->cursorsToWait.empty()) { + toDelete->stats.queueEndTS = jsTime(); + _taskQueue.push_back(toDelete.release()); + _taskQueueNotEmptyCV.notify_one(); + } else { + _notReadyQueue.push_back(toDelete.release()); } + } - set<CursorId> cursorsToWait; - if (options.waitForOpenCursors) { - _env->getCursorIds(txn, ns, &cursorsToWait); - } + return true; +} - long long checkIntervalMillis = 5; +namespace { +const int kWTimeoutMillis = 60 * 60 * 1000; + +bool _waitForMajority(OperationContext* txn, std::string* errMsg) { + const WriteConcernOptions writeConcern( + WriteConcernOptions::kMajority, WriteConcernOptions::NONE, kWTimeoutMillis); + + repl::ReplicationCoordinator::StatusAndDuration replStatus = + repl::getGlobalReplicationCoordinator()->awaitReplicationOfLastOpForClient(txn, + writeConcern); + Milliseconds elapsedTime = replStatus.duration; + if (replStatus.status.code() == ErrorCodes::ExceededTimeLimit) { + *errMsg = str::stream() << "rangeDeleter timed out after " + << durationCount<Seconds>(elapsedTime) + << " seconds while waiting" + " for deletions to be replicated to majority nodes"; + log() << *errMsg; + } else if (replStatus.status.code() == ErrorCodes::NotMaster) { + *errMsg = str::stream() << "rangeDeleter no longer PRIMARY after " + << durationCount<Seconds>(elapsedTime) + << " seconds while waiting" + " for deletions to be replicated to majority nodes"; + } else { + LOG(elapsedTime < Seconds(30) ? 1 : 0) + << "rangeDeleter took " << durationCount<Seconds>(elapsedTime) << " seconds " + << " waiting for deletes to be replicated to majority nodes"; + + fassert(18512, replStatus.status); + } - RangeDeleteEntry taskDetails(options); - taskDetails.stats.queueStartTS = jsTime(); + return replStatus.status.isOK(); +} +} - for (; !cursorsToWait.empty(); sleepmillis(checkIntervalMillis)) { +bool RangeDeleter::deleteNow(OperationContext* txn, + const RangeDeleterOptions& options, + string* errMsg) { + if (stopRequested()) { + *errMsg = "deleter is already stopped."; + return false; + } - logCursorsWaiting(&taskDetails); + string dummy; + if (errMsg == NULL) + errMsg = &dummy; - set<CursorId> cursorsNow; - _env->getCursorIds(txn, ns, &cursorsNow); + const string& ns(options.range.ns); + const BSONObj& min(options.range.minKey); + const BSONObj& max(options.range.maxKey); - set<CursorId> cursorsLeft; - std::set_intersection(cursorsToWait.begin(), - cursorsToWait.end(), - cursorsNow.begin(), - cursorsNow.end(), - std::inserter(cursorsLeft, cursorsLeft.end())); + NSMinMax deleteRange(ns, min, max); + { + stdx::lock_guard<stdx::mutex> sl(_queueMutex); + if (!canEnqueue_inlock(ns, min, max, errMsg)) { + return false; + } - cursorsToWait.swap(cursorsLeft); + _deleteSet.insert(&deleteRange); - if (stopRequested()) { - *errMsg = "deleter was stopped."; + // Note: count for pending deletes is an integral part of the shutdown story. + // Therefore, to simplify things, there is no "pending" state for deletes in + // deleteNow, the state transition is simply inProgress -> done. + _deletesInProgress++; + } - stdx::lock_guard<stdx::mutex> sl(_queueMutex); - _deleteSet.erase(&deleteRange); + set<CursorId> cursorsToWait; + if (options.waitForOpenCursors) { + _env->getCursorIds(txn, ns, &cursorsToWait); + } - _deletesInProgress--; + long long checkIntervalMillis = 5; - if (_deletesInProgress == 0) { - _nothingInProgressCV.notify_one(); - } + RangeDeleteEntry taskDetails(options); + taskDetails.stats.queueStartTS = jsTime(); - return false; - } + for (; !cursorsToWait.empty(); sleepmillis(checkIntervalMillis)) { + logCursorsWaiting(&taskDetails); - if (checkIntervalMillis < kMaxCursorCheckIntervalMillis) { - checkIntervalMillis *= 2; - } - } - taskDetails.stats.queueEndTS = jsTime(); + set<CursorId> cursorsNow; + _env->getCursorIds(txn, ns, &cursorsNow); - taskDetails.stats.deleteStartTS = jsTime(); - bool result = _env->deleteRange(txn, - taskDetails, - &taskDetails.stats.deletedDocCount, - errMsg); + set<CursorId> cursorsLeft; + std::set_intersection(cursorsToWait.begin(), + cursorsToWait.end(), + cursorsNow.begin(), + cursorsNow.end(), + std::inserter(cursorsLeft, cursorsLeft.end())); - taskDetails.stats.deleteEndTS = jsTime(); + cursorsToWait.swap(cursorsLeft); - if (result) { - taskDetails.stats.waitForReplStartTS = jsTime(); - result = _waitForMajority(txn, errMsg); - taskDetails.stats.waitForReplEndTS = jsTime(); - } + if (stopRequested()) { + *errMsg = "deleter was stopped."; - { stdx::lock_guard<stdx::mutex> sl(_queueMutex); _deleteSet.erase(&deleteRange); @@ -404,229 +355,244 @@ namespace { if (_deletesInProgress == 0) { _nothingInProgressCV.notify_one(); } + + return false; } - recordDelStats(new DeleteJobStats(taskDetails.stats)); - return result; + if (checkIntervalMillis < kMaxCursorCheckIntervalMillis) { + checkIntervalMillis *= 2; + } } + taskDetails.stats.queueEndTS = jsTime(); - void RangeDeleter::getStatsHistory(std::vector<DeleteJobStats*>* stats) const { - stats->clear(); - stats->reserve(kDeleteJobsHistory); + taskDetails.stats.deleteStartTS = jsTime(); + bool result = _env->deleteRange(txn, taskDetails, &taskDetails.stats.deletedDocCount, errMsg); - stdx::lock_guard<stdx::mutex> sl(_statsHistoryMutex); - for (std::deque<DeleteJobStats*>::const_iterator it = _statsHistory.begin(); - it != _statsHistory.end(); ++it) { - stats->push_back(new DeleteJobStats(**it)); - } + taskDetails.stats.deleteEndTS = jsTime(); + + if (result) { + taskDetails.stats.waitForReplStartTS = jsTime(); + result = _waitForMajority(txn, errMsg); + taskDetails.stats.waitForReplEndTS = jsTime(); } - BSONObj RangeDeleter::toBSON() const { + { stdx::lock_guard<stdx::mutex> sl(_queueMutex); + _deleteSet.erase(&deleteRange); - BSONObjBuilder builder; + _deletesInProgress--; - BSONArrayBuilder notReadyBuilder(builder.subarrayStart("notReady")); - for (TaskList::const_iterator iter = _notReadyQueue.begin(); - iter != _notReadyQueue.end(); ++iter) { - notReadyBuilder.append((*iter)->toBSON()); + if (_deletesInProgress == 0) { + _nothingInProgressCV.notify_one(); } - notReadyBuilder.doneFast(); + } - BSONArrayBuilder readyBuilder(builder.subarrayStart("ready")); - for (TaskList::const_iterator iter = _taskQueue.begin(); - iter != _taskQueue.end(); ++iter) { - readyBuilder.append((*iter)->toBSON()); - } - readyBuilder.doneFast(); + recordDelStats(new DeleteJobStats(taskDetails.stats)); + return result; +} + +void RangeDeleter::getStatsHistory(std::vector<DeleteJobStats*>* stats) const { + stats->clear(); + stats->reserve(kDeleteJobsHistory); - return builder.obj(); + stdx::lock_guard<stdx::mutex> sl(_statsHistoryMutex); + for (std::deque<DeleteJobStats*>::const_iterator it = _statsHistory.begin(); + it != _statsHistory.end(); + ++it) { + stats->push_back(new DeleteJobStats(**it)); } +} - void RangeDeleter::doWork() { - Client::initThreadIfNotAlready("RangeDeleter"); - Client* client = &cc(); +BSONObj RangeDeleter::toBSON() const { + stdx::lock_guard<stdx::mutex> sl(_queueMutex); - while (!inShutdown() && !stopRequested()) { - string errMsg; + BSONObjBuilder builder; - RangeDeleteEntry* nextTask = NULL; + BSONArrayBuilder notReadyBuilder(builder.subarrayStart("notReady")); + for (TaskList::const_iterator iter = _notReadyQueue.begin(); iter != _notReadyQueue.end(); + ++iter) { + notReadyBuilder.append((*iter)->toBSON()); + } + notReadyBuilder.doneFast(); - { - stdx::unique_lock<stdx::mutex> sl(_queueMutex); - while (_taskQueue.empty()) { - _taskQueueNotEmptyCV.timed_wait( - sl, duration::milliseconds(kNotEmptyTimeoutMillis)); + BSONArrayBuilder readyBuilder(builder.subarrayStart("ready")); + for (TaskList::const_iterator iter = _taskQueue.begin(); iter != _taskQueue.end(); ++iter) { + readyBuilder.append((*iter)->toBSON()); + } + readyBuilder.doneFast(); - if (stopRequested()) { - log() << "stopping range deleter worker" << endl; - return; - } + return builder.obj(); +} - if (_taskQueue.empty()) { - // Try to check if some deletes are ready and move them to the - // ready queue. - - TaskList::iterator iter = _notReadyQueue.begin(); - while (iter != _notReadyQueue.end()) { - RangeDeleteEntry* entry = *iter; - - set<CursorId> cursorsNow; - if (entry->options.waitForOpenCursors) { - auto txn = client->makeOperationContext(); - _env->getCursorIds(txn.get(), - entry->options.range.ns, - &cursorsNow); - } - - set<CursorId> cursorsLeft; - std::set_intersection(entry->cursorsToWait.begin(), - entry->cursorsToWait.end(), - cursorsNow.begin(), - cursorsNow.end(), - std::inserter(cursorsLeft, - cursorsLeft.end())); - - entry->cursorsToWait.swap(cursorsLeft); - - if (entry->cursorsToWait.empty()) { - (*iter)->stats.queueEndTS = jsTime(); - _taskQueue.push_back(*iter); - _taskQueueNotEmptyCV.notify_one(); - iter = _notReadyQueue.erase(iter); - } - else { - logCursorsWaiting(entry); - ++iter; - } - } - } - } +void RangeDeleter::doWork() { + Client::initThreadIfNotAlready("RangeDeleter"); + Client* client = &cc(); + + while (!inShutdown() && !stopRequested()) { + string errMsg; + + RangeDeleteEntry* nextTask = NULL; + + { + stdx::unique_lock<stdx::mutex> sl(_queueMutex); + while (_taskQueue.empty()) { + _taskQueueNotEmptyCV.timed_wait(sl, duration::milliseconds(kNotEmptyTimeoutMillis)); if (stopRequested()) { log() << "stopping range deleter worker" << endl; return; } - nextTask = _taskQueue.front(); - _taskQueue.pop_front(); + if (_taskQueue.empty()) { + // Try to check if some deletes are ready and move them to the + // ready queue. - _deletesInProgress++; - } + TaskList::iterator iter = _notReadyQueue.begin(); + while (iter != _notReadyQueue.end()) { + RangeDeleteEntry* entry = *iter; - { - auto txn = client->makeOperationContext(); - nextTask->stats.deleteStartTS = jsTime(); - bool delResult = _env->deleteRange(txn.get(), - *nextTask, - &nextTask->stats.deletedDocCount, - &errMsg); - nextTask->stats.deleteEndTS = jsTime(); - - if (delResult) { - nextTask->stats.waitForReplStartTS = jsTime(); + set<CursorId> cursorsNow; + if (entry->options.waitForOpenCursors) { + auto txn = client->makeOperationContext(); + _env->getCursorIds(txn.get(), entry->options.range.ns, &cursorsNow); + } - if (!_waitForMajority(txn.get(), &errMsg)) { - warning() << "Error encountered while waiting for replication: " << errMsg; + set<CursorId> cursorsLeft; + std::set_intersection(entry->cursorsToWait.begin(), + entry->cursorsToWait.end(), + cursorsNow.begin(), + cursorsNow.end(), + std::inserter(cursorsLeft, cursorsLeft.end())); + + entry->cursorsToWait.swap(cursorsLeft); + + if (entry->cursorsToWait.empty()) { + (*iter)->stats.queueEndTS = jsTime(); + _taskQueue.push_back(*iter); + _taskQueueNotEmptyCV.notify_one(); + iter = _notReadyQueue.erase(iter); + } else { + logCursorsWaiting(entry); + ++iter; + } } - - nextTask->stats.waitForReplEndTS = jsTime(); - } - else { - warning() << "Error encountered while trying to delete range: " - << errMsg << endl; } } - { - stdx::lock_guard<stdx::mutex> sl(_queueMutex); + if (stopRequested()) { + log() << "stopping range deleter worker" << endl; + return; + } - NSMinMax setEntry(nextTask->options.range.ns, - nextTask->options.range.minKey, - nextTask->options.range.maxKey); - deletePtrElement(&_deleteSet, &setEntry); - _deletesInProgress--; + nextTask = _taskQueue.front(); + _taskQueue.pop_front(); + + _deletesInProgress++; + } + + { + auto txn = client->makeOperationContext(); + nextTask->stats.deleteStartTS = jsTime(); + bool delResult = + _env->deleteRange(txn.get(), *nextTask, &nextTask->stats.deletedDocCount, &errMsg); + nextTask->stats.deleteEndTS = jsTime(); - if (nextTask->notifyDone) { - nextTask->notifyDone->notifyOne(); + if (delResult) { + nextTask->stats.waitForReplStartTS = jsTime(); + + if (!_waitForMajority(txn.get(), &errMsg)) { + warning() << "Error encountered while waiting for replication: " << errMsg; } - } - recordDelStats(new DeleteJobStats(nextTask->stats)); - delete nextTask; - nextTask = NULL; + nextTask->stats.waitForReplEndTS = jsTime(); + } else { + warning() << "Error encountered while trying to delete range: " << errMsg << endl; + } } - } - bool RangeDeleter::canEnqueue_inlock(StringData ns, - const BSONObj& min, - const BSONObj& max, - string* errMsg) const { + { + stdx::lock_guard<stdx::mutex> sl(_queueMutex); + + NSMinMax setEntry(nextTask->options.range.ns, + nextTask->options.range.minKey, + nextTask->options.range.maxKey); + deletePtrElement(&_deleteSet, &setEntry); + _deletesInProgress--; - NSMinMax toDelete(ns.toString(), min, max); - if (_deleteSet.count(&toDelete) > 0) { - *errMsg = str::stream() << "ns: " << ns << ", min: " << min << ", max: " << max - << " is already being processed for deletion."; - return false; + if (nextTask->notifyDone) { + nextTask->notifyDone->notifyOne(); + } } - return true; + recordDelStats(new DeleteJobStats(nextTask->stats)); + delete nextTask; + nextTask = NULL; } +} - bool RangeDeleter::stopRequested() const { - stdx::lock_guard<stdx::mutex> sl(_stopMutex); - return _stopRequested; +bool RangeDeleter::canEnqueue_inlock(StringData ns, + const BSONObj& min, + const BSONObj& max, + string* errMsg) const { + NSMinMax toDelete(ns.toString(), min, max); + if (_deleteSet.count(&toDelete) > 0) { + *errMsg = str::stream() << "ns: " << ns << ", min: " << min << ", max: " << max + << " is already being processed for deletion."; + return false; } - size_t RangeDeleter::getTotalDeletes() const { - stdx::lock_guard<stdx::mutex> sl(_queueMutex); - return _deleteSet.size(); - } + return true; +} - size_t RangeDeleter::getPendingDeletes() const { - stdx::lock_guard<stdx::mutex> sl(_queueMutex); - return _notReadyQueue.size() + _taskQueue.size(); - } +bool RangeDeleter::stopRequested() const { + stdx::lock_guard<stdx::mutex> sl(_stopMutex); + return _stopRequested; +} - size_t RangeDeleter::getDeletesInProgress() const { - stdx::lock_guard<stdx::mutex> sl(_queueMutex); - return _deletesInProgress; - } +size_t RangeDeleter::getTotalDeletes() const { + stdx::lock_guard<stdx::mutex> sl(_queueMutex); + return _deleteSet.size(); +} - void RangeDeleter::recordDelStats(DeleteJobStats* newStat) { - stdx::lock_guard<stdx::mutex> sl(_statsHistoryMutex); - if (_statsHistory.size() == kDeleteJobsHistory) { - delete _statsHistory.front(); - _statsHistory.pop_front(); - } +size_t RangeDeleter::getPendingDeletes() const { + stdx::lock_guard<stdx::mutex> sl(_queueMutex); + return _notReadyQueue.size() + _taskQueue.size(); +} + +size_t RangeDeleter::getDeletesInProgress() const { + stdx::lock_guard<stdx::mutex> sl(_queueMutex); + return _deletesInProgress; +} - _statsHistory.push_back(newStat); +void RangeDeleter::recordDelStats(DeleteJobStats* newStat) { + stdx::lock_guard<stdx::mutex> sl(_statsHistoryMutex); + if (_statsHistory.size() == kDeleteJobsHistory) { + delete _statsHistory.front(); + _statsHistory.pop_front(); } - RangeDeleteEntry::RangeDeleteEntry(const RangeDeleterOptions& options) - : options(options), notifyDone(NULL) {} + _statsHistory.push_back(newStat); +} - BSONObj RangeDeleteEntry::toBSON() const { - BSONObjBuilder builder; - builder.append("ns", options.range.ns); - builder.append("min", options.range.minKey); - builder.append("max", options.range.maxKey); - BSONArrayBuilder cursorBuilder(builder.subarrayStart("cursors")); +RangeDeleteEntry::RangeDeleteEntry(const RangeDeleterOptions& options) + : options(options), notifyDone(NULL) {} - for (std::set<CursorId>::const_iterator it = cursorsToWait.begin(); - it != cursorsToWait.end(); ++it) { - cursorBuilder.append((long long)*it); - } - cursorBuilder.doneFast(); +BSONObj RangeDeleteEntry::toBSON() const { + BSONObjBuilder builder; + builder.append("ns", options.range.ns); + builder.append("min", options.range.minKey); + builder.append("max", options.range.maxKey); + BSONArrayBuilder cursorBuilder(builder.subarrayStart("cursors")); - return builder.done().copy(); + for (std::set<CursorId>::const_iterator it = cursorsToWait.begin(); it != cursorsToWait.end(); + ++it) { + cursorBuilder.append((long long)*it); } + cursorBuilder.doneFast(); - RangeDeleterOptions::RangeDeleterOptions(const KeyRange& range) - : range(range), - fromMigrate(false), - onlyRemoveOrphanedDocs(false), - waitForOpenCursors(false) { - } + return builder.done().copy(); +} +RangeDeleterOptions::RangeDeleterOptions(const KeyRange& range) + : range(range), fromMigrate(false), onlyRemoveOrphanedDocs(false), waitForOpenCursors(false) {} } |