diff options
author | Victor Sergeyev <vsergeyev@mirantis.com> | 2015-08-07 15:23:41 +0300 |
---|---|---|
committer | Victor Sergeyev <vsergeyev@mirantis.com> | 2015-08-07 16:56:33 +0300 |
commit | c5a6bfdca30a5111e641ebe4b2eac40b21b8ce74 (patch) | |
tree | b2d6bae0313e5750556b1586d0892400e78996a8 | |
parent | da4ee6361baeaf34ce3bfe24b53a2fc371ae9e75 (diff) | |
download | oslo-messaging-c5a6bfdca30a5111e641ebe4b2eac40b21b8ce74.tar.gz |
FIx CPU time consuming in green_poller poll()
The current implementation of GreenPoller.poll() calls
eventlet.sleep() in `while True:` loop. It causes high CPU load,
so should be refactored to use queue.get() with timeout.
Change-Id: I48f1d8db39c4d7df8bd7f0bc9898ebefcd8df9e8
-rw-r--r-- | oslo_messaging/_drivers/zmq_driver/poller/green_poller.py | 17 |
1 files changed, 4 insertions, 13 deletions
diff --git a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py index dcf9da5..58f8d8a 100644 --- a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py +++ b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py @@ -16,9 +16,7 @@ import logging import threading import eventlet -import six -from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver import zmq_poller LOG = logging.getLogger(__name__) @@ -27,7 +25,7 @@ LOG = logging.getLogger(__name__) class GreenPoller(zmq_poller.ZmqPoller): def __init__(self): - self.incoming_queue = six.moves.queue.Queue() + self.incoming_queue = eventlet.queue.LightQueue() self.green_pool = eventlet.GreenPool() self.thread_by_socket = {} @@ -46,17 +44,10 @@ class GreenPoller(zmq_poller.ZmqPoller): eventlet.sleep() def poll(self, timeout=None): - incoming = None try: - with eventlet.Timeout(timeout, exception=rpc_common.Timeout): - while incoming is None: - try: - incoming = self.incoming_queue.get_nowait() - except six.moves.queue.Empty: - eventlet.sleep() - except rpc_common.Timeout: - return None, None - return incoming[0], incoming[1] + return self.incoming_queue.get(timeout=timeout) + except eventlet.queue.Empty: + return (None, None) def close(self): for thread in self.thread_by_socket.values(): |