summaryrefslogtreecommitdiff
path: root/src/mongo/db/range_deleter.cpp
diff options
context:
space:
mode:
authorMark Benvenuto <mark.benvenuto@mongodb.com>2015-06-20 00:22:50 -0400
committerMark Benvenuto <mark.benvenuto@mongodb.com>2015-06-20 10:56:02 -0400
commit9c2ed42daa8fbbef4a919c21ec564e2db55e8d60 (patch)
tree3814f79c10d7b490948d8cb7b112ac1dd41ceff1 /src/mongo/db/range_deleter.cpp
parent01965cf52bce6976637ecb8f4a622aeb05ab256a (diff)
downloadmongo-9c2ed42daa8fbbef4a919c21ec564e2db55e8d60.tar.gz
SERVER-18579: Clang-Format - reformat code, no comment reflow
Diffstat (limited to 'src/mongo/db/range_deleter.cpp')
-rw-r--r--src/mongo/db/range_deleter.cpp842
1 files changed, 404 insertions, 438 deletions
diff --git a/src/mongo/db/range_deleter.cpp b/src/mongo/db/range_deleter.cpp
index 771aa9074f7..036f8e7c4d4 100644
--- a/src/mongo/db/range_deleter.cpp
+++ b/src/mongo/db/range_deleter.cpp
@@ -53,349 +53,300 @@ using std::pair;
using std::string;
namespace {
- const long int kNotEmptyTimeoutMillis = 200;
- const long long int kMaxCursorCheckIntervalMillis = 500;
- const size_t kDeleteJobsHistory = 10; // entries
-
- /**
- * Removes an element from the container that holds a pointer type, and deletes the
- * pointer as well. Returns true if the element was found.
- */
- template <typename ContainerType, typename ContainerElementType>
- bool deletePtrElement(ContainerType* container, ContainerElementType elem) {
- typename ContainerType::iterator iter = container->find(elem);
-
- if (iter == container->end()) {
- return false;
- }
+const long int kNotEmptyTimeoutMillis = 200;
+const long long int kMaxCursorCheckIntervalMillis = 500;
+const size_t kDeleteJobsHistory = 10; // entries
- delete *iter;
- container->erase(iter);
- return true;
+/**
+ * Removes an element from the container that holds a pointer type, and deletes the
+ * pointer as well. Returns true if the element was found.
+ */
+template <typename ContainerType, typename ContainerElementType>
+bool deletePtrElement(ContainerType* container, ContainerElementType elem) {
+ typename ContainerType::iterator iter = container->find(elem);
+
+ if (iter == container->end()) {
+ return false;
}
+
+ delete *iter;
+ container->erase(iter);
+ return true;
+}
}
namespace mongo {
- namespace duration = boost::posix_time;
-
- static void logCursorsWaiting(RangeDeleteEntry* entry) {
+namespace duration = boost::posix_time;
- // We always log the first cursors waiting message (so we have cursor ids in the logs).
- // After 15 minutes (the cursor timeout period), we start logging additional messages at
- // a 1 minute interval.
- static const auto kLogCursorsThreshold = stdx::chrono::minutes{15};
- static const auto kLogCursorsInterval = stdx::chrono::minutes{1};
+static void logCursorsWaiting(RangeDeleteEntry* entry) {
+ // We always log the first cursors waiting message (so we have cursor ids in the logs).
+ // After 15 minutes (the cursor timeout period), we start logging additional messages at
+ // a 1 minute interval.
+ static const auto kLogCursorsThreshold = stdx::chrono::minutes{15};
+ static const auto kLogCursorsInterval = stdx::chrono::minutes{1};
- Date_t currentTime = jsTime();
- Milliseconds elapsedMillisSinceQueued{0};
+ Date_t currentTime = jsTime();
+ Milliseconds elapsedMillisSinceQueued{0};
- // We always log the first message when lastLoggedTime == 0
- if (entry->lastLoggedTS != Date_t()) {
+ // We always log the first message when lastLoggedTime == 0
+ if (entry->lastLoggedTS != Date_t()) {
+ if (currentTime > entry->stats.queueStartTS)
+ elapsedMillisSinceQueued = currentTime - entry->stats.queueStartTS;
- if (currentTime > entry->stats.queueStartTS)
- elapsedMillisSinceQueued = currentTime - entry->stats.queueStartTS;
-
- // Not logging, threshold not passed
- if (elapsedMillisSinceQueued < kLogCursorsThreshold)
- return;
+ // Not logging, threshold not passed
+ if (elapsedMillisSinceQueued < kLogCursorsThreshold)
+ return;
- Milliseconds elapsedMillisSinceLog{0};
- if (currentTime > entry->lastLoggedTS)
- elapsedMillisSinceLog = currentTime - entry->lastLoggedTS;
+ Milliseconds elapsedMillisSinceLog{0};
+ if (currentTime > entry->lastLoggedTS)
+ elapsedMillisSinceLog = currentTime - entry->lastLoggedTS;
- // Not logging, logged a short time ago
- if (elapsedMillisSinceLog < kLogCursorsInterval)
- return;
- }
-
- str::stream cursorList;
- for (std::set<CursorId>::const_iterator it = entry->cursorsToWait.begin();
- it != entry->cursorsToWait.end(); ++it) {
- if (it != entry->cursorsToWait.begin())
- cursorList << ", ";
- cursorList << *it;
- }
-
- log() << "waiting for open cursors before removing range "
- << "[" << entry->options.range.minKey << ", " << entry->options.range.maxKey << ") "
- << "in " << entry->options.range.ns
- << (entry->lastLoggedTS == Date_t() ?
- string("") :
- string(str::stream() << ", elapsed secs: " <<
- durationCount<Seconds>(elapsedMillisSinceQueued)))
- << ", cursor ids: [" << string(cursorList) << "]";
+ // Not logging, logged a short time ago
+ if (elapsedMillisSinceLog < kLogCursorsInterval)
+ return;
+ }
- entry->lastLoggedTS = currentTime;
+ str::stream cursorList;
+ for (std::set<CursorId>::const_iterator it = entry->cursorsToWait.begin();
+ it != entry->cursorsToWait.end();
+ ++it) {
+ if (it != entry->cursorsToWait.begin())
+ cursorList << ", ";
+ cursorList << *it;
}
- struct RangeDeleter::NSMinMax {
- NSMinMax(std::string ns, const BSONObj min, const BSONObj max):
- ns(ns), min(min), max(max) {
- }
+ log() << "waiting for open cursors before removing range "
+ << "[" << entry->options.range.minKey << ", " << entry->options.range.maxKey << ") "
+ << "in " << entry->options.range.ns
+ << (entry->lastLoggedTS == Date_t()
+ ? string("")
+ : string(str::stream() << ", elapsed secs: "
+ << durationCount<Seconds>(elapsedMillisSinceQueued)))
+ << ", cursor ids: [" << string(cursorList) << "]";
- std::string ns;
+ entry->lastLoggedTS = currentTime;
+}
- // Inclusive lower range.
- BSONObj min;
+struct RangeDeleter::NSMinMax {
+ NSMinMax(std::string ns, const BSONObj min, const BSONObj max) : ns(ns), min(min), max(max) {}
- // Exclusive upper range.
- BSONObj max;
- };
+ std::string ns;
- bool RangeDeleter::NSMinMaxCmp::operator()(
- const NSMinMax* lhs, const NSMinMax* rhs) const {
- const int nsComp = lhs->ns.compare(rhs->ns);
+ // Inclusive lower range.
+ BSONObj min;
- if (nsComp < 0) {
- return true;
- }
+ // Exclusive upper range.
+ BSONObj max;
+};
- if (nsComp > 0) {
- return false;
- }
+bool RangeDeleter::NSMinMaxCmp::operator()(const NSMinMax* lhs, const NSMinMax* rhs) const {
+ const int nsComp = lhs->ns.compare(rhs->ns);
- return compareRanges(lhs->min, lhs->max, rhs->min, rhs->max) < 0;
+ if (nsComp < 0) {
+ return true;
}
- RangeDeleter::RangeDeleter(RangeDeleterEnv* env):
- _env(env), // ownership xfer
- _stopRequested(false),
- _deletesInProgress(0) {
+ if (nsComp > 0) {
+ return false;
}
- RangeDeleter::~RangeDeleter() {
- for(TaskList::iterator it = _notReadyQueue.begin();
- it != _notReadyQueue.end();
- ++it) {
- delete (*it);
- }
-
- for(TaskList::iterator it = _taskQueue.begin();
- it != _taskQueue.end();
- ++it) {
- delete (*it);
- }
-
- for(NSMinMaxSet::iterator it = _deleteSet.begin();
- it != _deleteSet.end();
- ++it) {
- delete (*it);
- }
+ return compareRanges(lhs->min, lhs->max, rhs->min, rhs->max) < 0;
+}
- for(std::deque<DeleteJobStats*>::iterator it = _statsHistory.begin();
- it != _statsHistory.end();
- ++it) {
- delete (*it);
- }
+RangeDeleter::RangeDeleter(RangeDeleterEnv* env)
+ : _env(env), // ownership xfer
+ _stopRequested(false),
+ _deletesInProgress(0) {}
+RangeDeleter::~RangeDeleter() {
+ for (TaskList::iterator it = _notReadyQueue.begin(); it != _notReadyQueue.end(); ++it) {
+ delete (*it);
}
- void RangeDeleter::startWorkers() {
- if (!_worker) {
- _worker.reset(new stdx::thread(stdx::bind(&RangeDeleter::doWork, this)));
- }
+ for (TaskList::iterator it = _taskQueue.begin(); it != _taskQueue.end(); ++it) {
+ delete (*it);
}
- void RangeDeleter::stopWorkers() {
- {
- stdx::lock_guard<stdx::mutex> sl(_stopMutex);
- _stopRequested = true;
- }
+ for (NSMinMaxSet::iterator it = _deleteSet.begin(); it != _deleteSet.end(); ++it) {
+ delete (*it);
+ }
- if (_worker) {
- _worker->join();
- }
+ for (std::deque<DeleteJobStats*>::iterator it = _statsHistory.begin();
+ it != _statsHistory.end();
+ ++it) {
+ delete (*it);
+ }
+}
- stdx::unique_lock<stdx::mutex> sl(_queueMutex);
- while (_deletesInProgress > 0) {
- _nothingInProgressCV.wait(sl);
- }
+void RangeDeleter::startWorkers() {
+ if (!_worker) {
+ _worker.reset(new stdx::thread(stdx::bind(&RangeDeleter::doWork, this)));
}
+}
- bool RangeDeleter::queueDelete(OperationContext* txn,
- const RangeDeleterOptions& options,
- Notification* notifyDone,
- std::string* errMsg) {
- string dummy;
- if (errMsg == NULL) errMsg = &dummy;
+void RangeDeleter::stopWorkers() {
+ {
+ stdx::lock_guard<stdx::mutex> sl(_stopMutex);
+ _stopRequested = true;
+ }
- const string& ns(options.range.ns);
- const BSONObj& min(options.range.minKey);
- const BSONObj& max(options.range.maxKey);
+ if (_worker) {
+ _worker->join();
+ }
- unique_ptr<RangeDeleteEntry> toDelete(
- new RangeDeleteEntry(options));
- toDelete->notifyDone = notifyDone;
+ stdx::unique_lock<stdx::mutex> sl(_queueMutex);
+ while (_deletesInProgress > 0) {
+ _nothingInProgressCV.wait(sl);
+ }
+}
- {
- stdx::lock_guard<stdx::mutex> sl(_queueMutex);
- if (_stopRequested) {
- *errMsg = "deleter is already stopped.";
- return false;
- }
+bool RangeDeleter::queueDelete(OperationContext* txn,
+ const RangeDeleterOptions& options,
+ Notification* notifyDone,
+ std::string* errMsg) {
+ string dummy;
+ if (errMsg == NULL)
+ errMsg = &dummy;
- if (!canEnqueue_inlock(ns, min, max, errMsg)) {
- return false;
- }
+ const string& ns(options.range.ns);
+ const BSONObj& min(options.range.minKey);
+ const BSONObj& max(options.range.maxKey);
- _deleteSet.insert(new NSMinMax(ns, min.getOwned(), max.getOwned()));
- }
+ unique_ptr<RangeDeleteEntry> toDelete(new RangeDeleteEntry(options));
+ toDelete->notifyDone = notifyDone;
- if (options.waitForOpenCursors) {
- _env->getCursorIds(txn, ns, &toDelete->cursorsToWait);
+ {
+ stdx::lock_guard<stdx::mutex> sl(_queueMutex);
+ if (_stopRequested) {
+ *errMsg = "deleter is already stopped.";
+ return false;
}
- toDelete->stats.queueStartTS = jsTime();
-
- if (!toDelete->cursorsToWait.empty())
- logCursorsWaiting(toDelete.get());
-
- {
- stdx::lock_guard<stdx::mutex> sl(_queueMutex);
-
- if (toDelete->cursorsToWait.empty()) {
- toDelete->stats.queueEndTS = jsTime();
- _taskQueue.push_back(toDelete.release());
- _taskQueueNotEmptyCV.notify_one();
- }
- else {
- _notReadyQueue.push_back(toDelete.release());
- }
+ if (!canEnqueue_inlock(ns, min, max, errMsg)) {
+ return false;
}
- return true;
+ _deleteSet.insert(new NSMinMax(ns, min.getOwned(), max.getOwned()));
}
-namespace {
- const int kWTimeoutMillis = 60 * 60 * 1000;
-
- bool _waitForMajority(OperationContext* txn, std::string* errMsg) {
- const WriteConcernOptions writeConcern(WriteConcernOptions::kMajority,
- WriteConcernOptions::NONE,
- kWTimeoutMillis);
-
- repl::ReplicationCoordinator::StatusAndDuration replStatus =
- repl::getGlobalReplicationCoordinator()->awaitReplicationOfLastOpForClient(
- txn, writeConcern);
- Milliseconds elapsedTime = replStatus.duration;
- if (replStatus.status.code() == ErrorCodes::ExceededTimeLimit) {
- *errMsg = str::stream() << "rangeDeleter timed out after "
- << durationCount<Seconds>(elapsedTime)
- << " seconds while waiting"
- " for deletions to be replicated to majority nodes";
- log() << *errMsg;
- }
- else if (replStatus.status.code() == ErrorCodes::NotMaster) {
- *errMsg = str::stream() << "rangeDeleter no longer PRIMARY after "
- << durationCount<Seconds>(elapsedTime)
- << " seconds while waiting"
- " for deletions to be replicated to majority nodes";
- }
- else {
- LOG(elapsedTime < Seconds(30) ? 1 : 0)
- << "rangeDeleter took " << durationCount<Seconds>(elapsedTime) << " seconds "
- << " waiting for deletes to be replicated to majority nodes";
-
- fassert(18512, replStatus.status);
- }
-
- return replStatus.status.isOK();
+ if (options.waitForOpenCursors) {
+ _env->getCursorIds(txn, ns, &toDelete->cursorsToWait);
}
-}
-
- bool RangeDeleter::deleteNow(OperationContext* txn,
- const RangeDeleterOptions& options,
- string* errMsg) {
- if (stopRequested()) {
- *errMsg = "deleter is already stopped.";
- return false;
- }
- string dummy;
- if (errMsg == NULL) errMsg = &dummy;
+ toDelete->stats.queueStartTS = jsTime();
- const string& ns(options.range.ns);
- const BSONObj& min(options.range.minKey);
- const BSONObj& max(options.range.maxKey);
+ if (!toDelete->cursorsToWait.empty())
+ logCursorsWaiting(toDelete.get());
- NSMinMax deleteRange(ns, min, max);
- {
- stdx::lock_guard<stdx::mutex> sl(_queueMutex);
- if (!canEnqueue_inlock(ns, min, max, errMsg)) {
- return false;
- }
-
- _deleteSet.insert(&deleteRange);
+ {
+ stdx::lock_guard<stdx::mutex> sl(_queueMutex);
- // Note: count for pending deletes is an integral part of the shutdown story.
- // Therefore, to simplify things, there is no "pending" state for deletes in
- // deleteNow, the state transition is simply inProgress -> done.
- _deletesInProgress++;
+ if (toDelete->cursorsToWait.empty()) {
+ toDelete->stats.queueEndTS = jsTime();
+ _taskQueue.push_back(toDelete.release());
+ _taskQueueNotEmptyCV.notify_one();
+ } else {
+ _notReadyQueue.push_back(toDelete.release());
}
+ }
- set<CursorId> cursorsToWait;
- if (options.waitForOpenCursors) {
- _env->getCursorIds(txn, ns, &cursorsToWait);
- }
+ return true;
+}
- long long checkIntervalMillis = 5;
+namespace {
+const int kWTimeoutMillis = 60 * 60 * 1000;
+
+bool _waitForMajority(OperationContext* txn, std::string* errMsg) {
+ const WriteConcernOptions writeConcern(
+ WriteConcernOptions::kMajority, WriteConcernOptions::NONE, kWTimeoutMillis);
+
+ repl::ReplicationCoordinator::StatusAndDuration replStatus =
+ repl::getGlobalReplicationCoordinator()->awaitReplicationOfLastOpForClient(txn,
+ writeConcern);
+ Milliseconds elapsedTime = replStatus.duration;
+ if (replStatus.status.code() == ErrorCodes::ExceededTimeLimit) {
+ *errMsg = str::stream() << "rangeDeleter timed out after "
+ << durationCount<Seconds>(elapsedTime)
+ << " seconds while waiting"
+ " for deletions to be replicated to majority nodes";
+ log() << *errMsg;
+ } else if (replStatus.status.code() == ErrorCodes::NotMaster) {
+ *errMsg = str::stream() << "rangeDeleter no longer PRIMARY after "
+ << durationCount<Seconds>(elapsedTime)
+ << " seconds while waiting"
+ " for deletions to be replicated to majority nodes";
+ } else {
+ LOG(elapsedTime < Seconds(30) ? 1 : 0)
+ << "rangeDeleter took " << durationCount<Seconds>(elapsedTime) << " seconds "
+ << " waiting for deletes to be replicated to majority nodes";
+
+ fassert(18512, replStatus.status);
+ }
- RangeDeleteEntry taskDetails(options);
- taskDetails.stats.queueStartTS = jsTime();
+ return replStatus.status.isOK();
+}
+}
- for (; !cursorsToWait.empty(); sleepmillis(checkIntervalMillis)) {
+bool RangeDeleter::deleteNow(OperationContext* txn,
+ const RangeDeleterOptions& options,
+ string* errMsg) {
+ if (stopRequested()) {
+ *errMsg = "deleter is already stopped.";
+ return false;
+ }
- logCursorsWaiting(&taskDetails);
+ string dummy;
+ if (errMsg == NULL)
+ errMsg = &dummy;
- set<CursorId> cursorsNow;
- _env->getCursorIds(txn, ns, &cursorsNow);
+ const string& ns(options.range.ns);
+ const BSONObj& min(options.range.minKey);
+ const BSONObj& max(options.range.maxKey);
- set<CursorId> cursorsLeft;
- std::set_intersection(cursorsToWait.begin(),
- cursorsToWait.end(),
- cursorsNow.begin(),
- cursorsNow.end(),
- std::inserter(cursorsLeft, cursorsLeft.end()));
+ NSMinMax deleteRange(ns, min, max);
+ {
+ stdx::lock_guard<stdx::mutex> sl(_queueMutex);
+ if (!canEnqueue_inlock(ns, min, max, errMsg)) {
+ return false;
+ }
- cursorsToWait.swap(cursorsLeft);
+ _deleteSet.insert(&deleteRange);
- if (stopRequested()) {
- *errMsg = "deleter was stopped.";
+ // Note: count for pending deletes is an integral part of the shutdown story.
+ // Therefore, to simplify things, there is no "pending" state for deletes in
+ // deleteNow, the state transition is simply inProgress -> done.
+ _deletesInProgress++;
+ }
- stdx::lock_guard<stdx::mutex> sl(_queueMutex);
- _deleteSet.erase(&deleteRange);
+ set<CursorId> cursorsToWait;
+ if (options.waitForOpenCursors) {
+ _env->getCursorIds(txn, ns, &cursorsToWait);
+ }
- _deletesInProgress--;
+ long long checkIntervalMillis = 5;
- if (_deletesInProgress == 0) {
- _nothingInProgressCV.notify_one();
- }
+ RangeDeleteEntry taskDetails(options);
+ taskDetails.stats.queueStartTS = jsTime();
- return false;
- }
+ for (; !cursorsToWait.empty(); sleepmillis(checkIntervalMillis)) {
+ logCursorsWaiting(&taskDetails);
- if (checkIntervalMillis < kMaxCursorCheckIntervalMillis) {
- checkIntervalMillis *= 2;
- }
- }
- taskDetails.stats.queueEndTS = jsTime();
+ set<CursorId> cursorsNow;
+ _env->getCursorIds(txn, ns, &cursorsNow);
- taskDetails.stats.deleteStartTS = jsTime();
- bool result = _env->deleteRange(txn,
- taskDetails,
- &taskDetails.stats.deletedDocCount,
- errMsg);
+ set<CursorId> cursorsLeft;
+ std::set_intersection(cursorsToWait.begin(),
+ cursorsToWait.end(),
+ cursorsNow.begin(),
+ cursorsNow.end(),
+ std::inserter(cursorsLeft, cursorsLeft.end()));
- taskDetails.stats.deleteEndTS = jsTime();
+ cursorsToWait.swap(cursorsLeft);
- if (result) {
- taskDetails.stats.waitForReplStartTS = jsTime();
- result = _waitForMajority(txn, errMsg);
- taskDetails.stats.waitForReplEndTS = jsTime();
- }
+ if (stopRequested()) {
+ *errMsg = "deleter was stopped.";
- {
stdx::lock_guard<stdx::mutex> sl(_queueMutex);
_deleteSet.erase(&deleteRange);
@@ -404,229 +355,244 @@ namespace {
if (_deletesInProgress == 0) {
_nothingInProgressCV.notify_one();
}
+
+ return false;
}
- recordDelStats(new DeleteJobStats(taskDetails.stats));
- return result;
+ if (checkIntervalMillis < kMaxCursorCheckIntervalMillis) {
+ checkIntervalMillis *= 2;
+ }
}
+ taskDetails.stats.queueEndTS = jsTime();
- void RangeDeleter::getStatsHistory(std::vector<DeleteJobStats*>* stats) const {
- stats->clear();
- stats->reserve(kDeleteJobsHistory);
+ taskDetails.stats.deleteStartTS = jsTime();
+ bool result = _env->deleteRange(txn, taskDetails, &taskDetails.stats.deletedDocCount, errMsg);
- stdx::lock_guard<stdx::mutex> sl(_statsHistoryMutex);
- for (std::deque<DeleteJobStats*>::const_iterator it = _statsHistory.begin();
- it != _statsHistory.end(); ++it) {
- stats->push_back(new DeleteJobStats(**it));
- }
+ taskDetails.stats.deleteEndTS = jsTime();
+
+ if (result) {
+ taskDetails.stats.waitForReplStartTS = jsTime();
+ result = _waitForMajority(txn, errMsg);
+ taskDetails.stats.waitForReplEndTS = jsTime();
}
- BSONObj RangeDeleter::toBSON() const {
+ {
stdx::lock_guard<stdx::mutex> sl(_queueMutex);
+ _deleteSet.erase(&deleteRange);
- BSONObjBuilder builder;
+ _deletesInProgress--;
- BSONArrayBuilder notReadyBuilder(builder.subarrayStart("notReady"));
- for (TaskList::const_iterator iter = _notReadyQueue.begin();
- iter != _notReadyQueue.end(); ++iter) {
- notReadyBuilder.append((*iter)->toBSON());
+ if (_deletesInProgress == 0) {
+ _nothingInProgressCV.notify_one();
}
- notReadyBuilder.doneFast();
+ }
- BSONArrayBuilder readyBuilder(builder.subarrayStart("ready"));
- for (TaskList::const_iterator iter = _taskQueue.begin();
- iter != _taskQueue.end(); ++iter) {
- readyBuilder.append((*iter)->toBSON());
- }
- readyBuilder.doneFast();
+ recordDelStats(new DeleteJobStats(taskDetails.stats));
+ return result;
+}
+
+void RangeDeleter::getStatsHistory(std::vector<DeleteJobStats*>* stats) const {
+ stats->clear();
+ stats->reserve(kDeleteJobsHistory);
- return builder.obj();
+ stdx::lock_guard<stdx::mutex> sl(_statsHistoryMutex);
+ for (std::deque<DeleteJobStats*>::const_iterator it = _statsHistory.begin();
+ it != _statsHistory.end();
+ ++it) {
+ stats->push_back(new DeleteJobStats(**it));
}
+}
- void RangeDeleter::doWork() {
- Client::initThreadIfNotAlready("RangeDeleter");
- Client* client = &cc();
+BSONObj RangeDeleter::toBSON() const {
+ stdx::lock_guard<stdx::mutex> sl(_queueMutex);
- while (!inShutdown() && !stopRequested()) {
- string errMsg;
+ BSONObjBuilder builder;
- RangeDeleteEntry* nextTask = NULL;
+ BSONArrayBuilder notReadyBuilder(builder.subarrayStart("notReady"));
+ for (TaskList::const_iterator iter = _notReadyQueue.begin(); iter != _notReadyQueue.end();
+ ++iter) {
+ notReadyBuilder.append((*iter)->toBSON());
+ }
+ notReadyBuilder.doneFast();
- {
- stdx::unique_lock<stdx::mutex> sl(_queueMutex);
- while (_taskQueue.empty()) {
- _taskQueueNotEmptyCV.timed_wait(
- sl, duration::milliseconds(kNotEmptyTimeoutMillis));
+ BSONArrayBuilder readyBuilder(builder.subarrayStart("ready"));
+ for (TaskList::const_iterator iter = _taskQueue.begin(); iter != _taskQueue.end(); ++iter) {
+ readyBuilder.append((*iter)->toBSON());
+ }
+ readyBuilder.doneFast();
- if (stopRequested()) {
- log() << "stopping range deleter worker" << endl;
- return;
- }
+ return builder.obj();
+}
- if (_taskQueue.empty()) {
- // Try to check if some deletes are ready and move them to the
- // ready queue.
-
- TaskList::iterator iter = _notReadyQueue.begin();
- while (iter != _notReadyQueue.end()) {
- RangeDeleteEntry* entry = *iter;
-
- set<CursorId> cursorsNow;
- if (entry->options.waitForOpenCursors) {
- auto txn = client->makeOperationContext();
- _env->getCursorIds(txn.get(),
- entry->options.range.ns,
- &cursorsNow);
- }
-
- set<CursorId> cursorsLeft;
- std::set_intersection(entry->cursorsToWait.begin(),
- entry->cursorsToWait.end(),
- cursorsNow.begin(),
- cursorsNow.end(),
- std::inserter(cursorsLeft,
- cursorsLeft.end()));
-
- entry->cursorsToWait.swap(cursorsLeft);
-
- if (entry->cursorsToWait.empty()) {
- (*iter)->stats.queueEndTS = jsTime();
- _taskQueue.push_back(*iter);
- _taskQueueNotEmptyCV.notify_one();
- iter = _notReadyQueue.erase(iter);
- }
- else {
- logCursorsWaiting(entry);
- ++iter;
- }
- }
- }
- }
+void RangeDeleter::doWork() {
+ Client::initThreadIfNotAlready("RangeDeleter");
+ Client* client = &cc();
+
+ while (!inShutdown() && !stopRequested()) {
+ string errMsg;
+
+ RangeDeleteEntry* nextTask = NULL;
+
+ {
+ stdx::unique_lock<stdx::mutex> sl(_queueMutex);
+ while (_taskQueue.empty()) {
+ _taskQueueNotEmptyCV.timed_wait(sl, duration::milliseconds(kNotEmptyTimeoutMillis));
if (stopRequested()) {
log() << "stopping range deleter worker" << endl;
return;
}
- nextTask = _taskQueue.front();
- _taskQueue.pop_front();
+ if (_taskQueue.empty()) {
+ // Try to check if some deletes are ready and move them to the
+ // ready queue.
- _deletesInProgress++;
- }
+ TaskList::iterator iter = _notReadyQueue.begin();
+ while (iter != _notReadyQueue.end()) {
+ RangeDeleteEntry* entry = *iter;
- {
- auto txn = client->makeOperationContext();
- nextTask->stats.deleteStartTS = jsTime();
- bool delResult = _env->deleteRange(txn.get(),
- *nextTask,
- &nextTask->stats.deletedDocCount,
- &errMsg);
- nextTask->stats.deleteEndTS = jsTime();
-
- if (delResult) {
- nextTask->stats.waitForReplStartTS = jsTime();
+ set<CursorId> cursorsNow;
+ if (entry->options.waitForOpenCursors) {
+ auto txn = client->makeOperationContext();
+ _env->getCursorIds(txn.get(), entry->options.range.ns, &cursorsNow);
+ }
- if (!_waitForMajority(txn.get(), &errMsg)) {
- warning() << "Error encountered while waiting for replication: " << errMsg;
+ set<CursorId> cursorsLeft;
+ std::set_intersection(entry->cursorsToWait.begin(),
+ entry->cursorsToWait.end(),
+ cursorsNow.begin(),
+ cursorsNow.end(),
+ std::inserter(cursorsLeft, cursorsLeft.end()));
+
+ entry->cursorsToWait.swap(cursorsLeft);
+
+ if (entry->cursorsToWait.empty()) {
+ (*iter)->stats.queueEndTS = jsTime();
+ _taskQueue.push_back(*iter);
+ _taskQueueNotEmptyCV.notify_one();
+ iter = _notReadyQueue.erase(iter);
+ } else {
+ logCursorsWaiting(entry);
+ ++iter;
+ }
}
-
- nextTask->stats.waitForReplEndTS = jsTime();
- }
- else {
- warning() << "Error encountered while trying to delete range: "
- << errMsg << endl;
}
}
- {
- stdx::lock_guard<stdx::mutex> sl(_queueMutex);
+ if (stopRequested()) {
+ log() << "stopping range deleter worker" << endl;
+ return;
+ }
- NSMinMax setEntry(nextTask->options.range.ns,
- nextTask->options.range.minKey,
- nextTask->options.range.maxKey);
- deletePtrElement(&_deleteSet, &setEntry);
- _deletesInProgress--;
+ nextTask = _taskQueue.front();
+ _taskQueue.pop_front();
+
+ _deletesInProgress++;
+ }
+
+ {
+ auto txn = client->makeOperationContext();
+ nextTask->stats.deleteStartTS = jsTime();
+ bool delResult =
+ _env->deleteRange(txn.get(), *nextTask, &nextTask->stats.deletedDocCount, &errMsg);
+ nextTask->stats.deleteEndTS = jsTime();
- if (nextTask->notifyDone) {
- nextTask->notifyDone->notifyOne();
+ if (delResult) {
+ nextTask->stats.waitForReplStartTS = jsTime();
+
+ if (!_waitForMajority(txn.get(), &errMsg)) {
+ warning() << "Error encountered while waiting for replication: " << errMsg;
}
- }
- recordDelStats(new DeleteJobStats(nextTask->stats));
- delete nextTask;
- nextTask = NULL;
+ nextTask->stats.waitForReplEndTS = jsTime();
+ } else {
+ warning() << "Error encountered while trying to delete range: " << errMsg << endl;
+ }
}
- }
- bool RangeDeleter::canEnqueue_inlock(StringData ns,
- const BSONObj& min,
- const BSONObj& max,
- string* errMsg) const {
+ {
+ stdx::lock_guard<stdx::mutex> sl(_queueMutex);
+
+ NSMinMax setEntry(nextTask->options.range.ns,
+ nextTask->options.range.minKey,
+ nextTask->options.range.maxKey);
+ deletePtrElement(&_deleteSet, &setEntry);
+ _deletesInProgress--;
- NSMinMax toDelete(ns.toString(), min, max);
- if (_deleteSet.count(&toDelete) > 0) {
- *errMsg = str::stream() << "ns: " << ns << ", min: " << min << ", max: " << max
- << " is already being processed for deletion.";
- return false;
+ if (nextTask->notifyDone) {
+ nextTask->notifyDone->notifyOne();
+ }
}
- return true;
+ recordDelStats(new DeleteJobStats(nextTask->stats));
+ delete nextTask;
+ nextTask = NULL;
}
+}
- bool RangeDeleter::stopRequested() const {
- stdx::lock_guard<stdx::mutex> sl(_stopMutex);
- return _stopRequested;
+bool RangeDeleter::canEnqueue_inlock(StringData ns,
+ const BSONObj& min,
+ const BSONObj& max,
+ string* errMsg) const {
+ NSMinMax toDelete(ns.toString(), min, max);
+ if (_deleteSet.count(&toDelete) > 0) {
+ *errMsg = str::stream() << "ns: " << ns << ", min: " << min << ", max: " << max
+ << " is already being processed for deletion.";
+ return false;
}
- size_t RangeDeleter::getTotalDeletes() const {
- stdx::lock_guard<stdx::mutex> sl(_queueMutex);
- return _deleteSet.size();
- }
+ return true;
+}
- size_t RangeDeleter::getPendingDeletes() const {
- stdx::lock_guard<stdx::mutex> sl(_queueMutex);
- return _notReadyQueue.size() + _taskQueue.size();
- }
+bool RangeDeleter::stopRequested() const {
+ stdx::lock_guard<stdx::mutex> sl(_stopMutex);
+ return _stopRequested;
+}
- size_t RangeDeleter::getDeletesInProgress() const {
- stdx::lock_guard<stdx::mutex> sl(_queueMutex);
- return _deletesInProgress;
- }
+size_t RangeDeleter::getTotalDeletes() const {
+ stdx::lock_guard<stdx::mutex> sl(_queueMutex);
+ return _deleteSet.size();
+}
- void RangeDeleter::recordDelStats(DeleteJobStats* newStat) {
- stdx::lock_guard<stdx::mutex> sl(_statsHistoryMutex);
- if (_statsHistory.size() == kDeleteJobsHistory) {
- delete _statsHistory.front();
- _statsHistory.pop_front();
- }
+size_t RangeDeleter::getPendingDeletes() const {
+ stdx::lock_guard<stdx::mutex> sl(_queueMutex);
+ return _notReadyQueue.size() + _taskQueue.size();
+}
+
+size_t RangeDeleter::getDeletesInProgress() const {
+ stdx::lock_guard<stdx::mutex> sl(_queueMutex);
+ return _deletesInProgress;
+}
- _statsHistory.push_back(newStat);
+void RangeDeleter::recordDelStats(DeleteJobStats* newStat) {
+ stdx::lock_guard<stdx::mutex> sl(_statsHistoryMutex);
+ if (_statsHistory.size() == kDeleteJobsHistory) {
+ delete _statsHistory.front();
+ _statsHistory.pop_front();
}
- RangeDeleteEntry::RangeDeleteEntry(const RangeDeleterOptions& options)
- : options(options), notifyDone(NULL) {}
+ _statsHistory.push_back(newStat);
+}
- BSONObj RangeDeleteEntry::toBSON() const {
- BSONObjBuilder builder;
- builder.append("ns", options.range.ns);
- builder.append("min", options.range.minKey);
- builder.append("max", options.range.maxKey);
- BSONArrayBuilder cursorBuilder(builder.subarrayStart("cursors"));
+RangeDeleteEntry::RangeDeleteEntry(const RangeDeleterOptions& options)
+ : options(options), notifyDone(NULL) {}
- for (std::set<CursorId>::const_iterator it = cursorsToWait.begin();
- it != cursorsToWait.end(); ++it) {
- cursorBuilder.append((long long)*it);
- }
- cursorBuilder.doneFast();
+BSONObj RangeDeleteEntry::toBSON() const {
+ BSONObjBuilder builder;
+ builder.append("ns", options.range.ns);
+ builder.append("min", options.range.minKey);
+ builder.append("max", options.range.maxKey);
+ BSONArrayBuilder cursorBuilder(builder.subarrayStart("cursors"));
- return builder.done().copy();
+ for (std::set<CursorId>::const_iterator it = cursorsToWait.begin(); it != cursorsToWait.end();
+ ++it) {
+ cursorBuilder.append((long long)*it);
}
+ cursorBuilder.doneFast();
- RangeDeleterOptions::RangeDeleterOptions(const KeyRange& range)
- : range(range),
- fromMigrate(false),
- onlyRemoveOrphanedDocs(false),
- waitForOpenCursors(false) {
- }
+ return builder.done().copy();
+}
+RangeDeleterOptions::RangeDeleterOptions(const KeyRange& range)
+ : range(range), fromMigrate(false), onlyRemoveOrphanedDocs(false), waitForOpenCursors(false) {}
}