diff options
Diffstat (limited to 'src/mongo/util')
-rw-r--r-- | src/mongo/util/background.cpp | 340 | ||||
-rw-r--r-- | src/mongo/util/background.h | 91 | ||||
-rw-r--r-- | src/mongo/util/background_job_test.cpp | 90 | ||||
-rw-r--r-- | src/mongo/util/concurrency/msg.h | 1 |
4 files changed, 386 insertions, 136 deletions
diff --git a/src/mongo/util/background.cpp b/src/mongo/util/background.cpp index b3a15667ffe..8c33242d74a 100644 --- a/src/mongo/util/background.cpp +++ b/src/mongo/util/background.cpp @@ -19,7 +19,10 @@ #include "mongo/util/background.h" +#include <boost/bind.hpp> +#include <boost/function.hpp> #include <boost/thread/condition.hpp> +#include <boost/thread/once.hpp> #include <boost/thread/thread.hpp> #include "mongo/util/concurrency/mutex.h" @@ -33,51 +36,132 @@ using namespace std; namespace mongo { + namespace { + + class PeriodicTaskRunner : public BackgroundJob { + public: + + PeriodicTaskRunner() + : _mutex("PeriodicTaskRunner") + , _shutdownRequested(false) {} + + void add( PeriodicTask* task ); + void remove( PeriodicTask* task ); + + Status stop( int gracePeriodMillis ); + + private: + + virtual std::string name() const { + return "PeriodicTaskRunner"; + } + + virtual void run(); + + // Returns true if shutdown has been requested. You must hold _mutex to call this + // function. + bool _isShutdownRequested() const; + + // Runs all registered tasks. You must hold _mutex to call this function. + void _runTasks(); + + // Runs one task to completion, and optionally reports timing. You must hold _mutex + // to call this function. + void _runTask( PeriodicTask* task ); + + // _mutex protects the _shutdownRequested flag and the _tasks vector. + mongo::mutex _mutex; + + // The condition variable is used to sleep for the interval between task + // executions, and is notified when the _shutdownRequested flag is toggled. + boost::condition _cond; + + // Used to break the loop. You should notify _cond after changing this to true + // so that shutdown proceeds promptly. + bool _shutdownRequested; + + // The PeriodicTasks contained in this vector are NOT owned by the + // PeriodicTaskRunner, and are not deleted. The vector never shrinks, removed Tasks + // have their entry overwritten with NULL. 
+ std::vector< PeriodicTask* > _tasks; + }; + + // We rely here on zero-initialization of 'runnerMutex' to distinguish whether we are + // running before or after static initialization for this translation unit has + // completed. In the former case, we assume no threads are present, so we do not need + // to use the mutex. When present, the mutex protects 'runner' and 'runnerDestroyed' + // below. + SimpleMutex* const runnerMutex = new SimpleMutex("PeriodicTaskRunner"); + + // A scoped lock like object that only locks/unlocks the mutex if it exists. + class ConditionalScopedLock { + public: + ConditionalScopedLock( SimpleMutex* mutex ) : _mutex( mutex ) { + if ( _mutex ) + _mutex->lock(); + } + ~ConditionalScopedLock() { + if ( _mutex ) + _mutex->unlock(); + } + private: + SimpleMutex* const _mutex; + }; + + // The unique PeriodicTaskRunner, also zero-initialized. + PeriodicTaskRunner* runner; + + // The runner is never re-created once it has been destroyed. + bool runnerDestroyed; + + } // namespace + // both the BackgroundJob and the internal thread point to JobStatus struct BackgroundJob::JobStatus { - JobStatus( bool delFlag ) - : deleteSelf(delFlag), m("backgroundJob"), state(NotStarted) { } - - const bool deleteSelf; + JobStatus() + : mutex( "backgroundJob" ) + , state( NotStarted ) { + } - mongo::mutex m; // protects state below - boost::condition finished; // means _state == Done + mongo::mutex mutex; + boost::condition done; State state; }; - BackgroundJob::BackgroundJob( bool selfDelete ) { - _status.reset( new JobStatus( selfDelete ) ); + BackgroundJob::BackgroundJob( bool selfDelete ) + : _selfDelete( selfDelete ) + , _status( new JobStatus ) { } - // Background object can be only be destroyed after jobBody() ran - void BackgroundJob::jobBody( boost::shared_ptr<JobStatus> status ) { - LOG(1) << "BackgroundJob starting: " << name() << endl; - { - scoped_lock l( status->m ); - massert( 13643, mongoutils::str::stream() << "backgroundjob already started: " 
- << name(), - status->state != Running ); - status->state = Running; - } + BackgroundJob::~BackgroundJob() {} + + void BackgroundJob::jobBody() { const string threadName = name(); if( ! threadName.empty() ) setThreadName( threadName.c_str() ); + LOG(1) << "BackgroundJob starting: " << threadName << endl; + try { run(); } catch ( std::exception& e ) { - error() << "backgroundjob " << name() << " exception: " << e.what() << endl; + error() << "backgroundjob " << threadName << " exception: " << e.what() << endl; } catch(...) { - error() << "uncaught exception in BackgroundJob " << name() << endl; + error() << "uncaught exception in BackgroundJob " << threadName << endl; } + // We must cache this value so that we can use it after we leave the following scope. + const bool selfDelete = _selfDelete; + { - scoped_lock l( status->m ); - status->state = Done; - status->finished.notify_all(); + // It is illegal to access any state owned by this BackgroundJob after leaving this + // scope, with the exception of the call to 'delete this' below. + scoped_lock l( _status->mutex ); + _status->state = Done; + _status->done.notify_all(); } #ifdef MONGO_SSL @@ -85,118 +169,188 @@ namespace mongo { if (manager) manager->cleanupThreadLocals(); #endif - if( status->deleteSelf ) + + if( selfDelete ) delete this; } - BackgroundJob& BackgroundJob::go() { - boost::thread t( boost::bind( &BackgroundJob::jobBody , this, _status ) ); - return *this; + void BackgroundJob::go() { + scoped_lock l( _status->mutex ); + massert( 17234, mongoutils::str::stream() + << "backgroundJob already running: " << name(), + _status->state != Running ); + + // If the job is already 'done', for instance because it was cancelled or already + // finished, ignore additional requests to run the job. 
+ if (_status->state == NotStarted) { + boost::thread t( boost::bind( &BackgroundJob::jobBody , this ) ); + _status->state = Running; + } + } + + Status BackgroundJob::cancel() { + scoped_lock l( _status->mutex ); + + if ( _status->state == Running ) + return Status( ErrorCodes::IllegalOperation, + "Cannot cancel a running BackgroundJob" ); + + if ( _status->state == NotStarted ) { + _status->state = Done; + _status->done.notify_all(); + } + + return Status::OK(); } bool BackgroundJob::wait( unsigned msTimeOut ) { - verify( !_status->deleteSelf ); // you cannot call wait on a self-deleting job - scoped_lock l( _status->m ); + verify( !_selfDelete ); // you cannot call wait on a self-deleting job + scoped_lock l( _status->mutex ); while ( _status->state != Done ) { if ( msTimeOut ) { - // add msTimeOut millisecond to current time - boost::xtime xt; - boost::xtime_get( &xt, MONGO_BOOST_TIME_UTC ); - - unsigned long long ns = msTimeOut * 1000000ULL; // milli to nano - if ( xt.nsec + ns < 1000000000 ) { - xt.nsec = (boost::xtime::xtime_nsec_t) (xt.nsec + ns); - } - else { - xt.sec += 1 + ns / 1000000000; - xt.nsec = ( ns + xt.nsec ) % 1000000000; - } - - if ( ! _status->finished.timed_wait( l.boost() , xt ) ) + boost::xtime deadline = incxtimemillis( msTimeOut ); + if ( !_status->done.timed_wait( l.boost() , deadline ) ) return false; - } else { - _status->finished.wait( l.boost() ); + _status->done.wait( l.boost() ); } } return true; } BackgroundJob::State BackgroundJob::getState() const { - scoped_lock l( _status->m); + scoped_lock l( _status->mutex ); return _status->state; } bool BackgroundJob::running() const { - scoped_lock l( _status->m); + scoped_lock l( _status->mutex ); return _status->state == Running; } // ------------------------- PeriodicTask::PeriodicTask() { - if ( ! 
theRunner ) - theRunner = new Runner(); - theRunner->add( this ); + ConditionalScopedLock lock( runnerMutex ); + if ( runnerDestroyed ) + return; + + if ( !runner ) + runner = new PeriodicTaskRunner; + + runner->add( this ); } PeriodicTask::~PeriodicTask() { - theRunner->remove( this ); + ConditionalScopedLock lock( runnerMutex ); + if ( runnerDestroyed || !runner ) + return; + + runner->remove( this ); } - void PeriodicTask::Runner::add( PeriodicTask* task ) { - scoped_spinlock lk( _lock ); + void PeriodicTask::startRunningPeriodicTasks() { + ConditionalScopedLock lock( runnerMutex ); + if ( runnerDestroyed ) + return; + + if ( !runner ) + runner = new PeriodicTaskRunner; + + runner->go(); + } + + Status PeriodicTask::stopRunningPeriodicTasks( int gracePeriodMillis ) { + ConditionalScopedLock lock( runnerMutex ); + + Status status = Status::OK(); + if ( runnerDestroyed || !runner ) + return status; + + runner->cancel(); + status = runner->stop( gracePeriodMillis ); + + if ( status.isOK() ) { + delete runner; + runnerDestroyed = true; + } + + return status; + } + + void PeriodicTaskRunner::add( PeriodicTask* task ) { + mutex::scoped_lock lock( _mutex ); _tasks.push_back( task ); } - - void PeriodicTask::Runner::remove( PeriodicTask* task ) { - scoped_spinlock lk( _lock ); - for ( size_t i=0; i<_tasks.size(); i++ ) { + + void PeriodicTaskRunner::remove( PeriodicTask* task ) { + mutex::scoped_lock lock( _mutex ); + for ( size_t i = 0; i != _tasks.size(); i++ ) { if ( _tasks[i] == task ) { - _tasks[i] = 0; + _tasks[i] = NULL; break; } } } - void PeriodicTask::Runner::run() { - int sleeptime = 60; - DEV sleeptime = 5; // to catch race conditions - - while ( ! inShutdown() ) { - - sleepsecs( sleeptime ); - - scoped_spinlock lk( _lock ); - - size_t size = _tasks.size(); - - for ( size_t i=0; i<size; i++ ) { - PeriodicTask * t = _tasks[i]; - if ( ! 
t ) - continue; - - if ( inShutdown() ) - break; - - Timer timer; - try { - t->taskDoWork(); - } - catch ( std::exception& e ) { - error() << "task: " << t->taskName() << " failed: " << e.what() << endl; - } - catch ( ... ) { - error() << "task: " << t->taskName() << " failed with unknown error" << endl; - } - - int ms = timer.millis(); - LOG( ms <= 3 ? 3 : 0 ) << "task: " << t->taskName() << " took: " << ms << "ms" << endl; - } + Status PeriodicTaskRunner::stop( int gracePeriodMillis ) { + { + mutex::scoped_lock lock( _mutex ); + _shutdownRequested = true; + _cond.notify_one(); + } + + if ( !wait( gracePeriodMillis ) ) { + return Status( ErrorCodes::ExceededTimeLimit, + "Grace period expired while waiting for PeriodicTasks to terminate" ); } + return Status::OK(); } - PeriodicTask::Runner* PeriodicTask::theRunner = 0; + void PeriodicTaskRunner::run() { + // Use a shorter cycle time in debug mode to help catch race conditions. + const size_t waitMillis = (debug ? 5 : 60) * 1000; + + const boost::function<bool()> predicate = + boost::bind( &PeriodicTaskRunner::_isShutdownRequested, this ); + + mutex::scoped_lock lock( _mutex ); + while ( !predicate() ) { + const boost::xtime deadline = incxtimemillis( waitMillis ); + if ( !_cond.timed_wait( lock.boost(), deadline, predicate ) ) + _runTasks(); + } + } + + bool PeriodicTaskRunner::_isShutdownRequested() const { + return _shutdownRequested; + } + + void PeriodicTaskRunner::_runTasks() { + const size_t size = _tasks.size(); + for ( size_t i = 0; i != size; ++i ) + if ( PeriodicTask* const task = _tasks[i] ) + _runTask(task); + } + + void PeriodicTaskRunner::_runTask(PeriodicTask* const task) { + Timer timer; + + const std::string taskName = task->taskName(); + + try { + task->taskDoWork(); + } + catch ( const std::exception& e ) { + error() << "task: " << taskName << " failed: " << e.what() << endl; + } + catch ( ... 
) { + error() << "task: " << taskName << " failed with unknown error" << endl; + } + + const int ms = timer.millis(); + LOG( ms <= 3 ? 3 : 0 ) << "task: " << taskName << " took: " << ms << "ms" << endl; + } } // namespace mongo diff --git a/src/mongo/util/background.h b/src/mongo/util/background.h index bc4cc28df6c..96c4169cb94 100644 --- a/src/mongo/util/background.h +++ b/src/mongo/util/background.h @@ -17,10 +17,11 @@ #pragma once +#include <boost/scoped_ptr.hpp> #include <string> #include <vector> -#include "concurrency/spin_lock.h" +#include "mongo/base/status.h" namespace mongo { @@ -28,19 +29,16 @@ namespace mongo { * Background thread dispatching. * subclass and define run() * - * It is ok to call go(), that is, run the job, more than once -- if the - * previous invocation has finished. Thus one pattern of use is to embed - * a backgroundjob in your object and reuse it (or same thing with - * inheritance). Each go() call spawns a new thread. + * It is not possible to run the job more than once. An attempt to call 'go' while the + * task is running will fail. Calls to 'go' after the task has finished are ignored and + * will not start the job again. * - * Thread safety: - * note when job destructs, the thread is not terminated if still running. - * generally if the thread could still be running, allocate the job dynamically - * and set deleteSelf to true. + * Thread safety: Note that when the job destructs, the thread is not terminated if still + * running. Generally, if the thread could still be running, allocate the job dynamically + * and set selfDelete to true. * - * go() and wait() are not thread safe - * run() will be executed on the background thread - * BackgroundJob object must exist for as long the background thread is running + * The overridden run() method will be executed on the background thread, so the + * BackgroundJob object must exist for as long as the background thread is running.
*/ class BackgroundJob : boost::noncopyable { @@ -74,7 +72,7 @@ namespace mongo { Done }; - virtual ~BackgroundJob() { } + virtual ~BackgroundJob(); /** * starts job. @@ -83,7 +81,17 @@ namespace mongo { * @note the BackgroundJob object must live for as long as the thread is still running, i.e. * until getState() returns Done. */ - BackgroundJob& go(); + void go(); + + + /** + * If the job has not yet started, transitions the job to the 'done' state immediately, + * such that subsequent calls to 'go' are ignored, and notifies any waiters waiting in + * 'wait'. If the job is currently running, this method returns a not-ok status + * (IllegalOperation): it does not cancel running jobs. For this reason, you must still + * call 'wait' on a BackgroundJob even after calling 'cancel'. + */ + Status cancel(); /** * wait for completion. @@ -96,18 +104,21 @@ namespace mongo { */ bool wait( unsigned msTimeOut = 0 ); - // accessors + // accessors. Note that while the access to the internal state is synchronized within + // these methods, there is no guarantee that the BackgroundJob is still in the + // indicated state after returning.
State getState() const; bool running() const; private: - struct JobStatus; - boost::shared_ptr<JobStatus> _status; // shared between 'this' and body() thread + const bool _selfDelete; - void jobBody( boost::shared_ptr<JobStatus> status ); + struct JobStatus; + const boost::scoped_ptr<JobStatus> _status; + void jobBody(); }; - + /** * these run "roughly" every minute * instantiate statically @@ -125,31 +136,25 @@ namespace mongo { virtual void taskDoWork() = 0; virtual std::string taskName() const = 0; - class Runner : public BackgroundJob { - public: - virtual ~Runner(){} - - virtual std::string name() const { return "PeriodicTask::Runner"; } - - virtual void run(); - - void add( PeriodicTask* task ); - void remove( PeriodicTask* task ); - - private: - - SpinLock _lock; - - // these are NOT owned by Runner - // Runner will not delete these - // this never gets smaller - // only fields replaced with nulls - std::vector< PeriodicTask* > _tasks; - - }; - - static Runner* theRunner; + /** + * Starts the BackgroundJob that runs PeriodicTasks. You may call this multiple times, + * from multiple threads, and the BackgroundJob will be started only once. Please note + * that since this method starts threads, it is not appropriate to call it from within + * a mongo initializer. Calling this method after calling 'stopRunningPeriodicTasks' + * does not re-start the background job. + */ + static void startRunningPeriodicTasks(); + /** + * Waits up to 'gracePeriodMillis' milliseconds for the BackgroundJob responsible for + * PeriodicTask execution to finish any running tasks, then destroys it. If the BackgroundJob was + * never started, returns Status::OK right away. If the BackgroundJob does not + * terminate within the grace period, returns a non-OK status (ExceededTimeLimit). It is safe to call + * this method repeatedly from one thread if the grace period is overshot. It is not + * safe to call this method from multiple threads, or in a way that races with + * 'startRunningPeriodicTasks'.
+ */ + static Status stopRunningPeriodicTasks( int gracePeriodMillis ); }; diff --git a/src/mongo/util/background_job_test.cpp b/src/mongo/util/background_job_test.cpp new file mode 100644 index 00000000000..f7698ae0d4a --- /dev/null +++ b/src/mongo/util/background_job_test.cpp @@ -0,0 +1,90 @@ +/* Copyright 2013 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/server_options.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/background.h" +#include "mongo/util/concurrency/mutex.h" +#include "mongo/util/concurrency/synchronization.h" + +namespace mongo { + + ServerGlobalParams serverGlobalParams; + + bool inShutdown() { + return false; + } + +} // namespace mongo + +namespace { + + using mongo::BackgroundJob; + using mongo::MsgAssertionException; + using mongo::mutex; + using mongo::Notification; + + TEST(BackgroundJobLifeCycle, Go) { + + class Job : public BackgroundJob { + public: + Job() + : _mutex("BackgroundJobLifeCycle::Go") + , _hasRun(false) {} + + virtual std::string name() const { + return "BackgroundLifeCycle::CannotCallGoAgain"; + } + + virtual void run() { + { + mongo::scoped_lock lock( _mutex ); + ASSERT_FALSE( _hasRun ); + _hasRun = true; + } + + _n.waitToBeNotified(); + } + + void notify() { + _n.notifyOne(); + } + + private: + mutex _mutex; + bool _hasRun; + Notification _n; + }; + + Job j; + + // This call starts Job running. 
+ j.go(); + + // Calling 'go' again while it is running is an error. + ASSERT_THROWS(j.go(), MsgAssertionException); + + // Stop the Job + j.notify(); + j.wait(); + + // Calling 'go' on a done task is a no-op. If it were not, + // we would fail the assert in Job::run above. + j.go(); + } + +} // namespace diff --git a/src/mongo/util/concurrency/msg.h b/src/mongo/util/concurrency/msg.h index 7652d107895..303dc051a52 100644 --- a/src/mongo/util/concurrency/msg.h +++ b/src/mongo/util/concurrency/msg.h @@ -35,6 +35,7 @@ #include <boost/thread/condition.hpp> #include <boost/function.hpp> +#include "mongo/util/concurrency/mutex.h" #include "task.h" namespace mongo { |