summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJulien Danjou <julien@danjou.info>2015-11-02 18:51:13 +0100
committerJulien Danjou <julien@danjou.info>2015-11-24 13:44:58 +0100
commit867ad8d6ab0b8b5392c458732265ccb86b039d88 (patch)
tree6ce8f7e77934946f809c7288950e6c4b1754fd22
parent6bc86f75ea881e5660413a9e2d67d91c1a16f8c7 (diff)
downloadceilometer-867ad8d6ab0b8b5392c458732265ccb86b039d88.tar.gz
Remove eventlet usage
This removes entirely our usage of eventlet and its ugly monkey-patching in favor of a threaded approach. Implements: remove-eventlet Change-Id: Ib5f623e2d1ff9e9254601ad091bf5b53ab32000d
-rw-r--r--ceilometer/__init__.py9
-rw-r--r--ceilometer/cmd/agent_notification.py (renamed from ceilometer/cmd/eventlet/agent_notification.py)0
-rw-r--r--ceilometer/cmd/collector.py (renamed from ceilometer/cmd/eventlet/collector.py)0
-rw-r--r--ceilometer/cmd/eventlet/__init__.py22
-rw-r--r--ceilometer/cmd/polling.py (renamed from ceilometer/cmd/eventlet/polling.py)0
-rw-r--r--ceilometer/cmd/sample.py (renamed from ceilometer/cmd/eventlet/sample.py)0
-rw-r--r--ceilometer/cmd/storage.py (renamed from ceilometer/cmd/eventlet/storage.py)0
-rw-r--r--ceilometer/compute/virt/xenapi/inspector.py9
-rw-r--r--ceilometer/messaging.py4
-rw-r--r--ceilometer/opts.py4
-rw-r--r--ceilometer/publisher/messaging.py1
-rw-r--r--ceilometer/tests/base.py5
-rw-r--r--ceilometer/tests/functional/test_notification.py93
-rw-r--r--ceilometer/tests/unit/agent/test_manager.py29
-rw-r--r--ceilometer/tests/unit/publisher/test_messaging_publisher.py28
-rw-r--r--requirements.txt1
-rw-r--r--setup.cfg12
-rw-r--r--tox.ini4
18 files changed, 25 insertions, 196 deletions
diff --git a/ceilometer/__init__.py b/ceilometer/__init__.py
index 8753fef0..676c802f 100644
--- a/ceilometer/__init__.py
+++ b/ceilometer/__init__.py
@@ -1,7 +1,5 @@
# Copyright 2014 eNovance
#
-# Authors: Julien Danjou <julien@danjou.info>
-#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@@ -14,13 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-# This must be set before the initial import of eventlet because if
-# dnspython is present in your environment then eventlet monkeypatches
-# socket.getaddrinfo() with an implementation which doesn't work for IPv6.
-import os
-
-os.environ['EVENTLET_NO_GREENDNS'] = 'yes'
-
class NotImplementedError(NotImplementedError):
# FIXME(jd) This is used by WSME to return a correct HTTP code. We should
diff --git a/ceilometer/cmd/eventlet/agent_notification.py b/ceilometer/cmd/agent_notification.py
index 08b16464..08b16464 100644
--- a/ceilometer/cmd/eventlet/agent_notification.py
+++ b/ceilometer/cmd/agent_notification.py
diff --git a/ceilometer/cmd/eventlet/collector.py b/ceilometer/cmd/collector.py
index 0a56a7f5..0a56a7f5 100644
--- a/ceilometer/cmd/eventlet/collector.py
+++ b/ceilometer/cmd/collector.py
diff --git a/ceilometer/cmd/eventlet/__init__.py b/ceilometer/cmd/eventlet/__init__.py
deleted file mode 100644
index 99efcc46..00000000
--- a/ceilometer/cmd/eventlet/__init__.py
+++ /dev/null
@@ -1,22 +0,0 @@
-# -*- encoding: utf-8 -*-
-#
-# Copyright 2014 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import eventlet
-# NOTE(jd) We need to monkey patch the socket and select module for,
-# at least, oslo.messaging, otherwise everything's blocked on its
-# first read() or select(), thread need to be patched too, because
-# oslo.messaging use threading.local
-eventlet.monkey_patch(socket=True, select=True, thread=True, time=True)
diff --git a/ceilometer/cmd/eventlet/polling.py b/ceilometer/cmd/polling.py
index 1055711e..1055711e 100644
--- a/ceilometer/cmd/eventlet/polling.py
+++ b/ceilometer/cmd/polling.py
diff --git a/ceilometer/cmd/eventlet/sample.py b/ceilometer/cmd/sample.py
index 6f5794a7..6f5794a7 100644
--- a/ceilometer/cmd/eventlet/sample.py
+++ b/ceilometer/cmd/sample.py
diff --git a/ceilometer/cmd/eventlet/storage.py b/ceilometer/cmd/storage.py
index 752ea63f..752ea63f 100644
--- a/ceilometer/cmd/eventlet/storage.py
+++ b/ceilometer/cmd/storage.py
diff --git a/ceilometer/compute/virt/xenapi/inspector.py b/ceilometer/compute/virt/xenapi/inspector.py
index 28e5afef..02d69ed0 100644
--- a/ceilometer/compute/virt/xenapi/inspector.py
+++ b/ceilometer/compute/virt/xenapi/inspector.py
@@ -13,7 +13,6 @@
# under the License.
"""Implementation of Inspector abstraction for XenAPI."""
-from eventlet import timeout
from oslo_config import cfg
from oslo_utils import units
try:
@@ -38,9 +37,6 @@ OPTS = [
cfg.StrOpt('connection_password',
help='Password for connection to XenServer/Xen Cloud Platform.',
secret=True),
- cfg.IntOpt('login_timeout',
- default=10,
- help='Timeout in seconds for XenAPI login.'),
]
CONF = cfg.CONF
@@ -63,13 +59,10 @@ def get_api_session():
raise XenapiException(_('Must specify connection_url, and '
'connection_password to use'))
- exception = api.Failure(_("Unable to log in to XenAPI "
- "(is the Dom0 disk full?)"))
try:
session = (api.xapi_local() if url == 'unix://local'
else api.Session(url))
- with timeout.Timeout(CONF.xenapi.login_timeout, exception):
- session.login_with_password(username, password)
+ session.login_with_password(username, password)
except api.Failure as e:
msg = _("Could not connect to XenAPI: %s") % e.details[0]
raise XenapiException(msg)
diff --git a/ceilometer/messaging.py b/ceilometer/messaging.py
index 6f5dce5d..5e7ea318 100644
--- a/ceilometer/messaging.py
+++ b/ceilometer/messaging.py
@@ -61,7 +61,7 @@ def get_rpc_server(transport, topic, endpoint):
serializer = oslo_serializer.RequestContextSerializer(
oslo_serializer.JsonPayloadSerializer())
return oslo_messaging.get_rpc_server(transport, target,
- [endpoint], executor='eventlet',
+ [endpoint], executor='threading',
serializer=serializer)
@@ -79,7 +79,7 @@ def get_notification_listener(transport, targets, endpoints,
allow_requeue=False):
"""Return a configured oslo_messaging notification listener."""
return oslo_messaging.get_notification_listener(
- transport, targets, endpoints, executor='eventlet',
+ transport, targets, endpoints, executor='threading',
allow_requeue=allow_requeue)
diff --git a/ceilometer/opts.py b/ceilometer/opts.py
index 971ea83e..7eb94da3 100644
--- a/ceilometer/opts.py
+++ b/ceilometer/opts.py
@@ -16,7 +16,7 @@ import itertools
import ceilometer.agent.manager
import ceilometer.api
import ceilometer.api.app
-import ceilometer.cmd.eventlet.polling
+import ceilometer.cmd.polling
import ceilometer.collector
import ceilometer.compute.discovery
import ceilometer.compute.notifications
@@ -57,7 +57,7 @@ def list_opts():
('DEFAULT',
itertools.chain(ceilometer.agent.manager.OPTS,
ceilometer.api.app.OPTS,
- ceilometer.cmd.eventlet.polling.CLI_OPTS,
+ ceilometer.cmd.polling.CLI_OPTS,
ceilometer.compute.notifications.OPTS,
ceilometer.compute.util.OPTS,
ceilometer.compute.virt.inspector.OPTS,
diff --git a/ceilometer/publisher/messaging.py b/ceilometer/publisher/messaging.py
index a44b6ad1..5ca95d98 100644
--- a/ceilometer/publisher/messaging.py
+++ b/ceilometer/publisher/messaging.py
@@ -139,7 +139,6 @@ class MessagingPublisher(publisher.PublisherBase):
def flush(self):
# NOTE(sileht):
- # IO of the rpc stuff in handled by eventlet,
# this is why the self.local_queue, is emptied before processing the
# queue and the remaining messages in the queue are added to
# self.local_queue after in case of a other call have already added
diff --git a/ceilometer/tests/base.py b/ceilometer/tests/base.py
index 72f45500..4def20be 100644
--- a/ceilometer/tests/base.py
+++ b/ceilometer/tests/base.py
@@ -18,7 +18,6 @@
import functools
import os.path
-import eventlet
import oslo_messaging.conffixture
from oslo_utils import timeutils
from oslotest import base
@@ -39,10 +38,6 @@ class BaseTestCase(base.BaseTestCase):
exchange = 'ceilometer'
conf.set_override("control_exchange", exchange)
- # oslo.messaging fake driver needs time and thread
- # to be patched, otherwise there are chances of deadlocks
- eventlet.monkey_patch(time=True, thread=True)
-
# NOTE(sileht): Ensure a new oslo.messaging driver is loaded
# between each tests
self.transport = messaging.get_transport("fake://", cache=False)
diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py
index c0ebe6b5..52e0244a 100644
--- a/ceilometer/tests/functional/test_notification.py
+++ b/ceilometer/tests/functional/test_notification.py
@@ -16,7 +16,6 @@
import shutil
-import eventlet
import mock
from oslo_config import fixture as fixture_config
from oslo_context import context
@@ -255,7 +254,6 @@ class BaseRealNotification(tests_base.BaseTestCase):
if (len(self.publisher.samples) >= self.expected_samples and
len(self.publisher.events) >= self.expected_events):
break
- eventlet.sleep(0)
self.assertNotEqual(self.srv.listeners, self.srv.pipeline_listeners)
self.srv.stop()
@@ -284,114 +282,45 @@ class TestRealNotificationReloadablePipeline(BaseRealNotification):
self.assertIn(pipeline_poller_call,
self.srv.tg.add_timer.call_args_list)
- @mock.patch('ceilometer.publisher.test.TestPublisher')
- def test_notification_reloaded_pipeline(self, fake_publisher_cls):
- fake_publisher_cls.return_value = self.publisher
-
+ def test_notification_reloaded_pipeline(self):
pipeline_cfg_file = self.setup_pipeline(['instance'])
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
- self.expected_samples = 1
self.srv.start()
- notifier = messaging.get_notifier(self.transport,
- "compute.vagrant-precise")
- notifier.info(context.RequestContext(), 'compute.instance.create.end',
- TEST_NOTICE_PAYLOAD)
-
- start = timeutils.utcnow()
- while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
- if (len(self.publisher.samples) >= self.expected_samples and
- len(self.publisher.events) >= self.expected_events):
- break
- eventlet.sleep(0)
-
- self.assertEqual(self.expected_samples, len(self.publisher.samples))
+ pipeline = self.srv.pipe_manager
- # Flush publisher samples to test reloading
- self.publisher.samples = []
# Modify the collection targets
updated_pipeline_cfg_file = self.setup_pipeline(['vcpus',
'disk.root.size'])
# Move/re-name the updated pipeline file to the original pipeline
# file path as recorded in oslo config
shutil.move(updated_pipeline_cfg_file, pipeline_cfg_file)
+ self.srv.refresh_pipeline()
- self.expected_samples = 2
- # Random sleep to let the pipeline poller complete the reloading
- eventlet.sleep(3)
- # Send message again to verify the reload works
- notifier = messaging.get_notifier(self.transport,
- "compute.vagrant-precise")
- notifier.info(context.RequestContext(), 'compute.instance.create.end',
- TEST_NOTICE_PAYLOAD)
-
- start = timeutils.utcnow()
- while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
- if (len(self.publisher.samples) >= self.expected_samples and
- len(self.publisher.events) >= self.expected_events):
- break
- eventlet.sleep(0)
-
- self.assertEqual(self.expected_samples, len(self.publisher.samples))
-
- (self.assertIn(sample.name, ['disk.root.size', 'vcpus'])
- for sample in self.publisher.samples)
-
- @mock.patch('ceilometer.publisher.test.TestPublisher')
- def test_notification_reloaded_event_pipeline(self, fake_publisher_cls):
- fake_publisher_cls.return_value = self.publisher
+ self.assertNotEqual(pipeline, self.srv.pipe_manager)
+ def test_notification_reloaded_event_pipeline(self):
ev_pipeline_cfg_file = self.setup_event_pipeline(
['compute.instance.create.start'])
self.CONF.set_override("event_pipeline_cfg_file", ev_pipeline_cfg_file)
self.CONF.set_override("store_events", True, group="notification")
- self.expected_events = 1
- self.srv.start()
- notifier = messaging.get_notifier(self.transport,
- "compute.vagrant-precise")
- notifier.info(context.RequestContext(),
- 'compute.instance.create.start',
- TEST_NOTICE_PAYLOAD)
-
- start = timeutils.utcnow()
- while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
- if len(self.publisher.events) >= self.expected_events:
- break
- eventlet.sleep(0)
+ self.srv.start()
- self.assertEqual(self.expected_events, len(self.publisher.events))
+ pipeline = self.srv.event_pipe_manager
- # Flush publisher events to test reloading
- self.publisher.events = []
# Modify the collection targets
updated_ev_pipeline_cfg_file = self.setup_event_pipeline(
['compute.instance.*'])
+
# Move/re-name the updated pipeline file to the original pipeline
# file path as recorded in oslo config
shutil.move(updated_ev_pipeline_cfg_file, ev_pipeline_cfg_file)
+ self.srv.refresh_pipeline()
- self.expected_events = 1
- # Random sleep to let the pipeline poller complete the reloading
- eventlet.sleep(3)
- # Send message again to verify the reload works
- notifier = messaging.get_notifier(self.transport,
- "compute.vagrant-precise")
- notifier.info(context.RequestContext(), 'compute.instance.create.end',
- TEST_NOTICE_PAYLOAD)
-
- start = timeutils.utcnow()
- while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
- if len(self.publisher.events) >= self.expected_events:
- break
- eventlet.sleep(0)
-
- self.assertEqual(self.expected_events, len(self.publisher.events))
-
- self.assertEqual(self.publisher.events[0].event_type,
- 'compute.instance.create.end')
+ self.assertNotEqual(pipeline, self.srv.pipe_manager)
class TestRealNotification(BaseRealNotification):
@@ -417,7 +346,6 @@ class TestRealNotification(BaseRealNotification):
while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
if len(self.publisher.events) >= self.expected_events:
break
- eventlet.sleep(0)
self.srv.stop()
self.assertEqual(self.expected_events, len(self.publisher.events))
@@ -582,7 +510,6 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
if (len(self.publisher.samples + self.publisher2.samples) >=
self.expected_samples):
break
- eventlet.sleep(0)
self.srv.stop()
self.srv2.stop()
diff --git a/ceilometer/tests/unit/agent/test_manager.py b/ceilometer/tests/unit/agent/test_manager.py
index eba5baa0..505def76 100644
--- a/ceilometer/tests/unit/agent/test_manager.py
+++ b/ceilometer/tests/unit/agent/test_manager.py
@@ -12,18 +12,15 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-"""Tests for ceilometer/central/manager.py
-"""
+"""Tests for ceilometer agent manager"""
import shutil
-import eventlet
from keystoneclient import exceptions as ks_exceptions
import mock
from novaclient import client as novaclient
from oslo_service import service as os_service
from oslo_utils import fileutils
-from oslo_utils import timeutils
from oslotest import base
from oslotest import mockpatch
import requests
@@ -408,11 +405,9 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
self.mgr.tg = os_service.threadgroup.ThreadGroup(1000)
self.mgr.start()
- start = timeutils.utcnow()
- while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
- if len(self.notified_samples) >= expected_samples:
- break
- eventlet.sleep(0)
+ # Manually executes callbacks
+ for timer in self.mgr.pollster_timers:
+ timer.f(*timer.args, **timer.kw)
samples = self.notified_samples
self.assertEqual(expected_samples, len(samples))
@@ -442,12 +437,6 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self.mgr.tg = os_service.threadgroup.ThreadGroup(1000)
self.mgr.start()
- expected_samples = 1
- start = timeutils.utcnow()
- while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
- if len(self.notified_samples) >= expected_samples:
- break
- eventlet.sleep(0)
# we only got the old name of meters
for sample in self.notified_samples:
@@ -475,20 +464,10 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
# file path as recorded in oslo config
shutil.move(updated_pipeline_cfg_file, pipeline_cfg_file)
- # Random sleep to let the pipeline poller complete the reloading
- eventlet.sleep(3)
-
# Flush notified samples to test only new, nothing latent on
# fake message bus.
self.notified_samples = []
- expected_samples = 1
- start = timeutils.utcnow()
- while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
- if len(self.notified_samples) >= expected_samples:
- break
- eventlet.sleep(0)
-
# we only got the new name of meters
for sample in self.notified_samples:
self.assertEqual('testanother', sample['counter_name'])
diff --git a/ceilometer/tests/unit/publisher/test_messaging_publisher.py b/ceilometer/tests/unit/publisher/test_messaging_publisher.py
index 3bd26487..10b15e2d 100644
--- a/ceilometer/tests/unit/publisher/test_messaging_publisher.py
+++ b/ceilometer/tests/unit/publisher/test_messaging_publisher.py
@@ -17,7 +17,6 @@
import datetime
import uuid
-import eventlet
import mock
from oslo_config import fixture as fixture_config
from oslo_context import context
@@ -116,7 +115,6 @@ class RpcOnlyPublisherTest(BasePublisherTestCase):
collector.stop())
collector.start()
- eventlet.sleep()
publisher.publish_samples(context.RequestContext(),
self.test_sample_data)
collector.wait()
@@ -219,32 +217,6 @@ class TestPublisher(testscenarios.testcase.WithScenarios,
class TestPublisherPolicy(TestPublisher):
- def test_published_concurrency(self):
- """Test concurrent access to the local queue of the rpc publisher."""
-
- publisher = self.publisher_cls(
- netutils.urlsplit('%s://' % self.protocol))
-
- with mock.patch.object(publisher, '_send') as fake_send:
- def fake_send_wait(ctxt, topic, meters):
- fake_send.side_effect = mock.Mock()
- # Sleep to simulate concurrency and allow other threads to work
- eventlet.sleep(0)
-
- fake_send.side_effect = fake_send_wait
-
- job1 = eventlet.spawn(getattr(publisher, self.pub_func),
- mock.MagicMock(), self.test_data)
- job2 = eventlet.spawn(getattr(publisher, self.pub_func),
- mock.MagicMock(), self.test_data)
-
- job1.wait()
- job2.wait()
-
- self.assertEqual('default', publisher.policy)
- self.assertEqual(2, len(fake_send.mock_calls))
- self.assertEqual(0, len(publisher.local_queue))
-
@mock.patch('ceilometer.publisher.messaging.LOG')
def test_published_with_no_policy(self, mylog):
publisher = self.publisher_cls(
diff --git a/requirements.txt b/requirements.txt
index d0c83bd1..374779f1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,7 +3,6 @@
# process, which may cause wedges in the gate later.
retrying!=1.3.0,>=1.2.3 # Apache-2.0
-eventlet>=0.17.4
jsonpath-rw-ext>=0.1.9
jsonschema!=2.5.0,<3.0.0,>=2.0.0
kafka-python>=0.9.2 # Apache-2.0
diff --git a/setup.cfg b/setup.cfg
index 95553013..04682287 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -240,13 +240,13 @@ ceilometer.event.trait_plugin =
console_scripts =
ceilometer-api = ceilometer.cmd.api:main
- ceilometer-polling = ceilometer.cmd.eventlet.polling:main
- ceilometer-agent-notification = ceilometer.cmd.eventlet.agent_notification:main
- ceilometer-send-sample = ceilometer.cmd.eventlet.sample:send_sample
- ceilometer-dbsync = ceilometer.cmd.eventlet.storage:dbsync
- ceilometer-expirer = ceilometer.cmd.eventlet.storage:expirer
+ ceilometer-polling = ceilometer.cmd.polling:main
+ ceilometer-agent-notification = ceilometer.cmd.agent_notification:main
+ ceilometer-send-sample = ceilometer.cmd.sample:send_sample
+ ceilometer-dbsync = ceilometer.cmd.storage:dbsync
+ ceilometer-expirer = ceilometer.cmd.storage:expirer
ceilometer-rootwrap = oslo_rootwrap.cmd:main
- ceilometer-collector = ceilometer.cmd.eventlet.collector:main
+ ceilometer-collector = ceilometer.cmd.collector:main
ceilometer.dispatcher.meter =
database = ceilometer.dispatcher.database:DatabaseDispatcher
diff --git a/tox.ini b/tox.ini
index 608940b9..e5f19523 100644
--- a/tox.ini
+++ b/tox.ini
@@ -9,7 +9,6 @@ deps = -r{toxinidir}/requirements.txt
install_command = pip install -U {opts} {packages}
usedevelop = True
setenv = VIRTUAL_ENV={envdir}
- EVENTLET_NO_GREENDNS=yes
OS_TEST_PATH=ceilometer/tests/unit
passenv = OS_TEST_TIMEOUT OS_STDOUT_CAPTURE OS_STDERR_CAPTURE OS_LOG_CAPTURE
commands =
@@ -44,7 +43,6 @@ commands =
[testenv:functional]
setenv = VIRTUAL_ENV={envdir}
- EVENTLET_NO_GREENDNS=yes
OS_TEST_PATH=ceilometer/tests/functional/
passenv = CEILOMETER_*
commands =
@@ -52,7 +50,6 @@ commands =
[testenv:py34-functional]
setenv = VIRTUAL_ENV={envdir}
- EVENTLET_NO_GREENDNS=yes
OS_TEST_PATH=ceilometer/tests/functional/
basepython = python3.4
passenv = CEILOMETER_*
@@ -61,7 +58,6 @@ commands =
[testenv:integration]
setenv = VIRTUAL_ENV={envdir}
- EVENTLET_NO_GREENDNS=yes
OS_TEST_PATH=./ceilometer/tests/integration
OS_TEST_TIMEOUT=2400
GABBI_LIVE_FAIL_IF_NO_TEST=1