diff options
Diffstat (limited to 'src/mongo/db/range_deleter_db_env.cpp')
-rw-r--r-- | src/mongo/db/range_deleter_db_env.cpp | 163 |
1 files changed, 74 insertions, 89 deletions
diff --git a/src/mongo/db/range_deleter_db_env.cpp b/src/mongo/db/range_deleter_db_env.cpp index 69f2cd86d33..b9971f54b03 100644 --- a/src/mongo/db/range_deleter_db_env.cpp +++ b/src/mongo/db/range_deleter_db_env.cpp @@ -46,102 +46,87 @@ namespace mongo { - using std::endl; - using std::string; - - /** - * Outline of the delete process: - * 1. Initialize the client for this thread if there is no client. This is for the worker - * threads that are attached to any of the threads servicing client requests. - * 2. Grant this thread authorization to perform deletes. - * 3. Temporarily enable mode to bypass shard version checks. TODO: Replace this hack. - * 4. Setup callback to save deletes to moveChunk directory (only if moveParanoia is true). - * 5. Delete range. - * 6. Wait until the majority of the secondaries catch up. - */ - bool RangeDeleterDBEnv::deleteRange(OperationContext* txn, - const RangeDeleteEntry& taskDetails, - long long int* deletedDocs, - std::string* errMsg) { - const string ns(taskDetails.options.range.ns); - const BSONObj inclusiveLower(taskDetails.options.range.minKey); - const BSONObj exclusiveUpper(taskDetails.options.range.maxKey); - const BSONObj keyPattern(taskDetails.options.range.keyPattern); - const WriteConcernOptions writeConcern(taskDetails.options.writeConcern); - const bool fromMigrate = taskDetails.options.fromMigrate; - const bool onlyRemoveOrphans = taskDetails.options.onlyRemoveOrphanedDocs; - - Client::initThreadIfNotAlready("RangeDeleter"); - - *deletedDocs = 0; - ShardForceVersionOkModeBlock forceVersion(txn->getClient()); - { - Helpers::RemoveSaver removeSaver("moveChunk", - ns, - taskDetails.options.removeSaverReason); - Helpers::RemoveSaver* removeSaverPtr = NULL; - if (serverGlobalParams.moveParanoia && - !taskDetails.options.removeSaverReason.empty()) { - removeSaverPtr = &removeSaver; - } +using std::endl; +using std::string; - // log the opId so the user can use it to cancel the delete using killOp. - unsigned int opId = txn->getOpID(); - log() << "Deleter starting delete for: " << ns - << " from " << inclusiveLower - << " -> " << exclusiveUpper - << ", with opId: " << opId - << endl; - - try { - *deletedDocs = - Helpers::removeRange(txn, - KeyRange(ns, - inclusiveLower, - exclusiveUpper, - keyPattern), - false, /*maxInclusive*/ - writeConcern, - removeSaverPtr, - fromMigrate, - onlyRemoveOrphans); - - if (*deletedDocs < 0) { - *errMsg = "collection or index dropped before data could be cleaned"; - warning() << *errMsg << endl; - - return false; - } - - log() << "rangeDeleter deleted " << *deletedDocs - << " documents for " << ns - << " from " << inclusiveLower - << " -> " << exclusiveUpper - << endl; - } - catch (const DBException& ex) { - *errMsg = str::stream() << "Error encountered while deleting range: " - << "ns" << ns - << " from " << inclusiveLower - << " -> " << exclusiveUpper - << ", cause by:" << causedBy(ex); +/** + * Outline of the delete process: + * 1. Initialize the client for this thread if there is no client. This is for the worker + * threads that are attached to any of the threads servicing client requests. + * 2. Grant this thread authorization to perform deletes. + * 3. Temporarily enable mode to bypass shard version checks. TODO: Replace this hack. + * 4. Setup callback to save deletes to moveChunk directory (only if moveParanoia is true). + * 5. Delete range. + * 6. Wait until the majority of the secondaries catch up. + */ +bool RangeDeleterDBEnv::deleteRange(OperationContext* txn, + const RangeDeleteEntry& taskDetails, + long long int* deletedDocs, + std::string* errMsg) { + const string ns(taskDetails.options.range.ns); + const BSONObj inclusiveLower(taskDetails.options.range.minKey); + const BSONObj exclusiveUpper(taskDetails.options.range.maxKey); + const BSONObj keyPattern(taskDetails.options.range.keyPattern); + const WriteConcernOptions writeConcern(taskDetails.options.writeConcern); + const bool fromMigrate = taskDetails.options.fromMigrate; + const bool onlyRemoveOrphans = taskDetails.options.onlyRemoveOrphanedDocs; + + Client::initThreadIfNotAlready("RangeDeleter"); + + *deletedDocs = 0; + ShardForceVersionOkModeBlock forceVersion(txn->getClient()); + { + Helpers::RemoveSaver removeSaver("moveChunk", ns, taskDetails.options.removeSaverReason); + Helpers::RemoveSaver* removeSaverPtr = NULL; + if (serverGlobalParams.moveParanoia && !taskDetails.options.removeSaverReason.empty()) { + removeSaverPtr = &removeSaver; + } + + // log the opId so the user can use it to cancel the delete using killOp. + unsigned int opId = txn->getOpID(); + log() << "Deleter starting delete for: " << ns << " from " << inclusiveLower << " -> " + << exclusiveUpper << ", with opId: " << opId << endl; + + try { + *deletedDocs = + Helpers::removeRange(txn, + KeyRange(ns, inclusiveLower, exclusiveUpper, keyPattern), + false, /*maxInclusive*/ + writeConcern, + removeSaverPtr, + fromMigrate, + onlyRemoveOrphans); + + if (*deletedDocs < 0) { + *errMsg = "collection or index dropped before data could be cleaned"; + warning() << *errMsg << endl; return false; } - } - return true; - } + log() << "rangeDeleter deleted " << *deletedDocs << " documents for " << ns << " from " + << inclusiveLower << " -> " << exclusiveUpper << endl; + } catch (const DBException& ex) { + *errMsg = str::stream() << "Error encountered while deleting range: " + << "ns" << ns << " from " << inclusiveLower << " -> " + << exclusiveUpper << ", cause by:" << causedBy(ex); - void RangeDeleterDBEnv::getCursorIds(OperationContext* txn, - StringData ns, - std::set<CursorId>* openCursors) { - AutoGetCollectionForRead ctx(txn, ns.toString()); - Collection* collection = ctx.getCollection(); - if (!collection) { - return; + return false; } + } + + return true; +} - collection->getCursorManager()->getCursorIds( openCursors ); +void RangeDeleterDBEnv::getCursorIds(OperationContext* txn, + StringData ns, + std::set<CursorId>* openCursors) { + AutoGetCollectionForRead ctx(txn, ns.toString()); + Collection* collection = ctx.getCollection(); + if (!collection) { + return; } + + collection->getCursorManager()->getCursorIds(openCursors); +} } |