/**
* Copyright (C) 2013 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/db/range_deleter.h"
#include
#include
#include "mongo/db/global_environment_experiment.h"
#include "mongo/db/repl/repl_coordinator_global.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/util/concurrency/synchronization.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"
using std::auto_ptr;
using std::set;
using std::pair;
using std::string;
namespace {
const long int kNotEmptyTimeoutMillis = 200;
const long long int kMaxCursorCheckIntervalMillis = 500;
const size_t kDeleteJobsHistory = 10; // entries
/**
* Removes an element from the container that holds a pointer type, and deletes the
* pointer as well. Returns true if the element was found.
*/
template
bool deletePtrElement(ContainerType* container, ContainerElementType elem) {
typename ContainerType::iterator iter = container->find(elem);
if (iter == container->end()) {
return false;
}
delete *iter;
container->erase(iter);
return true;
}
}
namespace mongo {
namespace duration = boost::posix_time;
static void logCursorsWaiting(RangeDeleteEntry* entry) {
// We always log the first cursors waiting message (so we have cursor ids in the logs).
// After 15 minutes (the cursor timeout period), we start logging additional messages at
// a 1 minute interval.
static const long long kLogCursorsThresholdMillis = 15 * 60 * 1000;
static const long long kLogCursorsIntervalMillis = 1 * 60 * 1000;
Date_t currentTime = jsTime();
long long elapsedMillisSinceQueued = 0;
// We always log the first message when lastLoggedTime == 0
if (entry->lastLoggedTS != 0) {
if (currentTime > entry->stats.queueStartTS)
elapsedMillisSinceQueued = currentTime - entry->stats.queueStartTS;
// Not logging, threshold not passed
if (elapsedMillisSinceQueued < kLogCursorsThresholdMillis)
return;
long long elapsedMillisSinceLog = 0;
if (currentTime > entry->lastLoggedTS)
elapsedMillisSinceLog = currentTime - entry->lastLoggedTS;
// Not logging, logged a short time ago
if (elapsedMillisSinceLog < kLogCursorsIntervalMillis)
return;
}
str::stream cursorList;
for (std::set::const_iterator it = entry->cursorsToWait.begin();
it != entry->cursorsToWait.end(); ++it) {
if (it != entry->cursorsToWait.begin())
cursorList << ", ";
cursorList << *it;
}
log() << "waiting for open cursors before removing range "
<< "[" << entry->options.range.minKey << ", " << entry->options.range.maxKey << ") "
<< "in " << entry->options.range.ns
<< (entry->lastLoggedTS == 0 ?
string("") :
string(str::stream() << ", elapsed secs: " << elapsedMillisSinceQueued / 1000))
<< ", cursor ids: [" << string(cursorList) << "]";
entry->lastLoggedTS = currentTime;
}
struct RangeDeleter::NSMinMax {
NSMinMax(std::string ns, const BSONObj min, const BSONObj max):
ns(ns), min(min), max(max) {
}
std::string ns;
// Inclusive lower range.
BSONObj min;
// Exclusive upper range.
BSONObj max;
};
bool RangeDeleter::NSMinMaxCmp::operator()(
const NSMinMax* lhs, const NSMinMax* rhs) const {
const int nsComp = lhs->ns.compare(rhs->ns);
if (nsComp < 0) {
return true;
}
if (nsComp > 0) {
return false;
}
return compareRanges(lhs->min, lhs->max, rhs->min, rhs->max) < 0;
}
RangeDeleter::RangeDeleter(RangeDeleterEnv* env):
_env(env), // ownership xfer
_stopMutex("stopRangeDeleter"),
_stopRequested(false),
_queueMutex("RangeDeleter"),
_deletesInProgress(0),
_statsHistoryMutex("RangeDeleterStatsHistory") {
}
RangeDeleter::~RangeDeleter() {
for(TaskList::iterator it = _notReadyQueue.begin();
it != _notReadyQueue.end();
++it) {
delete (*it);
}
for(TaskList::iterator it = _taskQueue.begin();
it != _taskQueue.end();
++it) {
delete (*it);
}
for(NSMinMaxSet::iterator it = _deleteSet.begin();
it != _deleteSet.end();
++it) {
delete (*it);
}
for(std::deque::iterator it = _statsHistory.begin();
it != _statsHistory.end();
++it) {
delete (*it);
}
}
void RangeDeleter::startWorkers() {
if (!_worker) {
_worker.reset(new boost::thread(stdx::bind(&RangeDeleter::doWork, this)));
}
}
void RangeDeleter::stopWorkers() {
{
scoped_lock sl(_stopMutex);
_stopRequested = true;
}
if (_worker) {
_worker->join();
}
scoped_lock sl(_queueMutex);
while (_deletesInProgress > 0) {
_nothingInProgressCV.wait(sl.boost());
}
}
bool RangeDeleter::queueDelete(OperationContext* txn,
const RangeDeleterOptions& options,
Notification* notifyDone,
std::string* errMsg) {
string dummy;
if (errMsg == NULL) errMsg = &dummy;
const string& ns(options.range.ns);
const BSONObj& min(options.range.minKey);
const BSONObj& max(options.range.maxKey);
auto_ptr toDelete(
new RangeDeleteEntry(options));
toDelete->notifyDone = notifyDone;
{
scoped_lock sl(_queueMutex);
if (_stopRequested) {
*errMsg = "deleter is already stopped.";
return false;
}
if (!canEnqueue_inlock(ns, min, max, errMsg)) {
return false;
}
_deleteSet.insert(new NSMinMax(ns, min.getOwned(), max.getOwned()));
}
if (options.waitForOpenCursors) {
_env->getCursorIds(txn, ns, &toDelete->cursorsToWait);
}
toDelete->stats.queueStartTS = jsTime();
if (!toDelete->cursorsToWait.empty())
logCursorsWaiting(toDelete.get());
{
scoped_lock sl(_queueMutex);
if (toDelete->cursorsToWait.empty()) {
toDelete->stats.queueEndTS = jsTime();
_taskQueue.push_back(toDelete.release());
_taskQueueNotEmptyCV.notify_one();
}
else {
_notReadyQueue.push_back(toDelete.release());
}
}
return true;
}
namespace {
const int kWTimeoutMillis = 60 * 60 * 1000;
bool _waitForMajority(OperationContext* txn, std::string* errMsg) {
const WriteConcernOptions writeConcern("majority",
WriteConcernOptions::NONE,
kWTimeoutMillis);
repl::ReplicationCoordinator::StatusAndDuration replStatus =
repl::getGlobalReplicationCoordinator()->awaitReplicationOfLastOpForClient(
txn, writeConcern);
repl::ReplicationCoordinator::Milliseconds elapsedTime = replStatus.duration;
if (replStatus.status.code() == ErrorCodes::ExceededTimeLimit) {
*errMsg = str::stream() << "rangeDeleter timed out after "
<< elapsedTime.seconds() << " seconds while waiting"
<< " for deletions to be replicated to majority nodes";
log() << *errMsg;
}
else if (replStatus.status.code() == ErrorCodes::NotMaster) {
*errMsg = str::stream() << "rangeDeleter no longer PRIMARY after "
<< elapsedTime.seconds() << " seconds while waiting"
<< " for deletions to be replicated to majority nodes";
}
else {
LOG(elapsedTime.seconds() < 30 ? 1 : 0)
<< "rangeDeleter took " << elapsedTime.seconds() << " seconds "
<< " waiting for deletes to be replicated to majority nodes";
fassert(18512, replStatus.status);
}
return replStatus.status.isOK();
}
}
bool RangeDeleter::deleteNow(OperationContext* txn,
const RangeDeleterOptions& options,
string* errMsg) {
if (stopRequested()) {
*errMsg = "deleter is already stopped.";
return false;
}
string dummy;
if (errMsg == NULL) errMsg = &dummy;
const string& ns(options.range.ns);
const BSONObj& min(options.range.minKey);
const BSONObj& max(options.range.maxKey);
NSMinMax deleteRange(ns, min, max);
{
scoped_lock sl(_queueMutex);
if (!canEnqueue_inlock(ns, min, max, errMsg)) {
return false;
}
_deleteSet.insert(&deleteRange);
// Note: count for pending deletes is an integral part of the shutdown story.
// Therefore, to simplify things, there is no "pending" state for deletes in
// deleteNow, the state transition is simply inProgress -> done.
_deletesInProgress++;
}
set cursorsToWait;
if (options.waitForOpenCursors) {
_env->getCursorIds(txn, ns, &cursorsToWait);
}
long long checkIntervalMillis = 5;
RangeDeleteEntry taskDetails(options);
taskDetails.stats.queueStartTS = jsTime();
for (; !cursorsToWait.empty(); sleepmillis(checkIntervalMillis)) {
logCursorsWaiting(&taskDetails);
set cursorsNow;
_env->getCursorIds(txn, ns, &cursorsNow);
set cursorsLeft;
std::set_intersection(cursorsToWait.begin(),
cursorsToWait.end(),
cursorsNow.begin(),
cursorsNow.end(),
std::inserter(cursorsLeft, cursorsLeft.end()));
cursorsToWait.swap(cursorsLeft);
if (stopRequested()) {
*errMsg = "deleter was stopped.";
scoped_lock sl(_queueMutex);
_deleteSet.erase(&deleteRange);
_deletesInProgress--;
if (_deletesInProgress == 0) {
_nothingInProgressCV.notify_one();
}
return false;
}
if (checkIntervalMillis < kMaxCursorCheckIntervalMillis) {
checkIntervalMillis *= 2;
}
}
taskDetails.stats.queueEndTS = jsTime();
taskDetails.stats.deleteStartTS = jsTime();
bool result = _env->deleteRange(txn,
taskDetails,
&taskDetails.stats.deletedDocCount,
errMsg);
taskDetails.stats.deleteEndTS = jsTime();
if (result) {
taskDetails.stats.waitForReplStartTS = jsTime();
result = _waitForMajority(txn, errMsg);
taskDetails.stats.waitForReplEndTS = jsTime();
}
{
scoped_lock sl(_queueMutex);
_deleteSet.erase(&deleteRange);
_deletesInProgress--;
if (_deletesInProgress == 0) {
_nothingInProgressCV.notify_one();
}
}
recordDelStats(new DeleteJobStats(taskDetails.stats));
return result;
}
void RangeDeleter::getStatsHistory(std::vector* stats) const {
stats->clear();
stats->reserve(kDeleteJobsHistory);
scoped_lock sl(_statsHistoryMutex);
for (deque::const_iterator it = _statsHistory.begin();
it != _statsHistory.end(); ++it) {
stats->push_back(new DeleteJobStats(**it));
}
}
BSONObj RangeDeleter::toBSON() const {
scoped_lock sl(_queueMutex);
BSONObjBuilder builder;
BSONArrayBuilder notReadyBuilder(builder.subarrayStart("notReady"));
for (TaskList::const_iterator iter = _notReadyQueue.begin();
iter != _notReadyQueue.end(); ++iter) {
notReadyBuilder.append((*iter)->toBSON());
}
notReadyBuilder.doneFast();
BSONArrayBuilder readyBuilder(builder.subarrayStart("ready"));
for (TaskList::const_iterator iter = _taskQueue.begin();
iter != _taskQueue.end(); ++iter) {
readyBuilder.append((*iter)->toBSON());
}
readyBuilder.doneFast();
return builder.obj();
}
void RangeDeleter::doWork() {
_env->initThread();
while (!inShutdown() && !stopRequested()) {
string errMsg;
boost::scoped_ptr txn(getGlobalEnvironment()->newOpCtx());
RangeDeleteEntry* nextTask = NULL;
{
scoped_lock sl(_queueMutex);
while (_taskQueue.empty()) {
_taskQueueNotEmptyCV.timed_wait(
sl.boost(), duration::milliseconds(kNotEmptyTimeoutMillis));
if (stopRequested()) {
log() << "stopping range deleter worker" << endl;
return;
}
if (_taskQueue.empty()) {
// Try to check if some deletes are ready and move them to the
// ready queue.
TaskList::iterator iter = _notReadyQueue.begin();
while (iter != _notReadyQueue.end()) {
RangeDeleteEntry* entry = *iter;
set cursorsNow;
{
if (entry->options.waitForOpenCursors) {
_env->getCursorIds(txn.get(),
entry->options.range.ns,
&cursorsNow);
}
}
set cursorsLeft;
std::set_intersection(entry->cursorsToWait.begin(),
entry->cursorsToWait.end(),
cursorsNow.begin(),
cursorsNow.end(),
std::inserter(cursorsLeft,
cursorsLeft.end()));
entry->cursorsToWait.swap(cursorsLeft);
if (entry->cursorsToWait.empty()) {
(*iter)->stats.queueEndTS = jsTime();
_taskQueue.push_back(*iter);
_taskQueueNotEmptyCV.notify_one();
iter = _notReadyQueue.erase(iter);
}
else {
logCursorsWaiting(entry);
++iter;
}
}
}
}
if (stopRequested()) {
log() << "stopping range deleter worker" << endl;
return;
}
nextTask = _taskQueue.front();
_taskQueue.pop_front();
_deletesInProgress++;
}
{
nextTask->stats.deleteStartTS = jsTime();
bool delResult = _env->deleteRange(txn.get(),
*nextTask,
&nextTask->stats.deletedDocCount,
&errMsg);
nextTask->stats.deleteEndTS = jsTime();
if (delResult) {
nextTask->stats.waitForReplStartTS = jsTime();
if (!_waitForMajority(txn.get(), &errMsg)) {
warning() << "Error encountered while waiting for replication: " << errMsg;
}
nextTask->stats.waitForReplEndTS = jsTime();
}
else {
warning() << "Error encountered while trying to delete range: "
<< errMsg << endl;
}
}
{
scoped_lock sl(_queueMutex);
NSMinMax setEntry(nextTask->options.range.ns,
nextTask->options.range.minKey,
nextTask->options.range.maxKey);
deletePtrElement(&_deleteSet, &setEntry);
_deletesInProgress--;
if (nextTask->notifyDone) {
nextTask->notifyDone->notifyOne();
}
}
recordDelStats(new DeleteJobStats(nextTask->stats));
delete nextTask;
nextTask = NULL;
}
}
bool RangeDeleter::canEnqueue_inlock(const StringData& ns,
const BSONObj& min,
const BSONObj& max,
string* errMsg) const {
NSMinMax toDelete(ns.toString(), min, max);
if (_deleteSet.count(&toDelete) > 0) {
*errMsg = str::stream() << "ns: " << ns << ", min: " << min << ", max: " << max
<< " is already being processed for deletion.";
return false;
}
return true;
}
bool RangeDeleter::stopRequested() const {
scoped_lock sl(_stopMutex);
return _stopRequested;
}
size_t RangeDeleter::getTotalDeletes() const {
scoped_lock sl(_queueMutex);
return _deleteSet.size();
}
size_t RangeDeleter::getPendingDeletes() const {
scoped_lock sl(_queueMutex);
return _notReadyQueue.size() + _taskQueue.size();
}
size_t RangeDeleter::getDeletesInProgress() const {
scoped_lock sl(_queueMutex);
return _deletesInProgress;
}
void RangeDeleter::recordDelStats(DeleteJobStats* newStat) {
scoped_lock sl(_statsHistoryMutex);
if (_statsHistory.size() == kDeleteJobsHistory) {
delete _statsHistory.front();
_statsHistory.pop_front();
}
_statsHistory.push_back(newStat);
}
RangeDeleteEntry::RangeDeleteEntry(const RangeDeleterOptions& options)
: options(options),
notifyDone(NULL),
lastLoggedTS(0) {
}
BSONObj RangeDeleteEntry::toBSON() const {
BSONObjBuilder builder;
builder.append("ns", options.range.ns);
builder.append("min", options.range.minKey);
builder.append("max", options.range.maxKey);
BSONArrayBuilder cursorBuilder(builder.subarrayStart("cursors"));
for (std::set::const_iterator it = cursorsToWait.begin();
it != cursorsToWait.end(); ++it) {
cursorBuilder.append((long long)*it);
}
cursorBuilder.doneFast();
return builder.done().copy();
}
RangeDeleterOptions::RangeDeleterOptions(const KeyRange& range)
: range(range),
fromMigrate(false),
onlyRemoveOrphanedDocs(false),
waitForOpenCursors(false) {
}
}