diff options
author | agronholm <devnull@localhost> | 2011-04-16 08:55:44 +0300 |
---|---|---|
committer | agronholm <devnull@localhost> | 2011-04-16 08:55:44 +0300 |
commit | 12eecb7fcf981b6206e17fae2965d45cdbbabcc7 (patch) | |
tree | 3297535ed958dfcb4b68da5372b947fa33036f12 | |
parent | d10a4a57eef0bba26caf04b67d06970d8706139e (diff) | |
download | apscheduler-12eecb7fcf981b6206e17fae2965d45cdbbabcc7.tar.gz |
Fixed thread pool logic and tests; improved documentation on thread pool optionsv2.0.0rc1
-rw-r--r-- | apscheduler/threadpool.py | 16 | ||||
-rw-r--r-- | docs/index.rst | 64 | ||||
-rw-r--r-- | tests/testintegration.py | 30 | ||||
-rw-r--r-- | tests/testthreadpool.py | 7 |
4 files changed, 63 insertions, 54 deletions
diff --git a/apscheduler/threadpool.py b/apscheduler/threadpool.py index 8b87260..8ec47da 100644 --- a/apscheduler/threadpool.py +++ b/apscheduler/threadpool.py @@ -31,7 +31,7 @@ atexit.register(_shutdown_all) class ThreadPool(object): - def __init__(self, core_threads=0, max_threads=None, keepalive=1): + def __init__(self, core_threads=0, max_threads=20, keepalive=1): """ :param core_threads: maximum number of persistent threads in the pool :param max_threads: maximum number of total threads in the pool @@ -40,16 +40,13 @@ class ThreadPool(object): for new tasks """ self.core_threads = core_threads - self.max_threads = max_threads + self.max_threads = max(max_threads, core_threads, 1) self.keepalive = keepalive self._queue = Queue() self._threads_lock = Lock() self._threads = set() self._shutdown = False - if max_threads is not None: - self.max_threads = max(max_threads, core_threads, 1) - _threadpools.add(ref(self)) logger.info('Started thread pool with %d core threads and %s maximum ' 'threads', core_threads, max_threads or 'unlimited') @@ -57,13 +54,8 @@ class ThreadPool(object): def _adjust_threadcount(self): self._threads_lock.acquire() try: - qsize = self._queue.qsize() - if self.num_threads < self.core_threads: - self._add_thread(True) - elif qsize > 1 and self.num_threads < self.max_threads: - self._add_thread(False) - elif self.num_threads == 0: - self._add_thread(False) + if self.num_threads < self.max_threads: + self._add_thread(self.num_threads < self.core_threads) finally: self._threads_lock.release() diff --git a/docs/index.rst b/docs/index.rst index aa225e0..a005f2c 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -146,37 +146,39 @@ tasks to finish without shutting down the thread pool. Scheduler configuration options ------------------------------- -======================= ======== ============================================== -Directive Default Definition -======================= ======== ============================================== -misfire_grace_time 1 Maximum time in seconds for the job execution - to be allowed to delay before it is considered - a misfire -coalesce False Roll several pending executions of jobs into one -daemonic True Controls whether the scheduler thread is - daemonic or not. - - If set to ``False``, then the - scheduler must be shut down explicitly - when the program is about to finish, or it will - prevent the program from terminating. - - If set to ``True``, the scheduler will - automatically terminate with the application, - but may cause an exception to be raised on - exit. - - Jobs are always executed in non-daemonic - threads. -threadpool -threadpool.core_threads 0 Maximum number of persistent threads in the pool -threadpool.max_threads None Maximum number of total threads in the pool -threadpool.keepalive 1 Seconds to keep non-core worker threads waiting - for new tasks -jobstores.X.class Class of the jobstore named X (specified as - module.name:classname) -jobstores.X.Y Constructor option Y of jobstore X -======================= ======== ============================================== +======================= ========== ============================================== +Directive Default Definition +======================= ========== ============================================== +misfire_grace_time 1 Maximum time in seconds for the job execution + to be allowed to delay before it is considered + a misfire +coalesce False Roll several pending executions of jobs into one +daemonic True Controls whether the scheduler thread is + daemonic or not. + + If set to ``False``, then the + scheduler must be shut down explicitly + when the program is about to finish, or it will + prevent the program from terminating. + + If set to ``True``, the scheduler will + automatically terminate with the application, + but may cause an exception to be raised on + exit. + + Jobs are always executed in non-daemonic + threads. +threadpool (built-in) Instance of a :pep:`3148` compliant thread + pool or a dot-notation (``x.y.z:varname``) + reference to one +threadpool.core_threads 0 Maximum number of persistent threads in the pool +threadpool.max_threads 20 Maximum number of total threads in the pool +threadpool.keepalive 1 Seconds to keep non-core worker threads waiting + for new tasks +jobstores.X.class Class of the jobstore named X (specified as + module.name:classname) +jobstores.X.Y Constructor option Y of jobstore X +======================= ========== ============================================== Job stores diff --git a/tests/testintegration.py b/tests/testintegration.py index 318aebc..ef2468e 100644 --- a/tests/testintegration.py +++ b/tests/testintegration.py @@ -25,9 +25,9 @@ except ImportError: MongoDBJobStore = None -def increment(vals): +def increment(vals, sleeptime): vals[0] += 1 - sleep(2) + sleep(sleeptime) class IntegrationTestBase(object): @@ -43,7 +43,7 @@ class IntegrationTestBase(object): vals = [0] self.scheduler.add_interval_job(increment, jobstore='persistent', - seconds=1, args=[vals]) + seconds=1, args=[vals, 2]) sleep(2.5) eq_(vals, [1]) @@ -53,12 +53,14 @@ class IntegrationTestBase(object): self.scheduler.add_listener(events.append, EVENT_JOB_EXECUTED | EVENT_JOB_MISSED) self.scheduler.add_interval_job(increment, jobstore='persistent', - seconds=0.3, max_instances=2, max_runs=4, args=[vals]) - sleep(1.5) + seconds=0.3, max_instances=2, max_runs=4, args=[vals, 1]) + sleep(2.4) eq_(vals, [2]) - eq_(events[0].code, EVENT_JOB_EXECUTED) - eq_(events[1].code, EVENT_JOB_EXECUTED) - eq_(events[2].code, EVENT_JOB_MISSED) + eq_(len(events), 4) + eq_(events[0].code, EVENT_JOB_MISSED) + eq_(events[1].code, EVENT_JOB_MISSED) + eq_(events[2].code, EVENT_JOB_EXECUTED) + eq_(events[3].code, EVENT_JOB_EXECUTED) class TestShelveIntegration(IntegrationTestBase): @@ -77,6 +79,10 @@ class TestShelveIntegration(IntegrationTestBase): """Shelve/test_overlapping_runs""" IntegrationTestBase.test_overlapping_runs(self) + def test_max_instances(self): + """Shelve/test_max_instances""" + IntegrationTestBase.test_max_instances(self) + def teardown(self): self.scheduler.shutdown() self.jobstore.close() @@ -96,6 +102,10 @@ class TestSQLAlchemyIntegration(IntegrationTestBase): """SQLAlchemy/test_overlapping_runs""" IntegrationTestBase.test_overlapping_runs(self) + def test_max_instances(self): + """SQLAlchemy/test_max_instances""" + IntegrationTestBase.test_max_instances(self) + def teardown(self): self.scheduler.shutdown() self.jobstore.close() @@ -115,6 +125,10 @@ class TestMongoDBIntegration(IntegrationTestBase): """MongoDB/test_overlapping_runs""" IntegrationTestBase.test_overlapping_runs(self) + def test_max_instances(self): + """SQLAlchemy/test_max_instances""" + IntegrationTestBase.test_max_instances(self) + def teardown(self): self.scheduler.shutdown() connection = self.jobstore.collection.database.connection diff --git a/tests/testthreadpool.py b/tests/testthreadpool.py index 52b4fdb..5010c6b 100644 --- a/tests/testthreadpool.py +++ b/tests/testthreadpool.py @@ -20,10 +20,11 @@ def test_threadpool(): assert event1.isSet() assert event2.isSet() assert event3.isSet() - eq_(repr(pool), '<ThreadPool at %x; threads=2>' % id(pool)) + sleep(0.3) + eq_(repr(pool), '<ThreadPool at %x; threads=2/20>' % id(pool)) pool.shutdown() - eq_(repr(pool), '<ThreadPool at %x; threads=0>' % id(pool)) + eq_(repr(pool), '<ThreadPool at %x; threads=0/20>' % id(pool)) # Make sure double shutdown is ok pool.shutdown() @@ -50,4 +51,4 @@ def test_threadpool_nocore(): event.wait(1) assert event.isSet() sleep(1) - eq_(repr(pool), '<ThreadPool at %x; threads=0>' % id(pool)) + eq_(repr(pool), '<ThreadPool at %x; threads=0/20>' % id(pool)) |