summaryrefslogtreecommitdiff
path: root/src/mongo/util/background.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/util/background.cpp')
-rw-r--r--src/mongo/util/background.cpp493
1 files changed, 241 insertions, 252 deletions
diff --git a/src/mongo/util/background.cpp b/src/mongo/util/background.cpp
index 52147e923aa..a98d92a0a41 100644
--- a/src/mongo/util/background.cpp
+++ b/src/mongo/util/background.cpp
@@ -52,317 +52,306 @@
using namespace std;
namespace mongo {
- namespace {
+namespace {
- class PeriodicTaskRunner : public BackgroundJob {
- public:
+class PeriodicTaskRunner : public BackgroundJob {
+public:
+ PeriodicTaskRunner() : _shutdownRequested(false) {}
- PeriodicTaskRunner() : _shutdownRequested(false) {}
+ void add(PeriodicTask* task);
+ void remove(PeriodicTask* task);
- void add( PeriodicTask* task );
- void remove( PeriodicTask* task );
+ Status stop(int gracePeriodMillis);
- Status stop( int gracePeriodMillis );
-
- private:
-
- virtual std::string name() const {
- return "PeriodicTaskRunner";
- }
-
- virtual void run();
-
- // Returns true if shutdown has been requested. You must hold _mutex to call this
- // function.
- bool _isShutdownRequested() const;
-
- // Runs all registered tasks. You must hold _mutex to call this function.
- void _runTasks();
-
- // Runs one task to completion, and optionally reports timing. You must hold _mutex
- // to call this function.
- void _runTask( PeriodicTask* task );
-
- // _mutex protects the _shutdownRequested flag and the _tasks vector.
- std::mutex _mutex;
-
- // The condition variable is used to sleep for the interval between task
- // executions, and is notified when the _shutdownRequested flag is toggled.
- std::condition_variable _cond;
+private:
+ virtual std::string name() const {
+ return "PeriodicTaskRunner";
+ }
- // Used to break the loop. You should notify _cond after changing this to true
- // so that shutdown proceeds promptly.
- bool _shutdownRequested;
+ virtual void run();
+
+ // Returns true if shutdown has been requested. You must hold _mutex to call this
+ // function.
+ bool _isShutdownRequested() const;
+
+ // Runs all registered tasks. You must hold _mutex to call this function.
+ void _runTasks();
+
+ // Runs one task to completion, and optionally reports timing. You must hold _mutex
+ // to call this function.
+ void _runTask(PeriodicTask* task);
+
+ // _mutex protects the _shutdownRequested flag and the _tasks vector.
+ std::mutex _mutex;
+
+ // The condition variable is used to sleep for the interval between task
+ // executions, and is notified when the _shutdownRequested flag is toggled.
+ std::condition_variable _cond;
+
+ // Used to break the loop. You should notify _cond after changing this to true
+ // so that shutdown proceeds promptly.
+ bool _shutdownRequested;
+
+ // The PeriodicTasks contained in this vector are NOT owned by the
+ // PeriodicTaskRunner, and are not deleted. The vector never shrinks, removed Tasks
+ // have their entry overwritten with NULL.
+ std::vector<PeriodicTask*> _tasks;
+};
+
+// We rely here on zero-initialization of 'runnerMutex' to distinguish whether we are
+// running before or after static initialization for this translation unit has
+// completed. In the former case, we assume no threads are present, so we do not need
+// to use the mutex. When present, the mutex protects 'runner' and 'runnerDestroyed'
+// below.
+SimpleMutex* const runnerMutex = new SimpleMutex;
+
+// A scoped lock like object that only locks/unlocks the mutex if it exists.
+class ConditionalScopedLock {
+public:
+ ConditionalScopedLock(SimpleMutex* mutex) : _mutex(mutex) {
+ if (_mutex)
+ _mutex->lock();
+ }
+ ~ConditionalScopedLock() {
+ if (_mutex)
+ _mutex->unlock();
+ }
- // The PeriodicTasks contained in this vector are NOT owned by the
- // PeriodicTaskRunner, and are not deleted. The vector never shrinks, removed Tasks
- // have their entry overwritten with NULL.
- std::vector< PeriodicTask* > _tasks;
- };
+private:
+ SimpleMutex* const _mutex;
+};
- // We rely here on zero-initialization of 'runnerMutex' to distinguish whether we are
- // running before or after static initialization for this translation unit has
- // completed. In the former case, we assume no threads are present, so we do not need
- // to use the mutex. When present, the mutex protects 'runner' and 'runnerDestroyed'
- // below.
- SimpleMutex* const runnerMutex = new SimpleMutex;
+// The unique PeriodicTaskRunner, also zero-initialized.
+PeriodicTaskRunner* runner;
- // A scoped lock like object that only locks/unlocks the mutex if it exists.
- class ConditionalScopedLock {
- public:
- ConditionalScopedLock( SimpleMutex* mutex ) : _mutex( mutex ) {
- if ( _mutex )
- _mutex->lock();
- }
- ~ConditionalScopedLock() {
- if ( _mutex )
- _mutex->unlock();
- }
- private:
- SimpleMutex* const _mutex;
- };
+// The runner is never re-created once it has been destroyed.
+bool runnerDestroyed;
- // The unique PeriodicTaskRunner, also zero-initialized.
- PeriodicTaskRunner* runner;
+} // namespace
- // The runner is never re-created once it has been destroyed.
- bool runnerDestroyed;
+// both the BackgroundJob and the internal thread point to JobStatus
+struct BackgroundJob::JobStatus {
+ JobStatus() : state(NotStarted) {}
- } // namespace
+ std::mutex mutex;
+ std::condition_variable done;
+ State state;
+};
- // both the BackgroundJob and the internal thread point to JobStatus
- struct BackgroundJob::JobStatus {
- JobStatus() : state(NotStarted) {}
+BackgroundJob::BackgroundJob(bool selfDelete) : _selfDelete(selfDelete), _status(new JobStatus) {}
- std::mutex mutex;
- std::condition_variable done;
- State state;
- };
+BackgroundJob::~BackgroundJob() {}
- BackgroundJob::BackgroundJob( bool selfDelete )
- : _selfDelete( selfDelete )
- , _status( new JobStatus ) {
+void BackgroundJob::jobBody() {
+ const string threadName = name();
+ if (!threadName.empty()) {
+ setThreadName(threadName.c_str());
}
- BackgroundJob::~BackgroundJob() {}
-
- void BackgroundJob::jobBody() {
+ LOG(1) << "BackgroundJob starting: " << threadName << endl;
- const string threadName = name();
- if (!threadName.empty()) {
- setThreadName(threadName.c_str());
- }
-
- LOG(1) << "BackgroundJob starting: " << threadName << endl;
-
- try {
- run();
- }
- catch (const std::exception& e) {
- error() << "backgroundjob " << threadName << " exception: " << e.what();
- throw;
- }
+ try {
+ run();
+ } catch (const std::exception& e) {
+ error() << "backgroundjob " << threadName << " exception: " << e.what();
+ throw;
+ }
- // We must cache this value so that we can use it after we leave the following scope.
- const bool selfDelete = _selfDelete;
+ // We must cache this value so that we can use it after we leave the following scope.
+ const bool selfDelete = _selfDelete;
#ifdef MONGO_CONFIG_SSL
- // TODO(sverch): Allow people who use the BackgroundJob to also specify cleanup tasks.
- // Currently the networking code depends on this class and this class depends on the
- // networking code because of this ad hoc cleanup.
- SSLManagerInterface* manager = getSSLManager();
- if (manager)
- manager->cleanupThreadLocals();
+ // TODO(sverch): Allow people who use the BackgroundJob to also specify cleanup tasks.
+ // Currently the networking code depends on this class and this class depends on the
+ // networking code because of this ad hoc cleanup.
+ SSLManagerInterface* manager = getSSLManager();
+ if (manager)
+ manager->cleanupThreadLocals();
#endif
- {
- // It is illegal to access any state owned by this BackgroundJob after leaving this
- // scope, with the exception of the call to 'delete this' below.
- std::unique_lock<std::mutex> l( _status->mutex );
- _status->state = Done;
- _status->done.notify_all();
- }
-
- if( selfDelete )
- delete this;
+ {
+ // It is illegal to access any state owned by this BackgroundJob after leaving this
+ // scope, with the exception of the call to 'delete this' below.
+ std::unique_lock<std::mutex> l(_status->mutex);
+ _status->state = Done;
+ _status->done.notify_all();
}
- void BackgroundJob::go() {
- std::unique_lock<std::mutex> l( _status->mutex );
- massert( 17234, mongoutils::str::stream()
- << "backgroundJob already running: " << name(),
- _status->state != Running );
-
- // If the job is already 'done', for instance because it was cancelled or already
- // finished, ignore additional requests to run the job.
- if (_status->state == NotStarted) {
- std::thread t( std::bind( &BackgroundJob::jobBody , this ) );
- t.detach();
- _status->state = Running;
- }
+ if (selfDelete)
+ delete this;
+}
+
+void BackgroundJob::go() {
+ std::unique_lock<std::mutex> l(_status->mutex);
+ massert(17234,
+ mongoutils::str::stream() << "backgroundJob already running: " << name(),
+ _status->state != Running);
+
+ // If the job is already 'done', for instance because it was cancelled or already
+ // finished, ignore additional requests to run the job.
+ if (_status->state == NotStarted) {
+ std::thread t(std::bind(&BackgroundJob::jobBody, this));
+ t.detach();
+ _status->state = Running;
}
+}
- Status BackgroundJob::cancel() {
- std::unique_lock<std::mutex> l( _status->mutex );
+Status BackgroundJob::cancel() {
+ std::unique_lock<std::mutex> l(_status->mutex);
- if ( _status->state == Running )
- return Status( ErrorCodes::IllegalOperation,
- "Cannot cancel a running BackgroundJob" );
-
- if ( _status->state == NotStarted ) {
- _status->state = Done;
- _status->done.notify_all();
- }
+ if (_status->state == Running)
+ return Status(ErrorCodes::IllegalOperation, "Cannot cancel a running BackgroundJob");
- return Status::OK();
+ if (_status->state == NotStarted) {
+ _status->state = Done;
+ _status->done.notify_all();
}
- bool BackgroundJob::wait( unsigned msTimeOut ) {
- verify( !_selfDelete ); // you cannot call wait on a self-deleting job
- const auto deadline =
- std::chrono::system_clock::now() + std::chrono::milliseconds(msTimeOut);
- std::unique_lock<std::mutex> l( _status->mutex );
- while ( _status->state != Done ) {
- if ( msTimeOut ) {
- if (std::cv_status::timeout == _status->done.wait_until(l, deadline))
- return false;
- }
- else {
- _status->done.wait(l);
- }
+ return Status::OK();
+}
+
+bool BackgroundJob::wait(unsigned msTimeOut) {
+ verify(!_selfDelete); // you cannot call wait on a self-deleting job
+ const auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(msTimeOut);
+ std::unique_lock<std::mutex> l(_status->mutex);
+ while (_status->state != Done) {
+ if (msTimeOut) {
+ if (std::cv_status::timeout == _status->done.wait_until(l, deadline))
+ return false;
+ } else {
+ _status->done.wait(l);
}
- return true;
}
+ return true;
+}
- BackgroundJob::State BackgroundJob::getState() const {
- std::unique_lock<std::mutex> l( _status->mutex );
- return _status->state;
- }
-
- bool BackgroundJob::running() const {
- std::unique_lock<std::mutex> l( _status->mutex );
- return _status->state == Running;
- }
+BackgroundJob::State BackgroundJob::getState() const {
+ std::unique_lock<std::mutex> l(_status->mutex);
+ return _status->state;
+}
- // -------------------------
+bool BackgroundJob::running() const {
+ std::unique_lock<std::mutex> l(_status->mutex);
+ return _status->state == Running;
+}
- PeriodicTask::PeriodicTask() {
- ConditionalScopedLock lock( runnerMutex );
- if ( runnerDestroyed )
- return;
-
- if ( !runner )
- runner = new PeriodicTaskRunner;
-
- runner->add( this );
- }
+// -------------------------
- PeriodicTask::~PeriodicTask() {
- ConditionalScopedLock lock( runnerMutex );
- if ( runnerDestroyed || !runner )
- return;
+PeriodicTask::PeriodicTask() {
+ ConditionalScopedLock lock(runnerMutex);
+ if (runnerDestroyed)
+ return;
- runner->remove( this );
- }
+ if (!runner)
+ runner = new PeriodicTaskRunner;
- void PeriodicTask::startRunningPeriodicTasks() {
- ConditionalScopedLock lock( runnerMutex );
- if ( runnerDestroyed )
- return;
+ runner->add(this);
+}
- if ( !runner )
- runner = new PeriodicTaskRunner;
+PeriodicTask::~PeriodicTask() {
+ ConditionalScopedLock lock(runnerMutex);
+ if (runnerDestroyed || !runner)
+ return;
- runner->go();
- }
+ runner->remove(this);
+}
- Status PeriodicTask::stopRunningPeriodicTasks( int gracePeriodMillis ) {
- ConditionalScopedLock lock( runnerMutex );
+void PeriodicTask::startRunningPeriodicTasks() {
+ ConditionalScopedLock lock(runnerMutex);
+ if (runnerDestroyed)
+ return;
- Status status = Status::OK();
- if ( runnerDestroyed || !runner )
- return status;
+ if (!runner)
+ runner = new PeriodicTaskRunner;
- runner->cancel();
- status = runner->stop( gracePeriodMillis );
+ runner->go();
+}
- if ( status.isOK() ) {
- delete runner;
- runnerDestroyed = true;
- }
+Status PeriodicTask::stopRunningPeriodicTasks(int gracePeriodMillis) {
+ ConditionalScopedLock lock(runnerMutex);
+ Status status = Status::OK();
+ if (runnerDestroyed || !runner)
return status;
- }
- void PeriodicTaskRunner::add( PeriodicTask* task ) {
- std::lock_guard<std::mutex> lock( _mutex );
- _tasks.push_back( task );
- }
+ runner->cancel();
+ status = runner->stop(gracePeriodMillis);
- void PeriodicTaskRunner::remove( PeriodicTask* task ) {
- std::lock_guard<std::mutex> lock( _mutex );
- for ( size_t i = 0; i != _tasks.size(); i++ ) {
- if ( _tasks[i] == task ) {
- _tasks[i] = NULL;
- break;
- }
- }
+ if (status.isOK()) {
+ delete runner;
+ runnerDestroyed = true;
}
- Status PeriodicTaskRunner::stop( int gracePeriodMillis ) {
- {
- std::lock_guard<std::mutex> lock( _mutex );
- _shutdownRequested = true;
- _cond.notify_one();
- }
-
- if ( !wait( gracePeriodMillis ) ) {
- return Status( ErrorCodes::ExceededTimeLimit,
- "Grace period expired while waiting for PeriodicTasks to terminate" );
- }
- return Status::OK();
- }
+ return status;
+}
- void PeriodicTaskRunner::run() {
- // Use a shorter cycle time in debug mode to help catch race conditions.
- const std::chrono::seconds waitTime(kDebugBuild ? 5 : 60);
+void PeriodicTaskRunner::add(PeriodicTask* task) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ _tasks.push_back(task);
+}
- std::unique_lock<std::mutex> lock(_mutex);
- while (!_shutdownRequested) {
- if (std::cv_status::timeout == _cond.wait_for(lock, waitTime))
- _runTasks();
+void PeriodicTaskRunner::remove(PeriodicTask* task) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ for (size_t i = 0; i != _tasks.size(); i++) {
+ if (_tasks[i] == task) {
+ _tasks[i] = NULL;
+ break;
}
}
+}
- bool PeriodicTaskRunner::_isShutdownRequested() const {
- return _shutdownRequested;
+Status PeriodicTaskRunner::stop(int gracePeriodMillis) {
+ {
+ std::lock_guard<std::mutex> lock(_mutex);
+ _shutdownRequested = true;
+ _cond.notify_one();
}
- void PeriodicTaskRunner::_runTasks() {
- const size_t size = _tasks.size();
- for ( size_t i = 0; i != size; ++i )
- if ( PeriodicTask* const task = _tasks[i] )
- _runTask(task);
+ if (!wait(gracePeriodMillis)) {
+ return Status(ErrorCodes::ExceededTimeLimit,
+ "Grace period expired while waiting for PeriodicTasks to terminate");
}
+ return Status::OK();
+}
- void PeriodicTaskRunner::_runTask(PeriodicTask* const task) {
- Timer timer;
-
- const std::string taskName = task->taskName();
+void PeriodicTaskRunner::run() {
+ // Use a shorter cycle time in debug mode to help catch race conditions.
+ const std::chrono::seconds waitTime(kDebugBuild ? 5 : 60);
- try {
- task->taskDoWork();
- }
- catch ( const std::exception& e ) {
- error() << "task: " << taskName << " failed: " << e.what() << endl;
- }
- catch ( ... ) {
- error() << "task: " << taskName << " failed with unknown error" << endl;
- }
-
- const int ms = timer.millis();
- const int kMinLogMs = 100;
- LOG( ms <= kMinLogMs ? 3 : 0 ) << "task: " << taskName << " took: " << ms << "ms" << endl;
+ std::unique_lock<std::mutex> lock(_mutex);
+ while (!_shutdownRequested) {
+ if (std::cv_status::timeout == _cond.wait_for(lock, waitTime))
+ _runTasks();
}
+}
+
+bool PeriodicTaskRunner::_isShutdownRequested() const {
+ return _shutdownRequested;
+}
+
+void PeriodicTaskRunner::_runTasks() {
+ const size_t size = _tasks.size();
+ for (size_t i = 0; i != size; ++i)
+ if (PeriodicTask* const task = _tasks[i])
+ _runTask(task);
+}
+
+void PeriodicTaskRunner::_runTask(PeriodicTask* const task) {
+ Timer timer;
+
+ const std::string taskName = task->taskName();
+
+ try {
+ task->taskDoWork();
+ } catch (const std::exception& e) {
+ error() << "task: " << taskName << " failed: " << e.what() << endl;
+ } catch (...) {
+ error() << "task: " << taskName << " failed with unknown error" << endl;
+ }
+
+ const int ms = timer.millis();
+ const int kMinLogMs = 100;
+ LOG(ms <= kMinLogMs ? 3 : 0) << "task: " << taskName << " took: " << ms << "ms" << endl;
+}
-} // namespace mongo
+} // namespace mongo