summaryrefslogtreecommitdiff
path: root/src/mongo/util
diff options
context:
space:
mode:
authorAndrew Morrow <acm@10gen.com>2013-10-31 15:34:38 -0400
committerAndrew Morrow <acm@10gen.com>2013-11-02 10:19:06 -0400
commitcc2e41be23c7052f28aa985b99e0c03a84017593 (patch)
tree1a980b69a503b85c04e2d052f6dc25e1ae9f2637 /src/mongo/util
parent50030f55d08d5ffe6a10e76542487b1b989e4972 (diff)
downloadmongo-cc2e41be23c7052f28aa985b99e0c03a84017593.tar.gz
SERVER-7217 Provide new client API to manage initialization and termination
Diffstat (limited to 'src/mongo/util')
-rw-r--r--src/mongo/util/background.cpp340
-rw-r--r--src/mongo/util/background.h91
-rw-r--r--src/mongo/util/background_job_test.cpp90
-rw-r--r--src/mongo/util/concurrency/msg.h1
4 files changed, 386 insertions, 136 deletions
diff --git a/src/mongo/util/background.cpp b/src/mongo/util/background.cpp
index b3a15667ffe..8c33242d74a 100644
--- a/src/mongo/util/background.cpp
+++ b/src/mongo/util/background.cpp
@@ -19,7 +19,10 @@
#include "mongo/util/background.h"
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
#include <boost/thread/condition.hpp>
+#include <boost/thread/once.hpp>
#include <boost/thread/thread.hpp>
#include "mongo/util/concurrency/mutex.h"
@@ -33,51 +36,132 @@
using namespace std;
namespace mongo {
+ namespace {
+
+ class PeriodicTaskRunner : public BackgroundJob {
+ public:
+
+ PeriodicTaskRunner()
+ : _mutex("PeriodicTaskRunner")
+ , _shutdownRequested(false) {}
+
+ void add( PeriodicTask* task );
+ void remove( PeriodicTask* task );
+
+ Status stop( int gracePeriodMillis );
+
+ private:
+
+ virtual std::string name() const {
+ return "PeriodicTaskRunner";
+ }
+
+ virtual void run();
+
+ // Returns true if shutdown has been requested. You must hold _mutex to call this
+ // function.
+ bool _isShutdownRequested() const;
+
+ // Runs all registered tasks. You must hold _mutex to call this function.
+ void _runTasks();
+
+ // Runs one task to completion, and optionally reports timing. You must hold _mutex
+ // to call this function.
+ void _runTask( PeriodicTask* task );
+
+ // _mutex protects the _shutdownRequested flag and the _tasks vector.
+ mongo::mutex _mutex;
+
+ // The condition variable is used to sleep for the interval between task
+ // executions, and is notified when the _shutdownRequested flag is toggled.
+ boost::condition _cond;
+
+ // Used to break the loop. You should notify _cond after changing this to true
+ // so that shutdown proceeds promptly.
+ bool _shutdownRequested;
+
+ // The PeriodicTasks contained in this vector are NOT owned by the
+ // PeriodicTaskRunner, and are not deleted. The vector never shrinks; removed tasks
+ // have their entries overwritten with NULL.
+ std::vector< PeriodicTask* > _tasks;
+ };
+
+ // We rely here on zero-initialization of 'runnerMutex' to distinguish whether we are
+ // running before or after static initialization for this translation unit has
+ // completed. In the former case, we assume no threads are present, so we do not need
+ // to use the mutex. When present, the mutex protects 'runner' and 'runnerDestroyed'
+ // below.
+ SimpleMutex* const runnerMutex = new SimpleMutex("PeriodicTaskRunner");
+
+ // A scoped-lock-like object that locks/unlocks the mutex only if it exists.
+ class ConditionalScopedLock {
+ public:
+ ConditionalScopedLock( SimpleMutex* mutex ) : _mutex( mutex ) {
+ if ( _mutex )
+ _mutex->lock();
+ }
+ ~ConditionalScopedLock() {
+ if ( _mutex )
+ _mutex->unlock();
+ }
+ private:
+ SimpleMutex* const _mutex;
+ };
+
+ // The unique PeriodicTaskRunner, also zero-initialized.
+ PeriodicTaskRunner* runner;
+
+ // The runner is never re-created once it has been destroyed.
+ bool runnerDestroyed;
+
+ } // namespace
+
// both the BackgroundJob and the internal thread point to JobStatus
struct BackgroundJob::JobStatus {
- JobStatus( bool delFlag )
- : deleteSelf(delFlag), m("backgroundJob"), state(NotStarted) { }
-
- const bool deleteSelf;
+ JobStatus()
+ : mutex( "backgroundJob" )
+ , state( NotStarted ) {
+ }
- mongo::mutex m; // protects state below
- boost::condition finished; // means _state == Done
+ mongo::mutex mutex;
+ boost::condition done;
State state;
};
- BackgroundJob::BackgroundJob( bool selfDelete ) {
- _status.reset( new JobStatus( selfDelete ) );
+ BackgroundJob::BackgroundJob( bool selfDelete )
+ : _selfDelete( selfDelete )
+ , _status( new JobStatus ) {
}
- // Background object can be only be destroyed after jobBody() ran
- void BackgroundJob::jobBody( boost::shared_ptr<JobStatus> status ) {
- LOG(1) << "BackgroundJob starting: " << name() << endl;
- {
- scoped_lock l( status->m );
- massert( 13643, mongoutils::str::stream() << "backgroundjob already started: "
- << name(),
- status->state != Running );
- status->state = Running;
- }
+ BackgroundJob::~BackgroundJob() {}
+
+ void BackgroundJob::jobBody() {
const string threadName = name();
if( ! threadName.empty() )
setThreadName( threadName.c_str() );
+ LOG(1) << "BackgroundJob starting: " << threadName << endl;
+
try {
run();
}
catch ( std::exception& e ) {
- error() << "backgroundjob " << name() << " exception: " << e.what() << endl;
+ error() << "backgroundjob " << threadName << " exception: " << e.what() << endl;
}
catch(...) {
- error() << "uncaught exception in BackgroundJob " << name() << endl;
+ error() << "uncaught exception in BackgroundJob " << threadName << endl;
}
+ // We must cache this value so that we can use it after we leave the following scope.
+ const bool selfDelete = _selfDelete;
+
{
- scoped_lock l( status->m );
- status->state = Done;
- status->finished.notify_all();
+ // It is illegal to access any state owned by this BackgroundJob after leaving this
+ // scope, with the exception of the call to 'delete this' below.
+ scoped_lock l( _status->mutex );
+ _status->state = Done;
+ _status->done.notify_all();
}
#ifdef MONGO_SSL
@@ -85,118 +169,188 @@ namespace mongo {
if (manager)
manager->cleanupThreadLocals();
#endif
- if( status->deleteSelf )
+
+ if( selfDelete )
delete this;
}
- BackgroundJob& BackgroundJob::go() {
- boost::thread t( boost::bind( &BackgroundJob::jobBody , this, _status ) );
- return *this;
+ void BackgroundJob::go() {
+ scoped_lock l( _status->mutex );
+ massert( 17234, mongoutils::str::stream()
+ << "backgroundJob already running: " << name(),
+ _status->state != Running );
+
+ // If the job is already 'done', for instance because it was cancelled or already
+ // finished, ignore additional requests to run the job.
+ if (_status->state == NotStarted) {
+ boost::thread t( boost::bind( &BackgroundJob::jobBody , this ) );
+ _status->state = Running;
+ }
+ }
+
+ Status BackgroundJob::cancel() {
+ scoped_lock l( _status->mutex );
+
+ if ( _status->state == Running )
+ return Status( ErrorCodes::IllegalOperation,
+ "Cannot cancel a running BackgroundJob" );
+
+ if ( _status->state == NotStarted ) {
+ _status->state = Done;
+ _status->done.notify_all();
+ }
+
+ return Status::OK();
}
bool BackgroundJob::wait( unsigned msTimeOut ) {
- verify( !_status->deleteSelf ); // you cannot call wait on a self-deleting job
- scoped_lock l( _status->m );
+ verify( !_selfDelete ); // you cannot call wait on a self-deleting job
+ scoped_lock l( _status->mutex );
while ( _status->state != Done ) {
if ( msTimeOut ) {
- // add msTimeOut millisecond to current time
- boost::xtime xt;
- boost::xtime_get( &xt, MONGO_BOOST_TIME_UTC );
-
- unsigned long long ns = msTimeOut * 1000000ULL; // milli to nano
- if ( xt.nsec + ns < 1000000000 ) {
- xt.nsec = (boost::xtime::xtime_nsec_t) (xt.nsec + ns);
- }
- else {
- xt.sec += 1 + ns / 1000000000;
- xt.nsec = ( ns + xt.nsec ) % 1000000000;
- }
-
- if ( ! _status->finished.timed_wait( l.boost() , xt ) )
+ boost::xtime deadline = incxtimemillis( msTimeOut );
+ if ( !_status->done.timed_wait( l.boost() , deadline ) )
return false;
-
}
else {
- _status->finished.wait( l.boost() );
+ _status->done.wait( l.boost() );
}
}
return true;
}
BackgroundJob::State BackgroundJob::getState() const {
- scoped_lock l( _status->m);
+ scoped_lock l( _status->mutex );
return _status->state;
}
bool BackgroundJob::running() const {
- scoped_lock l( _status->m);
+ scoped_lock l( _status->mutex );
return _status->state == Running;
}
// -------------------------
PeriodicTask::PeriodicTask() {
- if ( ! theRunner )
- theRunner = new Runner();
- theRunner->add( this );
+ ConditionalScopedLock lock( runnerMutex );
+ if ( runnerDestroyed )
+ return;
+
+ if ( !runner )
+ runner = new PeriodicTaskRunner;
+
+ runner->add( this );
}
PeriodicTask::~PeriodicTask() {
- theRunner->remove( this );
+ ConditionalScopedLock lock( runnerMutex );
+ if ( runnerDestroyed || !runner )
+ return;
+
+ runner->remove( this );
}
- void PeriodicTask::Runner::add( PeriodicTask* task ) {
- scoped_spinlock lk( _lock );
+ void PeriodicTask::startRunningPeriodicTasks() {
+ ConditionalScopedLock lock( runnerMutex );
+ if ( runnerDestroyed )
+ return;
+
+ if ( !runner )
+ runner = new PeriodicTaskRunner;
+
+ runner->go();
+ }
+
+ Status PeriodicTask::stopRunningPeriodicTasks( int gracePeriodMillis ) {
+ ConditionalScopedLock lock( runnerMutex );
+
+ Status status = Status::OK();
+ if ( runnerDestroyed || !runner )
+ return status;
+
+ runner->cancel();
+ status = runner->stop( gracePeriodMillis );
+
+ if ( status.isOK() ) {
+ delete runner;
+ runnerDestroyed = true;
+ }
+
+ return status;
+ }
+
+ void PeriodicTaskRunner::add( PeriodicTask* task ) {
+ mutex::scoped_lock lock( _mutex );
_tasks.push_back( task );
}
-
- void PeriodicTask::Runner::remove( PeriodicTask* task ) {
- scoped_spinlock lk( _lock );
- for ( size_t i=0; i<_tasks.size(); i++ ) {
+
+ void PeriodicTaskRunner::remove( PeriodicTask* task ) {
+ mutex::scoped_lock lock( _mutex );
+ for ( size_t i = 0; i != _tasks.size(); i++ ) {
if ( _tasks[i] == task ) {
- _tasks[i] = 0;
+ _tasks[i] = NULL;
break;
}
}
}
- void PeriodicTask::Runner::run() {
- int sleeptime = 60;
- DEV sleeptime = 5; // to catch race conditions
-
- while ( ! inShutdown() ) {
-
- sleepsecs( sleeptime );
-
- scoped_spinlock lk( _lock );
-
- size_t size = _tasks.size();
-
- for ( size_t i=0; i<size; i++ ) {
- PeriodicTask * t = _tasks[i];
- if ( ! t )
- continue;
-
- if ( inShutdown() )
- break;
-
- Timer timer;
- try {
- t->taskDoWork();
- }
- catch ( std::exception& e ) {
- error() << "task: " << t->taskName() << " failed: " << e.what() << endl;
- }
- catch ( ... ) {
- error() << "task: " << t->taskName() << " failed with unknown error" << endl;
- }
-
- int ms = timer.millis();
- LOG( ms <= 3 ? 3 : 0 ) << "task: " << t->taskName() << " took: " << ms << "ms" << endl;
- }
+ Status PeriodicTaskRunner::stop( int gracePeriodMillis ) {
+ {
+ mutex::scoped_lock lock( _mutex );
+ _shutdownRequested = true;
+ _cond.notify_one();
+ }
+
+ if ( !wait( gracePeriodMillis ) ) {
+ return Status( ErrorCodes::ExceededTimeLimit,
+ "Grace period expired while waiting for PeriodicTasks to terminate" );
}
+ return Status::OK();
}
- PeriodicTask::Runner* PeriodicTask::theRunner = 0;
+ void PeriodicTaskRunner::run() {
+ // Use a shorter cycle time in debug mode to help catch race conditions.
+ const size_t waitMillis = (debug ? 5 : 60) * 1000;
+
+ const boost::function<bool()> predicate =
+ boost::bind( &PeriodicTaskRunner::_isShutdownRequested, this );
+
+ mutex::scoped_lock lock( _mutex );
+ while ( !predicate() ) {
+ const boost::xtime deadline = incxtimemillis( waitMillis );
+ if ( !_cond.timed_wait( lock.boost(), deadline, predicate ) )
+ _runTasks();
+ }
+ }
+
+ bool PeriodicTaskRunner::_isShutdownRequested() const {
+ return _shutdownRequested;
+ }
+
+ void PeriodicTaskRunner::_runTasks() {
+ const size_t size = _tasks.size();
+ for ( size_t i = 0; i != size; ++i )
+ if ( PeriodicTask* const task = _tasks[i] )
+ _runTask(task);
+ }
+
+ void PeriodicTaskRunner::_runTask(PeriodicTask* const task) {
+ Timer timer;
+
+ const std::string taskName = task->taskName();
+
+ try {
+ task->taskDoWork();
+ }
+ catch ( const std::exception& e ) {
+ error() << "task: " << taskName << " failed: " << e.what() << endl;
+ }
+ catch ( ... ) {
+ error() << "task: " << taskName << " failed with unknown error" << endl;
+ }
+
+ const int ms = timer.millis();
+ LOG( ms <= 3 ? 3 : 0 ) << "task: " << taskName << " took: " << ms << "ms" << endl;
+ }
} // namespace mongo
diff --git a/src/mongo/util/background.h b/src/mongo/util/background.h
index bc4cc28df6c..96c4169cb94 100644
--- a/src/mongo/util/background.h
+++ b/src/mongo/util/background.h
@@ -17,10 +17,11 @@
#pragma once
+#include <boost/scoped_ptr.hpp>
#include <string>
#include <vector>
-#include "concurrency/spin_lock.h"
+#include "mongo/base/status.h"
namespace mongo {
@@ -28,19 +29,16 @@ namespace mongo {
* Background thread dispatching.
* subclass and define run()
*
- * It is ok to call go(), that is, run the job, more than once -- if the
- * previous invocation has finished. Thus one pattern of use is to embed
- * a backgroundjob in your object and reuse it (or same thing with
- * inheritance). Each go() call spawns a new thread.
+ * It is not possible to run the job more than once. An attempt to call 'go' while the
+ * task is running will fail. Calls to 'go' after the task has finished are ignored and
+ * will not start the job again.
*
- * Thread safety:
- * note when job destructs, the thread is not terminated if still running.
- * generally if the thread could still be running, allocate the job dynamically
- * and set deleteSelf to true.
+ * Thread safety: Note that when the job destructs, the thread is not terminated if still
+ * running. Generally, if the thread could still be running, allocate the job dynamically
+ * and set deleteSelf to true.
*
- * go() and wait() are not thread safe
- * run() will be executed on the background thread
- * BackgroundJob object must exist for as long the background thread is running
+ * The overridden run() method will be executed on the background thread, so the
+ * BackgroundJob object must exist for as long as the background thread is running.
*/
class BackgroundJob : boost::noncopyable {
@@ -74,7 +72,7 @@ namespace mongo {
Done
};
- virtual ~BackgroundJob() { }
+ virtual ~BackgroundJob();
/**
* starts job.
@@ -83,7 +81,17 @@ namespace mongo {
* @note the BackgroundJob object must live for as long the thread is still running, ie
* until getState() returns Done.
*/
- BackgroundJob& go();
+ void go();
+
+
+ /**
+ * If the job has not yet started, transitions the job to the 'done' state immediately,
+ * such that subsequent calls to 'go' are ignored, and notifies any waiters waiting in
+ * 'wait'. If the job has already been started, this method returns a not-ok status: it
+ * does not cancel running jobs. For this reason, you must still call 'wait' on a
+ * BackgroundJob even after calling 'cancel'.
+ */
+ Status cancel();
/**
* wait for completion.
@@ -96,18 +104,21 @@ namespace mongo {
*/
bool wait( unsigned msTimeOut = 0 );
- // accessors
+ // accessors. Note that while the access to the internal state is synchronized within
+ // these methods, there is no guarantee that the BackgroundJob is still in the
+ // indicated state after returning.
State getState() const;
bool running() const;
private:
- struct JobStatus;
- boost::shared_ptr<JobStatus> _status; // shared between 'this' and body() thread
+ const bool _selfDelete;
- void jobBody( boost::shared_ptr<JobStatus> status );
+ struct JobStatus;
+ const boost::scoped_ptr<JobStatus> _status;
+ void jobBody();
};
-
+
/**
* these run "roughly" every minute
* instantiate statically
@@ -125,31 +136,25 @@ namespace mongo {
virtual void taskDoWork() = 0;
virtual std::string taskName() const = 0;
- class Runner : public BackgroundJob {
- public:
- virtual ~Runner(){}
-
- virtual std::string name() const { return "PeriodicTask::Runner"; }
-
- virtual void run();
-
- void add( PeriodicTask* task );
- void remove( PeriodicTask* task );
-
- private:
-
- SpinLock _lock;
-
- // these are NOT owned by Runner
- // Runner will not delete these
- // this never gets smaller
- // only fields replaced with nulls
- std::vector< PeriodicTask* > _tasks;
-
- };
-
- static Runner* theRunner;
+ /**
+ * Starts the BackgroundJob that runs PeriodicTasks. You may call this multiple times,
+ * from multiple threads, and the BackgroundJob will be started only once. Please note
+ * that since this method starts threads, it is not appropriate to call it from within
+ * a mongo initializer. Calling this method after calling 'stopRunningPeriodicTasks'
+ * does not re-start the background job.
+ */
+ static void startRunningPeriodicTasks();
+ /**
+ * Waits 'gracePeriodMillis' for the BackgroundJob responsible for PeriodicTask
+ * execution to finish any running tasks, then destroys it. If the BackgroundJob was
+ * never started, returns Status::OK right away. If the BackgroundJob does not
+ * terminate within the grace period, returns a not-ok status. It is safe to call
+ * this method repeatedly from one thread if the grace period is overshot. It is not
+ * safe to call this method from multiple threads, or in a way that races with
+ * 'startRunningPeriodicTasks'.
+ */
+ static Status stopRunningPeriodicTasks( int gracePeriodMillis );
};
diff --git a/src/mongo/util/background_job_test.cpp b/src/mongo/util/background_job_test.cpp
new file mode 100644
index 00000000000..f7698ae0d4a
--- /dev/null
+++ b/src/mongo/util/background_job_test.cpp
@@ -0,0 +1,90 @@
+/* Copyright 2013 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/server_options.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/background.h"
+#include "mongo/util/concurrency/mutex.h"
+#include "mongo/util/concurrency/synchronization.h"
+
+namespace mongo {
+
+ ServerGlobalParams serverGlobalParams;
+
+ bool inShutdown() {
+ return false;
+ }
+
+} // namespace mongo
+
+namespace {
+
+ using mongo::BackgroundJob;
+ using mongo::MsgAssertionException;
+ using mongo::mutex;
+ using mongo::Notification;
+
+ TEST(BackgroundJobLifeCycle, Go) {
+
+ class Job : public BackgroundJob {
+ public:
+ Job()
+ : _mutex("BackgroundJobLifeCycle::Go")
+ , _hasRun(false) {}
+
+ virtual std::string name() const {
+ return "BackgroundLifeCycle::CannotCallGoAgain";
+ }
+
+ virtual void run() {
+ {
+ mongo::scoped_lock lock( _mutex );
+ ASSERT_FALSE( _hasRun );
+ _hasRun = true;
+ }
+
+ _n.waitToBeNotified();
+ }
+
+ void notify() {
+ _n.notifyOne();
+ }
+
+ private:
+ mutex _mutex;
+ bool _hasRun;
+ Notification _n;
+ };
+
+ Job j;
+
+ // This call starts Job running.
+ j.go();
+
+ // Calling 'go' again while it is running is an error.
+ ASSERT_THROWS(j.go(), MsgAssertionException);
+
+ // Stop the Job
+ j.notify();
+ j.wait();
+
+ // Calling 'go' on a done task is a no-op. If it were not,
+ // we would fail the assert in Job::run above.
+ j.go();
+ }
+
+} // namespace
diff --git a/src/mongo/util/concurrency/msg.h b/src/mongo/util/concurrency/msg.h
index 7652d107895..303dc051a52 100644
--- a/src/mongo/util/concurrency/msg.h
+++ b/src/mongo/util/concurrency/msg.h
@@ -35,6 +35,7 @@
#include <boost/thread/condition.hpp>
#include <boost/function.hpp>
+#include "mongo/util/concurrency/mutex.h"
#include "task.h"
namespace mongo {