summaryrefslogtreecommitdiff
path: root/ceilometer/utils.py
diff options
context:
space:
mode:
authorMehdi Abaakouk <sileht@redhat.com>2016-04-22 10:14:36 +0200
committerMehdi Abaakouk <sileht@redhat.com>2016-07-04 10:40:25 +0200
commit1ab0acb851a72e4a13903dab73a20a45de98baaa (patch)
tree25b1c5b3e26cadf6ad9af9b34d4194ee06d52499 /ceilometer/utils.py
parent8c098a174518d414d1674f15f5bce6420226a887 (diff)
downloadceilometer-1ab0acb851a72e4a13903dab73a20a45de98baaa.tar.gz
pollsters: Remove eventlet timers
This change removes usage of eventlet timers. This allows coordinator heartbeat/watchers to work correctly when the main thread is stuck for any reason (IO, time.sleep, ...). This also fixes a concurrency issue in the notification-agent between stop/reload_pipeline/refresh_agent that manipulate listeners. For example a listener stopped by stop() could be restart by reload_pipeline or refresh_agent. Now we use the coord_lock to protect the listener manipulations and ensure we are not in a shutdown process when we restart it. This bug can't occurs with greenlet because we don't monkeypatch system call for a while now and all of this methods wasn't ran in concurrency manner. But remplacing greenlet by reel thread have show up the bug. Closes-Bug: #1582641 Change-Id: I21c3b953a296316b983114435fcbeba1e29f051e
Diffstat (limited to 'ceilometer/utils.py')
-rw-r--r--ceilometer/utils.py17
1 files changed, 17 insertions, 0 deletions
diff --git a/ceilometer/utils.py b/ceilometer/utils.py
index a4495da7..544d5523 100644
--- a/ceilometer/utils.py
+++ b/ceilometer/utils.py
@@ -26,7 +26,10 @@ import decimal
import hashlib
import struct
import threading
+import time
+from concurrent import futures
+from futurist import periodics
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_utils import timeutils
@@ -257,8 +260,22 @@ def kill_listeners(listeners):
listener.wait()
+def delayed(delay, target, *args, **kwargs):
+ time.sleep(delay)
+ return target(*args, **kwargs)
+
+
def spawn_thread(target, *args, **kwargs):
t = threading.Thread(target=target, args=args, kwargs=kwargs)
t.daemon = True
t.start()
return t
+
+
+def create_periodic(target, spacing, run_immediately=True, *args, **kwargs):
+ p = periodics.PeriodicWorker.create(
+ [], executor_factory=lambda: futures.ThreadPoolExecutor(max_workers=1))
+ p.add(periodics.periodic(
+ spacing=spacing, run_immediately=run_immediately)(
+ lambda: target(*args, **kwargs)))
+ return p