summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--oslo_messaging/_utils.py14
-rw-r--r--oslo_messaging/notify/listener.py8
-rw-r--r--oslo_messaging/rpc/server.py4
-rw-r--r--oslo_messaging/server.py33
-rw-r--r--oslo_messaging/tests/notify/test_listener.py12
-rw-r--r--oslo_messaging/tests/rpc/test_server.py62
-rw-r--r--releasenotes/notes/blocking-executor-support-dropped-a3bc74c6825863f0.yaml8
-rw-r--r--releasenotes/notes/drop-python27-support-5ef2f365d8930483.yaml2
-rw-r--r--setup.cfg1
-rw-r--r--setup.py1
10 files changed, 85 insertions, 60 deletions
diff --git a/oslo_messaging/_utils.py b/oslo_messaging/_utils.py
index 362c4f2..969bdbb 100644
--- a/oslo_messaging/_utils.py
+++ b/oslo_messaging/_utils.py
@@ -13,6 +13,12 @@
# License for the specific language governing permissions and limitations
# under the License.
+import logging
+
+from oslo_utils import eventletutils
+
+LOG = logging.getLogger(__name__)
+
def version_is_compatible(imp_version, version):
"""Determine whether versions are compatible.
@@ -59,3 +65,11 @@ class DummyLock(object):
def __exit__(self, type, value, traceback):
self.release()
+
+
+def get_executor_with_context():
+ if eventletutils.is_monkey_patched('thread'):
+ LOG.debug("Threading is patched, using an eventlet executor")
+ return 'eventlet'
+ LOG.debug("Using a threading executor")
+ return 'threading'
diff --git a/oslo_messaging/notify/listener.py b/oslo_messaging/notify/listener.py
index ae6ac51..de9a26a 100644
--- a/oslo_messaging/notify/listener.py
+++ b/oslo_messaging/notify/listener.py
@@ -143,7 +143,7 @@ LOG = logging.getLogger(__name__)
class NotificationServerBase(msg_server.MessageHandlingServer):
- def __init__(self, transport, targets, dispatcher, executor='blocking',
+ def __init__(self, transport, targets, dispatcher, executor=None,
allow_requeue=True, pool=None, batch_size=1,
batch_timeout=None):
super(NotificationServerBase, self).__init__(transport, dispatcher,
@@ -167,7 +167,7 @@ class NotificationServerBase(msg_server.MessageHandlingServer):
class NotificationServer(NotificationServerBase):
- def __init__(self, transport, targets, dispatcher, executor='blocking',
+ def __init__(self, transport, targets, dispatcher, executor=None,
allow_requeue=True, pool=None):
if not isinstance(transport, msg_transport.NotificationTransport):
LOG.warning("Using RPC transport for notifications. Please use "
@@ -216,7 +216,7 @@ class BatchNotificationServer(NotificationServerBase):
def get_notification_listener(transport, targets, endpoints,
- executor='blocking', serializer=None,
+ executor=None, serializer=None,
allow_requeue=False, pool=None):
"""Construct a notification listener
@@ -250,7 +250,7 @@ def get_notification_listener(transport, targets, endpoints,
def get_batch_notification_listener(transport, targets, endpoints,
- executor='blocking', serializer=None,
+ executor=None, serializer=None,
allow_requeue=False, pool=None,
batch_size=None, batch_timeout=None):
"""Construct a batch notification listener
diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py
index d981b88..03517c3 100644
--- a/oslo_messaging/rpc/server.py
+++ b/oslo_messaging/rpc/server.py
@@ -138,7 +138,7 @@ LOG = logging.getLogger(__name__)
class RPCServer(msg_server.MessageHandlingServer):
- def __init__(self, transport, target, dispatcher, executor='blocking'):
+ def __init__(self, transport, target, dispatcher, executor=None):
super(RPCServer, self).__init__(transport, dispatcher, executor)
if not isinstance(transport, msg_transport.RPCTransport):
LOG.warning("Using notification transport for RPC. Please use "
@@ -200,7 +200,7 @@ class RPCServer(msg_server.MessageHandlingServer):
def get_rpc_server(transport, target, endpoints,
- executor='blocking', serializer=None, access_policy=None):
+ executor=None, serializer=None, access_policy=None):
"""Construct an RPC server.
:param transport: the messaging transport
diff --git a/oslo_messaging/server.py b/oslo_messaging/server.py
index 983ad72..4df1512 100644
--- a/oslo_messaging/server.py
+++ b/oslo_messaging/server.py
@@ -23,7 +23,6 @@ import logging
import threading
import traceback
-import debtcollector
from oslo_config import cfg
from oslo_service import service
from oslo_utils import eventletutils
@@ -32,6 +31,7 @@ import six
from stevedore import driver
from oslo_messaging._drivers import base as driver_base
+from oslo_messaging import _utils as utils
from oslo_messaging import exceptions
__all__ = [
@@ -306,16 +306,17 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
new tasks.
"""
- def __init__(self, transport, dispatcher, executor='blocking'):
+ def __init__(self, transport, dispatcher, executor=None):
"""Construct a message handling server.
The dispatcher parameter is a DispatcherBase instance which is used
for routing request to endpoint for processing.
The executor parameter controls how incoming messages will be received
- and dispatched. By default, the most simple executor is used - the
- blocking executor. It handles only one message at once. It's
- recommended to use threading or eventlet.
+ and dispatched. Executor is automatically detected from
+ execution environment.
+ It handles many message in parallel. If your application need
+ asynchronism then you need to consider to use the eventlet executor.
:param transport: the messaging transport
:type transport: Transport
@@ -326,19 +327,20 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
'eventlet' and 'threading'
:type executor: str
"""
+ if executor and executor not in ("threading", "eventlet"):
+ raise ExecutorLoadFailure(
+ executor,
+ "Executor should be None or 'eventlet' and 'threading'")
+ if not executor:
+ executor = utils.get_executor_with_context()
+
self.conf = transport.conf
self.conf.register_opts(_pool_opts)
self.transport = transport
self.dispatcher = dispatcher
self.executor_type = executor
- if self.executor_type == 'blocking':
- debtcollector.deprecate(
- 'blocking executor is deprecated. Executor default will be '
- 'removed. Use explicitly threading or eventlet instead',
- version="pike", removal_version="rocky",
- category=FutureWarning)
- elif self.executor_type == "eventlet":
+ if self.executor_type == "eventlet":
eventletutils.warn_eventlet_not_patched(
expected_patched_modules=['thread'],
what="the 'oslo.messaging eventlet executor'")
@@ -403,10 +405,9 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
executor_opts = {}
- if self.executor_type in ("threading", "eventlet"):
- executor_opts["max_workers"] = (
- override_pool_size or self.conf.executor_thread_pool_size
- )
+ executor_opts["max_workers"] = (
+ override_pool_size or self.conf.executor_thread_pool_size
+ )
self._work_executor = self._executor_cls(**executor_opts)
try:
diff --git a/oslo_messaging/tests/notify/test_listener.py b/oslo_messaging/tests/notify/test_listener.py
index 44a48e0..1125b17 100644
--- a/oslo_messaging/tests/notify/test_listener.py
+++ b/oslo_messaging/tests/notify/test_listener.py
@@ -135,27 +135,21 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges',
new_value={}))
- @mock.patch('debtcollector.deprecate')
- def test_constructor(self, deprecate):
+ def test_constructor(self):
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo')
endpoints = [object()]
listener = oslo_messaging.get_notification_listener(
- transport, [target], endpoints)
+ transport, [target], endpoints, executor='threading')
self.assertIs(listener.conf, self.conf)
self.assertIs(listener.transport, transport)
self.assertIsInstance(listener.dispatcher,
dispatcher.NotificationDispatcher)
self.assertIs(listener.dispatcher.endpoints, endpoints)
- self.assertEqual('blocking', listener.executor_type)
- deprecate.assert_called_once_with(
- 'blocking executor is deprecated. Executor default will be '
- 'removed. Use explicitly threading or eventlet instead',
- removal_version='rocky', version='pike',
- category=FutureWarning)
+ self.assertEqual('threading', listener.executor_type)
def test_no_target_topic(self):
transport = msg_notifier.get_notification_transport(
diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py
index b4ec519..c993a87 100644
--- a/oslo_messaging/tests/rpc/test_server.py
+++ b/oslo_messaging/tests/rpc/test_server.py
@@ -14,7 +14,6 @@
# under the License.
import threading
-import warnings
import eventlet
import fixtures
@@ -120,51 +119,62 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges',
new_value={}))
- @mock.patch('warnings.warn')
- def test_constructor(self, warn):
+ def test_constructor(self):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo', server='bar')
endpoints = [object()]
serializer = object()
access_policy = dispatcher.DefaultRPCAccessPolicy
- warnings.simplefilter("always", FutureWarning)
server = oslo_messaging.get_rpc_server(transport,
target,
endpoints,
serializer=serializer,
- access_policy=access_policy)
+ access_policy=access_policy,
+ executor='threading')
self.assertIs(server.conf, self.conf)
self.assertIs(server.transport, transport)
self.assertIsInstance(server.dispatcher, oslo_messaging.RPCDispatcher)
self.assertIs(server.dispatcher.endpoints, endpoints)
self.assertIs(server.dispatcher.serializer, serializer)
- self.assertEqual('blocking', server.executor_type)
- self.assertEqual([
- mock.call("blocking executor is deprecated. Executor default will "
- "be removed. Use explicitly threading or eventlet "
- "instead in version 'pike' and will be removed in "
- "version 'rocky'",
- category=FutureWarning, stacklevel=3)
- ], warn.mock_calls)
-
- @mock.patch('warnings.warn')
- def test_constructor_without_explicit_RPCAccessPolicy(self, warn):
+ self.assertEqual('threading', server.executor_type)
+
+ def test_constructor_with_eventlet_executor(self):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo', server='bar')
endpoints = [object()]
serializer = object()
+ access_policy = dispatcher.DefaultRPCAccessPolicy
- warnings.simplefilter("always", FutureWarning)
- oslo_messaging.get_rpc_server(transport, target,
- endpoints, serializer=serializer)
- self.assertEqual([
- mock.call("blocking executor is deprecated. Executor default will "
- "be removed. Use explicitly threading or eventlet "
- "instead in version 'pike' and will be removed in "
- "version 'rocky'",
- category=FutureWarning, stacklevel=3)
- ], warn.mock_calls)
+ server = oslo_messaging.get_rpc_server(transport,
+ target,
+ endpoints,
+ serializer=serializer,
+ access_policy=access_policy,
+ executor='eventlet')
+ self.assertIs(server.conf, self.conf)
+ self.assertIs(server.transport, transport)
+ self.assertIsInstance(server.dispatcher, oslo_messaging.RPCDispatcher)
+ self.assertIs(server.dispatcher.endpoints, endpoints)
+ self.assertIs(server.dispatcher.serializer, serializer)
+ self.assertEqual('eventlet', server.executor_type)
+
+ def test_constructor_with_unrecognized_executor(self):
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
+ target = oslo_messaging.Target(topic='foo', server='bar')
+ endpoints = [object()]
+ serializer = object()
+ access_policy = dispatcher.DefaultRPCAccessPolicy
+
+ self.assertRaises(
+ server_module.ExecutorLoadFailure,
+ oslo_messaging.get_rpc_server,
+ transport=transport,
+ target=target,
+ endpoints=endpoints,
+ serializer=serializer,
+ access_policy=access_policy,
+ executor='boom')
def test_server_wait_method(self):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
diff --git a/releasenotes/notes/blocking-executor-support-dropped-a3bc74c6825863f0.yaml b/releasenotes/notes/blocking-executor-support-dropped-a3bc74c6825863f0.yaml
new file mode 100644
index 0000000..878c5fb
--- /dev/null
+++ b/releasenotes/notes/blocking-executor-support-dropped-a3bc74c6825863f0.yaml
@@ -0,0 +1,8 @@
+---
+upgrade:
+ - |
+ The blocking executor has been deprecated for removal in Rocky and support
+ is now dropped in Ussuri. Its usage was never recommended for applications,
+ and it has no test coverage.
+ Applications should choose the appropriate threading model that maps to
+ their usage instead.
diff --git a/releasenotes/notes/drop-python27-support-5ef2f365d8930483.yaml b/releasenotes/notes/drop-python27-support-5ef2f365d8930483.yaml
index c5195c0..5684dbe 100644
--- a/releasenotes/notes/drop-python27-support-5ef2f365d8930483.yaml
+++ b/releasenotes/notes/drop-python27-support-5ef2f365d8930483.yaml
@@ -1,5 +1,5 @@
---
upgrade:
- |
- Support for Python 2.7 has been dropped. The latest version of Python now
+ Support for Python 2.7 has been dropped. The minimum version of Python now
supported is Python 3.6.
diff --git a/setup.cfg b/setup.cfg
index ecacb87..116ebff 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -51,7 +51,6 @@ oslo.messaging.drivers =
fake = oslo_messaging._drivers.impl_fake:FakeDriver
oslo.messaging.executors =
- blocking = futurist:SynchronousExecutor
eventlet = futurist:GreenThreadPoolExecutor
threading = futurist:ThreadPoolExecutor
diff --git a/setup.py b/setup.py
index f63cc23..cd35c3c 100644
--- a/setup.py
+++ b/setup.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-# THIS FILE IS MANAGED BY THE GLOBAL REQUIREMENTS REPO - DO NOT EDIT
import setuptools
setuptools.setup(