/**
* Copyright (C) 2013 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include <deque>
#include <memory>
#include <set>
#include <string>
#include <vector>

#include "mongo/base/disallow_copying.h"
#include "mongo/base/string_data.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/range_arithmetic.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/concurrency/mutex.h"
#include "mongo/util/concurrency/notification.h"
#include "mongo/util/time_support.h"
namespace mongo {
// Forward declarations. OperationContext is only used through pointers in this header.
// The four structs below are declared early so that RangeDeleter can name them before
// their definitions, which appear further down in this file.
class OperationContext;
struct DeleteJobStats;
struct RangeDeleteEntry;
struct RangeDeleterEnv;
struct RangeDeleterOptions;
/**
* Class for deleting documents for a given namespace and range. It contains a queue of
* jobs to be deleted. Deletions can be "immediate", in which case they are going to be put
* in front of the queue and acted on promptly, or "lazy", in which they would be acted
* upon when they get to the head of the queue.
*
* Threading assumptions:
*
* This class has (currently) one worker thread attacking the queue, one
* job at a time. If we want an immediate deletion, that job is going to
* be performed on the thread that is requesting it.
*
* All calls regarding deletion are synchronized.
*
* Life cycle:
* RangeDeleter* deleter = new RangeDeleter(new ...);
* deleter->startWorkers();
* ...
* getGlobalServiceContext()->killAllOperations(); // stop all deletes
* deleter->stopWorkers();
* delete deleter;
*/
class RangeDeleter {
MONGO_DISALLOW_COPYING(RangeDeleter);
public:
/**
* Creates a new deleter and uses an environment object to delegate external logic like
* data deletion. Takes ownership of the environment.
*/
explicit RangeDeleter(RangeDeleterEnv* env);
/**
* Destroys this deleter. Must make sure that no threads are working on this queue. Use
* stopWorkers to stop the internal workers, it is an error not to do so.
*/
~RangeDeleter();
//
// Thread management methods
//
/**
* Starts the background thread to work on this queue. Does nothing if the worker
* thread is already active.
*
* This call is _not_ thread safe and must be issued before any other call.
*/
void startWorkers();
/**
* Stops the background thread working on this queue. This will block if there are
* tasks that are being deleted, but will leave the pending tasks in the queue.
*
* Steps:
* 1. Stop accepting new queued deletes.
* 2. Stop all idle workers.
* 3. Waits for all threads to finish any task that is in progress (but see note
* below).
*
* Note:
*
* + restarting this deleter with startWorkers after stopping it is not supported.
*
* + the worker thread could be running a call in the environment. The thread is
* only going to be returned when the environment decides so. In production,
* KillCurrentOp::killAll can be used to get the thread back from the environment.
*/
void stopWorkers();
//
// Queue manipulation methods - can be called by anyone.
//
/**
* Adds a new delete to the queue.
*
* If notifyDone is not NULL, it will be signaled after the delete is completed.
* Note that this will happen only if the delete was actually queued.
*
* Returns true if the task is queued and false If the given range is blacklisted,
* is already queued, or stopWorkers() was called.
*/
bool queueDelete(OperationContext* opCtx,
const RangeDeleterOptions& options,
Notification* doneSignal,
std::string* errMsg);
/**
* Removes the documents specified by the range. Unlike queueTask, this call
* blocks and the deletion is performed by the current thread.
*
* Returns true if the deletion was performed. False if the range is blacklisted,
* was already queued, or stopWorkers() was called.
*/
bool deleteNow(OperationContext* opCtx,
const RangeDeleterOptions& options,
std::string* errMsg);
//
// Introspection methods
//
// Note: original contents of stats will be cleared. Caller owns the returned stats.
std::vector> getStatsHistory() const;
size_t getTotalDeletes() const;
size_t getPendingDeletes() const;
size_t getDeletesInProgress() const;
//
// Methods meant to be only used for testing. Should be treated like private
// methods.
//
/** Returns a BSON representation of the queue contents. For debugging only. */
BSONObj toBSON() const;
private:
// Ownership is transferred to here.
void recordDelStats(DeleteJobStats* newStat);
struct NSMinMax;
struct NSMinMaxCmp {
bool operator()(const NSMinMax* lhs, const NSMinMax* rhs) const;
};
typedef std::deque TaskList; // owned here
typedef std::set NSMinMaxSet; // owned here
/** Body of the worker thread */
void doWork();
/** Returns true if the range doesn't intersect with one other range */
bool canEnqueue_inlock(StringData ns,
const BSONObj& min,
const BSONObj& max,
std::string* errMsg) const;
/** Returns true if stopWorkers() was called. This call is synchronized. */
bool stopRequested() const;
std::unique_ptr _env;
// Initially not active. Must be started explicitly.
std::unique_ptr _worker;
// Protects _stopRequested.
mutable stdx::mutex _stopMutex;
// If set, no other delete taks should be accepted.
bool _stopRequested;
// No delete is in progress. Used to make sure that there is no activity
// in this deleter, and therefore is safe to destroy it. Must be used in
// conjunction with _stopRequested.
stdx::condition_variable _nothingInProgressCV;
// Protects all the data structure below this.
mutable stdx::mutex _queueMutex;
// _taskQueue has a task ready to work on.
stdx::condition_variable _taskQueueNotEmptyCV;
// Queue for storing the list of ranges that have cursors pending on it.
//
// Note: pointer life cycle is not handled here.
TaskList _notReadyQueue;
// Queue for storing the list of ranges that are ready to be removed.
//
// Note: pointer life cycle is not handled here.
TaskList _taskQueue;
// Set of all deletes - deletes waiting for cursors, waiting to be acted upon
// and in progress. Includes both queued and immediate deletes.
//
// queued delete life cycle: new @ queuedDelete, delete @ doWork
// deleteNow life cycle: deleteNow stack variable
NSMinMaxSet _deleteSet;
// Keeps track of number of tasks that are in progress, including the inline deletes.
size_t _deletesInProgress;
// Protects _statsHistory
mutable stdx::mutex _statsHistoryMutex;
std::deque _statsHistory;
};
/**
 * Plain record of the statistics kept for one RangeDeleter job: a pair of start/end
 * timestamps for each of the queue, delete and wait-for-replication phases (as the
 * field names indicate), plus a count of deleted documents.
 */
