#include "mongo/db/range_deleter.h"
#include "mongo/s/range_arithmetic.h"
#include "mongo/db/range_deleter_stats.h"
#include "mongo/util/concurrency/synchronization.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/time_support.h"
using std::auto_ptr;
using std::set;
using std::pair;
using std::string;
using mongoutils::str::stream;
namespace {
// Timeout, in milliseconds, used by the worker loop (doWork) while the task
// queue is empty.
const long int NotEmptyTimeoutMillis = 200;
// Upper limit, in milliseconds, for the cursor check interval that deleteNow
// doubles while it waits for cursors.
// NOTE(review): "Curor" looks like a typo for "Cursor"; renaming requires
// updating every use of the constant at the same time.
const long long int MaxCurorCheckIntervalMillis = 500;
/**
 * Removes an element from the container that holds a pointer type, and deletes the
 * pointer as well. Returns true if the element was found.
 *
 * @param container container of raw pointers that are owned by the container's
 *     user; it must provide find(), end() and erase(iterator).
 * @param elem lookup key handed to container->find(). It only has to compare
 *     equal to the stored element under the container's ordering, so it may
 *     point to a temporary object.
 * @return false if no matching element exists (the container is left untouched).
 */
template <typename ContainerType, typename ContainerElementType>
bool deletePtrElement(ContainerType* container, ContainerElementType elem) {
    typename ContainerType::iterator iter = container->find(elem);

    if (iter == container->end()) {
        return false;
    }

    // Unlink the element before freeing it: the container must never hold a
    // dangling pointer, since later lookups may dereference stored elements
    // and the owner may delete every stored pointer again on teardown.
    typename ContainerType::value_type doomed = *iter;
    container->erase(iter);
    delete doomed;
    return true;
}
namespace mongo {
// Short alias for boost::posix_time; used in this file to build millisecond
// durations.
namespace duration = boost::posix_time;
// Describes one range deletion request; queueDelete() populates these fields
// for each request it accepts.
// NOTE(review): parts of this definition (the constructor head, template
// arguments, closing braces) are not visible in this view -- confirm against
// the full file.
struct RangeDeleter::RangeDeleteEntry {
// Initializer for notifyDone: NULL, i.e. no notification object attached.
notifyDone(NULL) {
// Namespace the range belongs to.
std::string ns;
// Inclusive lower range.
BSONObj min;
// Exclusive upper range.
BSONObj max;
// The key pattern of the index the range refers to.
// This is relevant especially with special index types
// like hash indexes.
BSONObj shardKeyPattern;
// Copied from the secondaryThrottle argument of queueDelete().
bool secondaryThrottle;
// Set of cursors that must be closed before this entry is ready
// for deletion.
std::set cursorsToWait;
// Not owned here.
// Important invariant: Can only be set and used by one thread.
Notification* notifyDone;
// For debugging only. Reports ns, min, max and the address stored in
// notifyDone.
BSONObj toBSON() const {
return BSON("ns" << ns
<< "min" << min
<< "max" << max
<< "notifyDoneAddr" << reinterpret_cast(notifyDone));
/**
 * Identifies a range of a namespace. Instances are stored (by pointer) in the
 * sets of pending deletes and black listed ranges, and are also built on the
 * stack as lookup keys for those sets.
 */
struct RangeDeleter::NSMinMax {
    /**
     * Copies the namespace and both bounds into the new entry. The arguments
     * are taken by const reference so that each of them is copied exactly
     * once (into the member) instead of once per parameter and once more per
     * member.
     *
     * Note: copying a BSONObj here does not call getOwned(); callers that keep
     * the entry around are responsible for passing bounds that stay valid.
     */
    NSMinMax(const std::string& ns, const BSONObj& min, const BSONObj& max):
        ns(ns), min(min), max(max) {
    }

    std::string ns;

    // Inclusive lower range.
    BSONObj min;

    // Exclusive upper range.
    BSONObj max;
};
/**
 * Strict ordering for NSMinMax pointers: entries are ordered by namespace
 * first; entries of the same namespace are ordered by compareRanges() on
 * their bounds.
 *
 * @return true if *lhs sorts before *rhs.
 */
bool RangeDeleter::NSMinMaxCmp::operator()(
        const NSMinMax* lhs, const NSMinMax* rhs) const {
    const int nsOrder = lhs->ns.compare(rhs->ns);

    // Different namespaces: the namespace alone decides.
    if (nsOrder != 0) {
        return nsOrder < 0;
    }

    // Same namespace: fall back to the range comparison.
    const int rangeOrder = compareRanges(lhs->min, lhs->max, rhs->min, rhs->max);
    return rangeOrder < 0;
}
// Creates a deleter that works through the given environment object, which
// this instance takes over (see the "ownership xfer" note below).
// NOTE(review): the initializer list and the constructor body may be
// incomplete in this view (no closing brace is visible) -- confirm against
// the full file.
RangeDeleter::RangeDeleter(RangeDeleterEnv* env):
_env(env), // ownership xfer
// The stats object receives the address of _queueMutex.
_stats(new RangeDeleterStats(&_queueMutex)) {
/**
 * Frees every heap object still referenced by the queues and range sets:
 * the not-ready tasks, the ready tasks, the pending delete ranges and the
 * black listed ranges, in that order.
 */
RangeDeleter::~RangeDeleter() {
    // Tasks still waiting for cursors to close.
    TaskList::iterator pendingTask = _notReadyQueue.begin();
    while (pendingTask != _notReadyQueue.end()) {
        delete (*pendingTask);
        ++pendingTask;
    }

    // Tasks that were ready to run.
    TaskList::iterator readyTask = _taskQueue.begin();
    while (readyTask != _taskQueue.end()) {
        delete (*readyTask);
        ++readyTask;
    }

    // Ranges registered as queued for deletion.
    NSMinMaxSet::iterator queuedRange = _deleteSet.begin();
    while (queuedRange != _deleteSet.end()) {
        delete (*queuedRange);
        ++queuedRange;
    }

    // Black listed ranges.
    NSMinMaxSet::iterator blockedRange = _blackList.begin();
    while (blockedRange != _blackList.end()) {
        delete (*blockedRange);
        ++blockedRange;
    }
}
/**
 * Starts the worker thread, which runs doWork() on this object. Does nothing
 * if a worker has already been created.
 */
void RangeDeleter::startWorkers() {
    if (_worker) {
        // Already started.
        return;
    }

    _worker.reset(new boost::thread(boost::bind(&RangeDeleter::doWork, this)));
}
// Requests shutdown: sets _stopRequested while holding _stopMutex, then, with
// _queueMutex held, loops for as long as the stats object reports deletes in
// progress.
// NOTE(review): the bodies of "if (_worker)" and of the while loop, and the
// scoping that separates the two scoped_lock declarations, are not visible in
// this view -- confirm against the full file before relying on this summary.
void RangeDeleter::stopWorkers() {
scoped_lock sl(_stopMutex);
_stopRequested = true;
if (_worker) {
scoped_lock sl(_queueMutex);
while (_stats->hasInProgress_inlock()) {
// Queues the range [min, max) of ns for deletion by the worker.
//
// errMsg may be NULL; on failure a description is written through it
// otherwise. Returns false if the deleter has been stopped or if
// canEnqueue_inlock() rejects the range, true once the request is accepted.
//
// NOTE(review): several statements and all closing braces are missing from
// this view (e.g. the bodies of the final if/else and the scoping around the
// two scoped_lock declarations) -- confirm against the full file.
bool RangeDeleter::queueDelete(const std::string& ns,
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
bool secondaryThrottle,
Notification* notifyDone,
std::string* errMsg) {
// Let the rest of the function write through errMsg unconditionally.
string dummy;
if (errMsg == NULL) errMsg = &dummy;
auto_ptr toDelete(new RangeDeleteEntry);
toDelete->ns = ns;
// The entry outlives this call, so it keeps owned copies of the BSON objects.
toDelete->min = min.getOwned();
toDelete->max = max.getOwned();
toDelete->shardKeyPattern = shardKeyPattern.getOwned();
toDelete->secondaryThrottle = secondaryThrottle;
toDelete->notifyDone = notifyDone;
scoped_lock sl(_queueMutex);
// NOTE(review): _stopRequested is written under _stopMutex in stopWorkers()
// and read under _stopMutex in stopRequested(), but read under _queueMutex
// here -- confirm this is intended.
if (_stopRequested) {
*errMsg = "deleter is already stopped.";
return false;
if (!canEnqueue_inlock(ns, min, max, errMsg)) {
return false;
// The set entry also outlives this call: build it from the owned copies made
// above rather than from the caller's objects, whose buffers may go away.
_deleteSet.insert(new NSMinMax(ns, toDelete->min, toDelete->max));
// Record the cursors currently open on the namespace.
_env->getCursorIds(ns, &toDelete->cursorsToWait);
scoped_lock sl(_queueMutex);
if (toDelete->cursorsToWait.empty()) {
else {
return true;
// Deletes the range [min, max) of ns on the calling thread through
// _env->deleteRange(), after waiting for the cursors reported by
// _env->getCursorIds().
//
// errMsg may be NULL; on failure a description is written through it
// otherwise. Returns false if the deleter is or becomes stopped, or if
// canEnqueue_inlock() rejects the range; otherwise returns the result of
// _env->deleteRange().
//
// NOTE(review): several statements and all closing braces are missing from
// this view (e.g. the cursor set difference, the wait between checks and the
// bodies of the hasInProgress_inlock() checks) -- confirm against the full
// file.
bool RangeDeleter::deleteNow(const std::string& ns,
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern,
bool secondaryThrottle,
string* errMsg) {
// Install the fallback before the first write through errMsg, so that a NULL
// errMsg is safe on every path, including the early "already stopped" return
// just below.
string dummy;
if (errMsg == NULL) errMsg = &dummy;
if (stopRequested()) {
*errMsg = "deleter is already stopped.";
return false;
NSMinMax deleteRange(ns, min, max);
scoped_lock sl(_queueMutex);
if (!canEnqueue_inlock(ns, min, max, errMsg)) {
return false;
// Note: count for pending deletes is an integral part of the shutdown story.
// Therefore, to simplify things, there is no "pending" state for deletes in
// deleteNow, the state transition is simply inProgress -> done.
set cursorsToWait;
_env->getCursorIds(ns, &cursorsToWait);
// Interval between cursor checks; doubled below until it reaches
// MaxCurorCheckIntervalMillis.
long long checkIntervalMillis = 5;
while (!cursorsToWait.empty()) {
set cursorsNow;
_env->getCursorIds(ns, &cursorsNow);
set cursorsLeft;
std::inserter(cursorsLeft, cursorsLeft.end()));
if (stopRequested()) {
*errMsg = "deleter was stopped.";
scoped_lock sl(_queueMutex);
if (!_stats->hasInProgress_inlock()) {
return false;
if (checkIntervalMillis < MaxCurorCheckIntervalMillis) {
checkIntervalMillis *= 2;
bool result = _env->deleteRange(ns, min, max, shardKeyPattern,
secondaryThrottle, errMsg);
scoped_lock sl(_queueMutex);
if (!_stats->hasInProgress_inlock()) {
return result;
/**
 * Black lists the range [min, max) of ns.
 *
 * Fails (returns false and describes the reason through errMsg) if the range
 * overlaps a range that is already black listed, or overlaps a range of the
 * same namespace that is queued for deletion. On success a new entry is added
 * to the black list and true is returned.
 *
 * @param errMsg optional; may be NULL if the caller does not need the reason.
 */
bool RangeDeleter::addToBlackList(const StringData& ns,
                                  const BSONObj& min,
                                  const BSONObj& max,
                                  std::string* errMsg) {
    // Let the rest of the function write through errMsg unconditionally.
    string dummy;
    if (errMsg == NULL) errMsg = &dummy;

    scoped_lock sl(_queueMutex);

    if (isBlacklisted_inlock(ns, min, max, errMsg)) {
        return false;
    }

    for (NSMinMaxSet::const_iterator iter = _deleteSet.begin();
            iter != _deleteSet.end(); ++iter) {
        const NSMinMax* const entry = *iter;
        if (entry->ns == ns && rangeOverlaps(entry->min, entry->max, min, max)) {
            *errMsg = stream() << "Cannot black list ns: " << ns
                               << ", min: " << min
                               << ", max: " << max
                               << " since it is already queued for deletion.";
            return false;
        }
    }

    // The black list entry lives until removeFromBlackList() or destruction,
    // i.e. longer than this call, so it must not reference the caller's
    // buffers: store owned copies of the bounds, as queueDelete() does for
    // its queue entries.
    _blackList.insert(new NSMinMax(ns.toString(), min.getOwned(), max.getOwned()));
    return true;
}
bool RangeDeleter::removeFromBlackList(const StringData& ns,
const BSONObj& min,
const BSONObj& max) {
scoped_lock sl(_queueMutex);
NSMinMax entry(ns.toString(), min, max);
return deletePtrElement(&_blackList, &entry);
/**
 * Gives read-only access to the statistics object held by this deleter.
 * The returned pointer is the one stored in _stats; it is not a copy.
 */
const RangeDeleterStats* RangeDeleter::getStats() const {
    const RangeDeleterStats* const stats = _stats.get();
    return stats;
}
// Builds a BSON object describing the queues while holding _queueMutex: a
// "notReady" array for _notReadyQueue and a "ready" array for _taskQueue.
// NOTE(review): the loop bodies that add each queue entry, and all closing
// braces, are not visible in this view -- confirm against the full file.
BSONObj RangeDeleter::toBSON() const {
scoped_lock sl(_queueMutex);
BSONObjBuilder builder;
// Sub-array for the tasks that are not ready yet.
BSONArrayBuilder notReadyBuilder(builder.subarrayStart("notReady"));
for (TaskList::const_iterator iter = _notReadyQueue.begin();
iter != _notReadyQueue.end(); ++iter) {
// Sub-array for the tasks that are ready.
BSONArrayBuilder readyBuilder(builder.subarrayStart("ready"));
for (TaskList::const_iterator iter = _taskQueue.begin();
iter != _taskQueue.end(); ++iter) {
return builder.obj();
// Worker loop; startWorkers() runs it on the thread it creates. The visible
// code takes the front task of _taskQueue, runs the delete through _env, and
// then drops the task's bookkeeping.
// NOTE(review): several statements and all closing braces are missing from
// this view (for example the call around the sl.boost() fragment and most
// arguments of deleteRange), so the comments below describe only what is
// visible -- confirm against the full file.
void RangeDeleter::doWork() {
// Keep going until process shutdown or an explicit stop request.
while (!inShutdown() && !stopRequested()) {
string errMsg;
RangeDeleteEntry* nextTask = NULL;
scoped_lock sl(_queueMutex);
// No task is ready yet; presumably a timed wait bounded by
// NotEmptyTimeoutMillis (the call itself is not visible).
while (_taskQueue.empty()) {
sl.boost(), duration::milliseconds(NotEmptyTimeoutMillis));
if (stopRequested()) {
log() << "stopping range deleter worker" << endl;
if (_taskQueue.empty()) {
// Try to check if some deletes are ready and move them to the
// ready queue.
TaskList::iterator iter = _notReadyQueue.begin();
while (iter != _notReadyQueue.end()) {
RangeDeleteEntry* entry = *iter;
// Cursors reported by the environment for the entry's namespace right now.
set cursorsNow;
_env->getCursorIds(entry->ns, &cursorsNow);
set cursorsLeft;
// Nothing left to wait for: take the entry out of the not-ready queue.
if (entry->cursorsToWait.empty()) {
iter = _notReadyQueue.erase(iter);
else {
if (stopRequested()) {
log() << "stopping range deleter worker" << endl;
nextTask = _taskQueue.front();
// A failed delete is reported with a warning.
if (!_env->deleteRange(nextTask->ns,
&errMsg)) {
warning() << "Error encountered while trying to delete range: "
<< errMsg << endl;
scoped_lock sl(_queueMutex);
// Look the task's range up by value and drop it from the set of queued
// ranges.
NSMinMax setEntry(nextTask->ns, nextTask->min, nextTask->max);
deletePtrElement(&_deleteSet, &setEntry);
if (nextTask->notifyDone) {
// The task object is freed here.
delete nextTask;
nextTask = NULL;
bool RangeDeleter::isBlacklisted_inlock(const StringData& ns,
const BSONObj& min,
const BSONObj& max,
std::string* errMsg) const {
for (NSMinMaxSet::const_iterator iter = _blackList.begin();
iter != _blackList.end(); ++iter) {
const NSMinMax* const entry = *iter;
if (ns != entry->ns) continue;
if (rangeOverlaps(min, max, entry->min, entry->max)) {
*errMsg = stream() << "ns: " << ns
<< ", min: " << min
<< ", max: " << max
<< " intersects with black list"
<< " min: " << entry->min
<< ", max: " << entry->max;
return true;
return false;
bool RangeDeleter::canEnqueue_inlock(const StringData& ns,
const BSONObj& min,
const BSONObj& max,
string* errMsg) const {
if (isBlacklisted_inlock(ns, min, max, errMsg)) {
return false;
NSMinMax toDelete(ns.toString(), min, max);
if (_deleteSet.count(&toDelete) > 0) {
*errMsg = stream() << "ns: " << ns
<< ", min: " << min
<< ", max: " << max
<< " is already being processed for deletion.";
return false;
return true;
// Returns the current value of _stopRequested, read while holding _stopMutex
// (the same mutex stopWorkers() holds when it sets the flag).
bool RangeDeleter::stopRequested() const {
scoped_lock sl(_stopMutex);
return _stopRequested;