// @file background.cpp /* Copyright 2009 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand #include "mongo/platform/basic.h" #include "mongo/util/background.h" #include #include #include #include "mongo/stdx/functional.h" #include "mongo/util/concurrency/mutex.h" #include "mongo/util/concurrency/spin_lock.h" #include "mongo/util/concurrency/thread_name.h" #include "mongo/util/debug_util.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/net/ssl_manager.h" #include "mongo/util/time_support.h" #include "mongo/util/timer.h" using namespace std; namespace mongo { namespace { class PeriodicTaskRunner : public BackgroundJob { public: PeriodicTaskRunner() : _mutex("PeriodicTaskRunner") , _shutdownRequested(false) {} void add( PeriodicTask* task ); void remove( PeriodicTask* task ); Status stop( int gracePeriodMillis ); private: virtual std::string name() const { return "PeriodicTaskRunner"; } virtual void run(); // Returns true if shutdown has been requested. You must hold _mutex to call this // function. bool _isShutdownRequested() const; // Runs all registered tasks. You must hold _mutex to call this function. void _runTasks(); // Runs one task to completion, and optionally reports timing. You must hold _mutex // to call this function. void _runTask( PeriodicTask* task ); // _mutex protects the _shutdownRequested flag and the _tasks vector. mongo::mutex _mutex; // The condition variable is used to sleep for the interval between task // executions, and is notified when the _shutdownRequested flag is toggled. boost::condition _cond; // Used to break the loop. You should notify _cond after changing this to true // so that shutdown proceeds promptly. bool _shutdownRequested; // The PeriodicTasks contained in this vector are NOT owned by the // PeriodicTaskRunner, and are not deleted. The vector never shrinks, removed Tasks // have their entry overwritten with NULL. std::vector< PeriodicTask* > _tasks; }; // We rely here on zero-initialization of 'runnerMutex' to distinguish whether we are // running before or after static initialization for this translation unit has // completed. In the former case, we assume no threads are present, so we do not need // to use the mutex. When present, the mutex protects 'runner' and 'runnerDestroyed' // below. SimpleMutex* const runnerMutex = new SimpleMutex("PeriodicTaskRunner"); // A scoped lock like object that only locks/unlocks the mutex if it exists. class ConditionalScopedLock { public: ConditionalScopedLock( SimpleMutex* mutex ) : _mutex( mutex ) { if ( _mutex ) _mutex->lock(); } ~ConditionalScopedLock() { if ( _mutex ) _mutex->unlock(); } private: SimpleMutex* const _mutex; }; // The unique PeriodicTaskRunner, also zero-initialized. PeriodicTaskRunner* runner; // The runner is never re-created once it has been destroyed. bool runnerDestroyed; } // namespace // both the BackgroundJob and the internal thread point to JobStatus struct BackgroundJob::JobStatus { JobStatus() : mutex( "backgroundJob" ) , state( NotStarted ) { } mongo::mutex mutex; boost::condition done; State state; }; BackgroundJob::BackgroundJob( bool selfDelete ) : _selfDelete( selfDelete ) , _status( new JobStatus ) { } BackgroundJob::~BackgroundJob() {} void BackgroundJob::jobBody() { const string threadName = name(); if (!threadName.empty()) { setThreadName(threadName.c_str()); } LOG(1) << "BackgroundJob starting: " << threadName << endl; try { run(); } catch (const std::exception& e) { error() << "backgroundjob " << threadName << " exception: " << e.what(); throw; } // We must cache this value so that we can use it after we leave the following scope. const bool selfDelete = _selfDelete; #ifdef MONGO_SSL // TODO(sverch): Allow people who use the BackgroundJob to also specify cleanup tasks. // Currently the networking code depends on this class and this class depends on the // networking code because of this ad hoc cleanup. SSLManagerInterface* manager = getSSLManager(); if (manager) manager->cleanupThreadLocals(); #endif { // It is illegal to access any state owned by this BackgroundJob after leaving this // scope, with the exception of the call to 'delete this' below. scoped_lock l( _status->mutex ); _status->state = Done; _status->done.notify_all(); } if( selfDelete ) delete this; } void BackgroundJob::go() { scoped_lock l( _status->mutex ); massert( 17234, mongoutils::str::stream() << "backgroundJob already running: " << name(), _status->state != Running ); // If the job is already 'done', for instance because it was cancelled or already // finished, ignore additional requests to run the job. if (_status->state == NotStarted) { boost::thread t( stdx::bind( &BackgroundJob::jobBody , this ) ); _status->state = Running; } } Status BackgroundJob::cancel() { scoped_lock l( _status->mutex ); if ( _status->state == Running ) return Status( ErrorCodes::IllegalOperation, "Cannot cancel a running BackgroundJob" ); if ( _status->state == NotStarted ) { _status->state = Done; _status->done.notify_all(); } return Status::OK(); } bool BackgroundJob::wait( unsigned msTimeOut ) { verify( !_selfDelete ); // you cannot call wait on a self-deleting job scoped_lock l( _status->mutex ); while ( _status->state != Done ) { if ( msTimeOut ) { boost::xtime deadline = incxtimemillis( msTimeOut ); if ( !_status->done.timed_wait( l.boost() , deadline ) ) return false; } else { _status->done.wait( l.boost() ); } } return true; } BackgroundJob::State BackgroundJob::getState() const { scoped_lock l( _status->mutex ); return _status->state; } bool BackgroundJob::running() const { scoped_lock l( _status->mutex ); return _status->state == Running; } // ------------------------- PeriodicTask::PeriodicTask() { ConditionalScopedLock lock( runnerMutex ); if ( runnerDestroyed ) return; if ( !runner ) runner = new PeriodicTaskRunner; runner->add( this ); } PeriodicTask::~PeriodicTask() { ConditionalScopedLock lock( runnerMutex ); if ( runnerDestroyed || !runner ) return; runner->remove( this ); } void PeriodicTask::startRunningPeriodicTasks() { ConditionalScopedLock lock( runnerMutex ); if ( runnerDestroyed ) return; if ( !runner ) runner = new PeriodicTaskRunner; runner->go(); } Status PeriodicTask::stopRunningPeriodicTasks( int gracePeriodMillis ) { ConditionalScopedLock lock( runnerMutex ); Status status = Status::OK(); if ( runnerDestroyed || !runner ) return status; runner->cancel(); status = runner->stop( gracePeriodMillis ); if ( status.isOK() ) { delete runner; runnerDestroyed = true; } return status; } void PeriodicTaskRunner::add( PeriodicTask* task ) { mutex::scoped_lock lock( _mutex ); _tasks.push_back( task ); } void PeriodicTaskRunner::remove( PeriodicTask* task ) { mutex::scoped_lock lock( _mutex ); for ( size_t i = 0; i != _tasks.size(); i++ ) { if ( _tasks[i] == task ) { _tasks[i] = NULL; break; } } } Status PeriodicTaskRunner::stop( int gracePeriodMillis ) { { mutex::scoped_lock lock( _mutex ); _shutdownRequested = true; _cond.notify_one(); } if ( !wait( gracePeriodMillis ) ) { return Status( ErrorCodes::ExceededTimeLimit, "Grace period expired while waiting for PeriodicTasks to terminate" ); } return Status::OK(); } void PeriodicTaskRunner::run() { // Use a shorter cycle time in debug mode to help catch race conditions. const size_t waitMillis = (debug ? 5 : 60) * 1000; const stdx::function predicate = stdx::bind( &PeriodicTaskRunner::_isShutdownRequested, this ); mutex::scoped_lock lock( _mutex ); while ( !predicate() ) { const boost::xtime deadline = incxtimemillis( waitMillis ); if ( !_cond.timed_wait( lock.boost(), deadline, predicate ) ) _runTasks(); } } bool PeriodicTaskRunner::_isShutdownRequested() const { return _shutdownRequested; } void PeriodicTaskRunner::_runTasks() { const size_t size = _tasks.size(); for ( size_t i = 0; i != size; ++i ) if ( PeriodicTask* const task = _tasks[i] ) _runTask(task); } void PeriodicTaskRunner::_runTask(PeriodicTask* const task) { Timer timer; const std::string taskName = task->taskName(); try { task->taskDoWork(); } catch ( const std::exception& e ) { error() << "task: " << taskName << " failed: " << e.what() << endl; } catch ( ... ) { error() << "task: " << taskName << " failed with unknown error" << endl; } const int ms = timer.millis(); LOG( ms <= 3 ? 3 : 0 ) << "task: " << taskName << " took: " << ms << "ms" << endl; } } // namespace mongo