struct DeleteJobStats {
    // The document counter starts at zero via its member initializer; the timestamps
    // are default-constructed.
    DeleteJobStats() {}

    Date_t queueStartTS;
    Date_t queueEndTS;
    Date_t deleteStartTS;
    Date_t deleteEndTS;
    Date_t waitForReplStartTS;
    Date_t waitForReplEndTS;

    long long int deletedDocCount = 0;
};
/**
 * Options accompanying a single range deletion request passed to
 * RangeDeleter::queueDelete() or RangeDeleter::deleteNow().
 */
struct RangeDeleterOptions {
    // Defined out of line. NOTE(review): presumably copies 'range' into the member of
    // the same name - confirm against the implementation.
    RangeDeleterOptions(const KeyRange& range);

    // Immutable once the options object has been constructed.
    const KeyRange range;

    WriteConcernOptions writeConcern;
    std::string removeSaverReason;

    // Default member initializers keep these flags from ever holding an indeterminate
    // value. Anything the out-of-line constructor sets for them takes precedence over
    // the defaults given here.
    bool fromMigrate = false;
    bool onlyRemoveOrphanedDocs = false;
    bool waitForOpenCursors = false;
};
/**
* For internal use only.
*/
struct RangeDeleteEntry {
RangeDeleteEntry(const RangeDeleterOptions& options);
const RangeDeleterOptions options;
// Sets of cursors to wait to close until this can be ready
// for deletion.
std::set cursorsToWait;
// Not owned here.
// Important invariant: Can only be set and used by one thread.
Notification* doneSignal;
// Time since the last time we reported this object.
Date_t lastLoggedTS;
DeleteJobStats stats;
// For debugging only
BSONObj toBSON() const;
};
/**
* Class for encapsulating logic used by the RangeDeleter class to perform its tasks.
*/
struct RangeDeleterEnv {
virtual ~RangeDeleterEnv() {}
/**
* Deletes the documents from the given range. This method should be
* responsible for making sure that the proper contexts are setup
* to be able to perform deletions.
*
* Must be a synchronous call. Docs should be deleted after call ends.
* Must not throw Exceptions.
*/
virtual bool deleteRange(OperationContext* opCtx,
const RangeDeleteEntry& taskDetails,
long long int* deletedDocs,
std::string* errMsg) = 0;
/**
* Gets the list of open cursors on a given namespace. The openCursors is an
* output parameter that will contain all the cursors open after this is called.
* Assume that openCursors is empty when passed in.
*
* Must be a synchronous call. CursorIds should be populated after call.
* Must not throw exception.
*/
virtual void getCursorIds(OperationContext* opCtx,
StringData ns,
std::set* openCursors) = 0;
};
} // namespace mongo