/**
* Copyright (C) 2013 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/db/range_deleter_db_env.h"
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/client.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/repl/rs.h"
#include "mongo/db/repl/write_concern.h"
#include "mongo/s/d_logic.h"
namespace mongo {
void RangeDeleterDBEnv::initThread() {
if ( currentClient.get() == NULL )
Client::initThread( "RangeDeleter" );
}
/**
* Outline of the delete process:
* 1. Initialize the client for this thread if there is no client. This is for the worker
* threads that are attached to any of the threads servicing client requests.
* 2. Grant this thread authorization to perform deletes.
* 3. Temporarily enable mode to bypass shard version checks. TODO: Replace this hack.
* 4. Setup callback to save deletes to moveChunk directory (only if moveParanoia is true).
* 5. Delete range.
* 6. Wait until the majority of the secondaries catch up.
*/
bool RangeDeleterDBEnv::deleteRange(OperationContext* txn,
const StringData& ns,
const BSONObj& inclusiveLower,
const BSONObj& exclusiveUpper,
const BSONObj& keyPattern,
bool secondaryThrottle,
std::string* errMsg) {
const bool initiallyHaveClient = haveClient();
if (!initiallyHaveClient) {
Client::initThread("RangeDeleter");
}
ShardForceVersionOkModeBlock forceVersion;
{
Helpers::RemoveSaver removeSaver("moveChunk", ns.toString(), "post-cleanup");
// log the opId so the user can use it to cancel the delete using killOp.
unsigned int opId = cc().curop()->opNum();
log() << "Deleter starting delete for: " << ns
<< " from " << inclusiveLower
<< " -> " << exclusiveUpper
<< ", with opId: " << opId
<< endl;
try {
long long numDeleted =
Helpers::removeRange(txn,
KeyRange(ns.toString(),
inclusiveLower,
exclusiveUpper,
keyPattern),
false, /*maxInclusive*/
replSet? secondaryThrottle : false,
serverGlobalParams.moveParanoia ? &removeSaver : NULL,
true, /*fromMigrate*/
true); /*onlyRemoveOrphans*/
if (numDeleted < 0) {
*errMsg = "collection or index dropped before data could be cleaned";
warning() << *errMsg << endl;
if (!initiallyHaveClient) {
cc().shutdown();
}
return false;
}
log() << "rangeDeleter deleted " << numDeleted
<< " documents for " << ns
<< " from " << inclusiveLower
<< " -> " << exclusiveUpper
<< endl;
}
catch (const DBException& ex) {
*errMsg = str::stream() << "Error encountered while deleting range: "
<< "ns" << ns
<< " from " << inclusiveLower
<< " -> " << exclusiveUpper
<< ", cause by:" << causedBy(ex);
if (!initiallyHaveClient) {
cc().shutdown();
}
return false;
}
}
if (replSet) {
Timer elapsedTime;
ReplTime lastOpApplied = cc().getLastOp().asDate();
while (!opReplicatedEnough(lastOpApplied,
BSON("w" << "majority").firstElement())) {
if (elapsedTime.seconds() >= 3600) {
*errMsg = str::stream() << "rangeDeleter timed out after "
<< elapsedTime.seconds() << " seconds while waiting"
<< " for deletions to be replicated to majority nodes";
if (!initiallyHaveClient) {
cc().shutdown();
}
return false;
}
sleepsecs(1);
}
LOG(elapsedTime.seconds() < 30 ? 1 : 0)
<< "rangeDeleter took " << elapsedTime.seconds() << " seconds "
<< " waiting for deletes to be replicated to majority nodes" << endl;
}
if (!initiallyHaveClient) {
cc().shutdown();
}
return true;
}
void RangeDeleterDBEnv::getCursorIds(const StringData& ns,
std::set* openCursors) {
Client::ReadContext ctx(ns.toString());
Collection* collection = ctx.ctx().db()->getCollection( ns );
if ( !collection )
return;
collection->cursorCache()->getCursorIds( openCursors );
}
}