summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Eckersberg <jeckersb@redhat.com>2016-10-14 11:02:47 -0400
committerJohn Eckersberg <jeckersb@redhat.com>2016-11-16 15:06:30 -0500
commitd690cac08ca0f6c18518e7760d8bdcfd14cdee84 (patch)
tree05d9abc631a9c51e8cff1b4dbfd09775dc058c6d
parentb63ab206c028af0f16800401e99f80ccc9d573b6 (diff)
downloadoslo-messaging-d690cac08ca0f6c18518e7760d8bdcfd14cdee84.tar.gz
rabbit: Avoid busy loop on epoll_wait with heartbeat+eventlet
Calling threading.Event.wait() when using eventlet results in a busy loop calling epoll_wait, because the Python 2.x threading.Condition.wait() implementation busy-waits by calling sleep() with very small values (0.0005..0.05s). Because sleep() is monkey-patched by eventlet, this results in many very short timers being added to the eventlet hub, and forces eventlet to constantly epoll_wait looking for new data unecessarily. This utilizes a new Event from eventletutils which conditionalizes the event primitive depending on whether or not eventlet is being used. If it is, eventlet.event.Event is used instead of threading.Event. The eventlet.event.Event implementation does not suffer from the same busy-wait sleep problem. If eventlet is not used, the previous behavior is retained. For Newton backport, this bundles the Event from eventletutils directly in oslo.messaging under the _utils module. It is taken from: https://review.openstack.org/#/c/389739/ combined with the followup fix: https://review.openstack.org/#/c/394460/ Change-Id: I5c211092d282e724d1c87ce4d06b6c44b592e764 Depends-On: Id33c9f8c17102ba1fe24c12b053c336b6d265501 Closes-bug: #1518430 (cherry picked from commit a6c193f3eba62cdcbfe04d0fa93e95352bcfb1c3)
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py2
-rw-r--r--oslo_messaging/_utils.py51
-rw-r--r--oslo_messaging/tests/test_utils.py28
3 files changed, 80 insertions, 1 deletions
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 60461d7..65032c2 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -912,7 +912,7 @@ class Connection(object):
def _heartbeat_start(self):
if self._heartbeat_supported_and_enabled():
- self._heartbeat_exit_event = threading.Event()
+ self._heartbeat_exit_event = _utils.Event()
self._heartbeat_thread = threading.Thread(
target=self._heartbeat_thread_job)
self._heartbeat_thread.daemon = True
diff --git a/oslo_messaging/_utils.py b/oslo_messaging/_utils.py
index e0025e6..919a44b 100644
--- a/oslo_messaging/_utils.py
+++ b/oslo_messaging/_utils.py
@@ -15,6 +15,11 @@
import threading
+from oslo_utils import importutils
+
+_eventlet = importutils.try_import('eventlet')
+_patcher = importutils.try_import('eventlet.patcher')
+
def version_is_compatible(imp_version, version):
"""Determine whether versions are compatible.
@@ -74,3 +79,49 @@ class DummyLock(object):
def __exit__(self, type, value, traceback):
self.release()
+
+
+class _Event(object):
+ """A class that provides consistent eventlet/threading Event API.
+
+ This wraps the eventlet.event.Event class to have the same API as
+ the standard threading.Event object.
+ """
+ def __init__(self, *args, **kwargs):
+ self.clear()
+
+ def clear(self):
+ self._set = False
+ self._event = _eventlet.event.Event()
+
+ def is_set(self):
+ return self._set
+
+ isSet = is_set
+
+ def set(self):
+ self._set = True
+ self._event.send(True)
+
+ def wait(self, timeout=None):
+ with _eventlet.timeout.Timeout(timeout, False):
+ self._event.wait()
+ return self.is_set()
+
+
+def _is_monkey_patched(module):
+ """Determines safely is eventlet patching for module enabled or not
+ :param module: String, module name
+ :return Bool, True if module is patched, False otherwise
+ """
+
+ if _patcher is None:
+ return False
+ return _patcher.is_monkey_patched(module)
+
+
+def Event():
+ if _is_monkey_patched("thread"):
+ return _Event()
+ else:
+ return threading.Event()
diff --git a/oslo_messaging/tests/test_utils.py b/oslo_messaging/tests/test_utils.py
index 908c25f..256a694 100644
--- a/oslo_messaging/tests/test_utils.py
+++ b/oslo_messaging/tests/test_utils.py
@@ -13,9 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
+import threading
+
from oslo_messaging._drivers import common
from oslo_messaging import _utils as utils
from oslo_messaging.tests import utils as test_utils
+
+import six
from six.moves import mock
@@ -97,3 +101,27 @@ class TimerTestCase(test_utils.BaseTestCase):
remaining = t.check_return(callback, 1, a='b')
self.assertEqual(0, remaining)
callback.assert_called_once_with(1, a='b')
+
+
+class EventCompatTestCase(test_utils.BaseTestCase):
+ @mock.patch('oslo_messaging._utils._Event.clear')
+ def test_event_api_compat(self, mock_clear):
+ with mock.patch('oslo_messaging._utils._is_monkey_patched',
+ return_value=True):
+ e_event = utils.Event()
+ self.assertIsInstance(e_event, utils._Event)
+
+ with mock.patch('oslo_messaging._utils._is_monkey_patched',
+ return_value=False):
+ t_event = utils.Event()
+ if six.PY3:
+ t_event_cls = threading.Event
+ else:
+ t_event_cls = threading._Event
+ self.assertIsInstance(t_event, t_event_cls)
+
+ public_methods = [m for m in dir(t_event) if not m.startswith("_") and
+ callable(getattr(t_event, m))]
+
+ for method in public_methods:
+ self.assertTrue(hasattr(e_event, method))