diff options
author | Mehdi Abaakouk <sileht@redhat.com> | 2016-04-22 10:14:36 +0200 |
---|---|---|
committer | Mehdi Abaakouk <sileht@redhat.com> | 2016-07-04 10:40:25 +0200 |
commit | 1ab0acb851a72e4a13903dab73a20a45de98baaa (patch) | |
tree | 25b1c5b3e26cadf6ad9af9b34d4194ee06d52499 /ceilometer/utils.py | |
parent | 8c098a174518d414d1674f15f5bce6420226a887 (diff) | |
download | ceilometer-1ab0acb851a72e4a13903dab73a20a45de98baaa.tar.gz |
pollsters: Remove eventlet timers
This change removes usage of eventlet timers.
This allows coordinator heartbeat/watchers to work correctly
when the main thread is stuck for any reason (IO, time.sleep, ...).
This also fixes a concurrency issue in the notification-agent between
stop/reload_pipeline/refresh_agent that manipulate listeners.
For example a listener stopped by stop() could be restart by
reload_pipeline or refresh_agent. Now we use the coord_lock to
protect the listener manipulations and ensure we are not in a shutdown
process when we restart it.
This bug can't occurs with greenlet because we don't monkeypatch system
call for a while now and all of this methods wasn't ran in concurrency
manner. But remplacing greenlet by reel thread have show up the bug.
Closes-Bug: #1582641
Change-Id: I21c3b953a296316b983114435fcbeba1e29f051e
Diffstat (limited to 'ceilometer/utils.py')
-rw-r--r-- | ceilometer/utils.py | 17 |
1 files changed, 17 insertions, 0 deletions
diff --git a/ceilometer/utils.py b/ceilometer/utils.py index a4495da7..544d5523 100644 --- a/ceilometer/utils.py +++ b/ceilometer/utils.py @@ -26,7 +26,10 @@ import decimal import hashlib import struct import threading +import time +from concurrent import futures +from futurist import periodics from oslo_concurrency import processutils from oslo_config import cfg from oslo_utils import timeutils @@ -257,8 +260,22 @@ def kill_listeners(listeners): listener.wait() +def delayed(delay, target, *args, **kwargs): + time.sleep(delay) + return target(*args, **kwargs) + + def spawn_thread(target, *args, **kwargs): t = threading.Thread(target=target, args=args, kwargs=kwargs) t.daemon = True t.start() return t + + +def create_periodic(target, spacing, run_immediately=True, *args, **kwargs): + p = periodics.PeriodicWorker.create( + [], executor_factory=lambda: futures.ThreadPoolExecutor(max_workers=1)) + p.add(periodics.periodic( + spacing=spacing, run_immediately=run_immediately)( + lambda: target(*args, **kwargs))) + return p |