summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoragronholm <devnull@localhost>2011-04-16 08:55:44 +0300
committeragronholm <devnull@localhost>2011-04-16 08:55:44 +0300
commit12eecb7fcf981b6206e17fae2965d45cdbbabcc7 (patch)
tree3297535ed958dfcb4b68da5372b947fa33036f12
parentd10a4a57eef0bba26caf04b67d06970d8706139e (diff)
downloadapscheduler-12eecb7fcf981b6206e17fae2965d45cdbbabcc7.tar.gz
Fixed thread pool logic and tests; improved documentation on thread pool optionsv2.0.0rc1
-rw-r--r--apscheduler/threadpool.py16
-rw-r--r--docs/index.rst64
-rw-r--r--tests/testintegration.py30
-rw-r--r--tests/testthreadpool.py7
4 files changed, 63 insertions, 54 deletions
diff --git a/apscheduler/threadpool.py b/apscheduler/threadpool.py
index 8b87260..8ec47da 100644
--- a/apscheduler/threadpool.py
+++ b/apscheduler/threadpool.py
@@ -31,7 +31,7 @@ atexit.register(_shutdown_all)
class ThreadPool(object):
- def __init__(self, core_threads=0, max_threads=None, keepalive=1):
+ def __init__(self, core_threads=0, max_threads=20, keepalive=1):
"""
:param core_threads: maximum number of persistent threads in the pool
:param max_threads: maximum number of total threads in the pool
@@ -40,16 +40,13 @@ class ThreadPool(object):
for new tasks
"""
self.core_threads = core_threads
- self.max_threads = max_threads
+ self.max_threads = max(max_threads, core_threads, 1)
self.keepalive = keepalive
self._queue = Queue()
self._threads_lock = Lock()
self._threads = set()
self._shutdown = False
- if max_threads is not None:
- self.max_threads = max(max_threads, core_threads, 1)
-
_threadpools.add(ref(self))
logger.info('Started thread pool with %d core threads and %s maximum '
'threads', core_threads, max_threads or 'unlimited')
@@ -57,13 +54,8 @@ class ThreadPool(object):
def _adjust_threadcount(self):
self._threads_lock.acquire()
try:
- qsize = self._queue.qsize()
- if self.num_threads < self.core_threads:
- self._add_thread(True)
- elif qsize > 1 and self.num_threads < self.max_threads:
- self._add_thread(False)
- elif self.num_threads == 0:
- self._add_thread(False)
+ if self.num_threads < self.max_threads:
+ self._add_thread(self.num_threads < self.core_threads)
finally:
self._threads_lock.release()
diff --git a/docs/index.rst b/docs/index.rst
index aa225e0..a005f2c 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -146,37 +146,39 @@ tasks to finish without shutting down the thread pool.
Scheduler configuration options
-------------------------------
-======================= ======== ==============================================
-Directive Default Definition
-======================= ======== ==============================================
-misfire_grace_time 1 Maximum time in seconds for the job execution
- to be allowed to delay before it is considered
- a misfire
-coalesce False Roll several pending executions of jobs into one
-daemonic True Controls whether the scheduler thread is
- daemonic or not.
-
- If set to ``False``, then the
- scheduler must be shut down explicitly
- when the program is about to finish, or it will
- prevent the program from terminating.
-
- If set to ``True``, the scheduler will
- automatically terminate with the application,
- but may cause an exception to be raised on
- exit.
-
- Jobs are always executed in non-daemonic
- threads.
-threadpool
-threadpool.core_threads 0 Maximum number of persistent threads in the pool
-threadpool.max_threads None Maximum number of total threads in the pool
-threadpool.keepalive 1 Seconds to keep non-core worker threads waiting
- for new tasks
-jobstores.X.class Class of the jobstore named X (specified as
- module.name:classname)
-jobstores.X.Y Constructor option Y of jobstore X
-======================= ======== ==============================================
+======================= ========== ==============================================
+Directive Default Definition
+======================= ========== ==============================================
+misfire_grace_time 1 Maximum time in seconds for the job execution
+ to be allowed to delay before it is considered
+ a misfire
+coalesce False Roll several pending executions of jobs into one
+daemonic True Controls whether the scheduler thread is
+ daemonic or not.
+
+ If set to ``False``, then the
+ scheduler must be shut down explicitly
+ when the program is about to finish, or it will
+ prevent the program from terminating.
+
+ If set to ``True``, the scheduler will
+ automatically terminate with the application,
+ but may cause an exception to be raised on
+ exit.
+
+ Jobs are always executed in non-daemonic
+ threads.
+threadpool (built-in) Instance of a :pep:`3148` compliant thread
+ pool or a dot-notation (``x.y.z:varname``)
+ reference to one
+threadpool.core_threads 0 Maximum number of persistent threads in the pool
+threadpool.max_threads 20 Maximum number of total threads in the pool
+threadpool.keepalive 1 Seconds to keep non-core worker threads waiting
+ for new tasks
+jobstores.X.class Class of the jobstore named X (specified as
+ module.name:classname)
+jobstores.X.Y Constructor option Y of jobstore X
+======================= ========== ==============================================
Job stores
diff --git a/tests/testintegration.py b/tests/testintegration.py
index 318aebc..ef2468e 100644
--- a/tests/testintegration.py
+++ b/tests/testintegration.py
@@ -25,9 +25,9 @@ except ImportError:
MongoDBJobStore = None
-def increment(vals):
+def increment(vals, sleeptime):
vals[0] += 1
- sleep(2)
+ sleep(sleeptime)
class IntegrationTestBase(object):
@@ -43,7 +43,7 @@ class IntegrationTestBase(object):
vals = [0]
self.scheduler.add_interval_job(increment, jobstore='persistent',
- seconds=1, args=[vals])
+ seconds=1, args=[vals, 2])
sleep(2.5)
eq_(vals, [1])
@@ -53,12 +53,14 @@ class IntegrationTestBase(object):
self.scheduler.add_listener(events.append,
EVENT_JOB_EXECUTED | EVENT_JOB_MISSED)
self.scheduler.add_interval_job(increment, jobstore='persistent',
- seconds=0.3, max_instances=2, max_runs=4, args=[vals])
- sleep(1.5)
+ seconds=0.3, max_instances=2, max_runs=4, args=[vals, 1])
+ sleep(2.4)
eq_(vals, [2])
- eq_(events[0].code, EVENT_JOB_EXECUTED)
- eq_(events[1].code, EVENT_JOB_EXECUTED)
- eq_(events[2].code, EVENT_JOB_MISSED)
+ eq_(len(events), 4)
+ eq_(events[0].code, EVENT_JOB_MISSED)
+ eq_(events[1].code, EVENT_JOB_MISSED)
+ eq_(events[2].code, EVENT_JOB_EXECUTED)
+ eq_(events[3].code, EVENT_JOB_EXECUTED)
class TestShelveIntegration(IntegrationTestBase):
@@ -77,6 +79,10 @@ class TestShelveIntegration(IntegrationTestBase):
"""Shelve/test_overlapping_runs"""
IntegrationTestBase.test_overlapping_runs(self)
+ def test_max_instances(self):
+ """Shelve/test_max_instances"""
+ IntegrationTestBase.test_max_instances(self)
+
def teardown(self):
self.scheduler.shutdown()
self.jobstore.close()
@@ -96,6 +102,10 @@ class TestSQLAlchemyIntegration(IntegrationTestBase):
"""SQLAlchemy/test_overlapping_runs"""
IntegrationTestBase.test_overlapping_runs(self)
+ def test_max_instances(self):
+ """SQLAlchemy/test_max_instances"""
+ IntegrationTestBase.test_max_instances(self)
+
def teardown(self):
self.scheduler.shutdown()
self.jobstore.close()
@@ -115,6 +125,10 @@ class TestMongoDBIntegration(IntegrationTestBase):
"""MongoDB/test_overlapping_runs"""
IntegrationTestBase.test_overlapping_runs(self)
+ def test_max_instances(self):
+ """SQLAlchemy/test_max_instances"""
+ IntegrationTestBase.test_max_instances(self)
+
def teardown(self):
self.scheduler.shutdown()
connection = self.jobstore.collection.database.connection
diff --git a/tests/testthreadpool.py b/tests/testthreadpool.py
index 52b4fdb..5010c6b 100644
--- a/tests/testthreadpool.py
+++ b/tests/testthreadpool.py
@@ -20,10 +20,11 @@ def test_threadpool():
assert event1.isSet()
assert event2.isSet()
assert event3.isSet()
- eq_(repr(pool), '<ThreadPool at %x; threads=2>' % id(pool))
+ sleep(0.3)
+ eq_(repr(pool), '<ThreadPool at %x; threads=2/20>' % id(pool))
pool.shutdown()
- eq_(repr(pool), '<ThreadPool at %x; threads=0>' % id(pool))
+ eq_(repr(pool), '<ThreadPool at %x; threads=0/20>' % id(pool))
# Make sure double shutdown is ok
pool.shutdown()
@@ -50,4 +51,4 @@ def test_threadpool_nocore():
event.wait(1)
assert event.isSet()
sleep(1)
- eq_(repr(pool), '<ThreadPool at %x; threads=0>' % id(pool))
+ eq_(repr(pool), '<ThreadPool at %x; threads=0/20>' % id(pool))