summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHervé Beraud <hberaud@redhat.com>2020-01-17 16:24:29 +0100
committerHervé Beraud <hberaud@redhat.com>2020-02-18 15:49:27 +0100
commitfed48aea44d2d7683516cf0e9f427cd1113c2694 (patch)
tree77fb543b082a1f5719e79819e89a65efe49f808f
parent3359c520d31342efc8a8008b865b10fb84003fc1 (diff)
downloadoslo-messaging-fed48aea44d2d7683516cf0e9f427cd1113c2694.tar.gz
Remove the deprecated blocking executor
The blocking executor has been deprecated in Pike and marked for removal in Rocky, but some user like Mistral asked us to wait before. We decided to remove this executor for Train or next cycle, now we are in the Ussuri and after some researchs on usage I think we can go ahead. This patch drop the deprecation warnings, related unit tests and set the server with the threading executor is the default executor. Change-Id: If07bab61ee2b148658b88be98b12f8539f274efe Closes-Bug: #1715141
-rw-r--r--oslo_messaging/_utils.py14
-rw-r--r--oslo_messaging/notify/listener.py8
-rw-r--r--oslo_messaging/rpc/server.py4
-rw-r--r--oslo_messaging/server.py33
-rw-r--r--oslo_messaging/tests/notify/test_listener.py12
-rw-r--r--oslo_messaging/tests/rpc/test_server.py62
-rw-r--r--releasenotes/notes/blocking-executor-support-dropped-a3bc74c6825863f0.yaml8
-rw-r--r--setup.cfg1
8 files changed, 84 insertions, 58 deletions
diff --git a/oslo_messaging/_utils.py b/oslo_messaging/_utils.py
index 362c4f2..969bdbb 100644
--- a/oslo_messaging/_utils.py
+++ b/oslo_messaging/_utils.py
@@ -13,6 +13,12 @@
# License for the specific language governing permissions and limitations
# under the License.
+import logging
+
+from oslo_utils import eventletutils
+
+LOG = logging.getLogger(__name__)
+
def version_is_compatible(imp_version, version):
"""Determine whether versions are compatible.
@@ -59,3 +65,11 @@ class DummyLock(object):
def __exit__(self, type, value, traceback):
self.release()
+
+
+def get_executor_with_context():
+ if eventletutils.is_monkey_patched('thread'):
+ LOG.debug("Threading is patched, using an eventlet executor")
+ return 'eventlet'
+ LOG.debug("Using a threading executor")
+ return 'threading'
diff --git a/oslo_messaging/notify/listener.py b/oslo_messaging/notify/listener.py
index ae6ac51..de9a26a 100644
--- a/oslo_messaging/notify/listener.py
+++ b/oslo_messaging/notify/listener.py
@@ -143,7 +143,7 @@ LOG = logging.getLogger(__name__)
class NotificationServerBase(msg_server.MessageHandlingServer):
- def __init__(self, transport, targets, dispatcher, executor='blocking',
+ def __init__(self, transport, targets, dispatcher, executor=None,
allow_requeue=True, pool=None, batch_size=1,
batch_timeout=None):
super(NotificationServerBase, self).__init__(transport, dispatcher,
@@ -167,7 +167,7 @@ class NotificationServerBase(msg_server.MessageHandlingServer):
class NotificationServer(NotificationServerBase):
- def __init__(self, transport, targets, dispatcher, executor='blocking',
+ def __init__(self, transport, targets, dispatcher, executor=None,
allow_requeue=True, pool=None):
if not isinstance(transport, msg_transport.NotificationTransport):
LOG.warning("Using RPC transport for notifications. Please use "
@@ -216,7 +216,7 @@ class BatchNotificationServer(NotificationServerBase):
def get_notification_listener(transport, targets, endpoints,
- executor='blocking', serializer=None,
+ executor=None, serializer=None,
allow_requeue=False, pool=None):
"""Construct a notification listener
@@ -250,7 +250,7 @@ def get_notification_listener(transport, targets, endpoints,
def get_batch_notification_listener(transport, targets, endpoints,
- executor='blocking', serializer=None,
+ executor=None, serializer=None,
allow_requeue=False, pool=None,
batch_size=None, batch_timeout=None):
"""Construct a batch notification listener
diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py
index d981b88..03517c3 100644
--- a/oslo_messaging/rpc/server.py
+++ b/oslo_messaging/rpc/server.py
@@ -138,7 +138,7 @@ LOG = logging.getLogger(__name__)
class RPCServer(msg_server.MessageHandlingServer):
- def __init__(self, transport, target, dispatcher, executor='blocking'):
+ def __init__(self, transport, target, dispatcher, executor=None):
super(RPCServer, self).__init__(transport, dispatcher, executor)
if not isinstance(transport, msg_transport.RPCTransport):
LOG.warning("Using notification transport for RPC. Please use "
@@ -200,7 +200,7 @@ class RPCServer(msg_server.MessageHandlingServer):
def get_rpc_server(transport, target, endpoints,
- executor='blocking', serializer=None, access_policy=None):
+ executor=None, serializer=None, access_policy=None):
"""Construct an RPC server.
:param transport: the messaging transport
diff --git a/oslo_messaging/server.py b/oslo_messaging/server.py
index 983ad72..4df1512 100644
--- a/oslo_messaging/server.py
+++ b/oslo_messaging/server.py
@@ -23,7 +23,6 @@ import logging
import threading
import traceback
-import debtcollector
from oslo_config import cfg
from oslo_service import service
from oslo_utils import eventletutils
@@ -32,6 +31,7 @@ import six
from stevedore import driver
from oslo_messaging._drivers import base as driver_base
+from oslo_messaging import _utils as utils
from oslo_messaging import exceptions
__all__ = [
@@ -306,16 +306,17 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
new tasks.
"""
- def __init__(self, transport, dispatcher, executor='blocking'):
+ def __init__(self, transport, dispatcher, executor=None):
"""Construct a message handling server.
The dispatcher parameter is a DispatcherBase instance which is used
for routing request to endpoint for processing.
The executor parameter controls how incoming messages will be received
- and dispatched. By default, the most simple executor is used - the
- blocking executor. It handles only one message at once. It's
- recommended to use threading or eventlet.
+ and dispatched. Executor is automatically detected from
+ execution environment.
+ It handles many message in parallel. If your application need
+ asynchronism then you need to consider to use the eventlet executor.
:param transport: the messaging transport
:type transport: Transport
@@ -326,19 +327,20 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
'eventlet' and 'threading'
:type executor: str
"""
+ if executor and executor not in ("threading", "eventlet"):
+ raise ExecutorLoadFailure(
+ executor,
+ "Executor should be None or 'eventlet' and 'threading'")
+ if not executor:
+ executor = utils.get_executor_with_context()
+
self.conf = transport.conf
self.conf.register_opts(_pool_opts)
self.transport = transport
self.dispatcher = dispatcher
self.executor_type = executor
- if self.executor_type == 'blocking':
- debtcollector.deprecate(
- 'blocking executor is deprecated. Executor default will be '
- 'removed. Use explicitly threading or eventlet instead',
- version="pike", removal_version="rocky",
- category=FutureWarning)
- elif self.executor_type == "eventlet":
+ if self.executor_type == "eventlet":
eventletutils.warn_eventlet_not_patched(
expected_patched_modules=['thread'],
what="the 'oslo.messaging eventlet executor'")
@@ -403,10 +405,9 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
executor_opts = {}
- if self.executor_type in ("threading", "eventlet"):
- executor_opts["max_workers"] = (
- override_pool_size or self.conf.executor_thread_pool_size
- )
+ executor_opts["max_workers"] = (
+ override_pool_size or self.conf.executor_thread_pool_size
+ )
self._work_executor = self._executor_cls(**executor_opts)
try:
diff --git a/oslo_messaging/tests/notify/test_listener.py b/oslo_messaging/tests/notify/test_listener.py
index 44a48e0..1125b17 100644
--- a/oslo_messaging/tests/notify/test_listener.py
+++ b/oslo_messaging/tests/notify/test_listener.py
@@ -135,27 +135,21 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges',
new_value={}))
- @mock.patch('debtcollector.deprecate')
- def test_constructor(self, deprecate):
+ def test_constructor(self):
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo')
endpoints = [object()]
listener = oslo_messaging.get_notification_listener(
- transport, [target], endpoints)
+ transport, [target], endpoints, executor='threading')
self.assertIs(listener.conf, self.conf)
self.assertIs(listener.transport, transport)
self.assertIsInstance(listener.dispatcher,
dispatcher.NotificationDispatcher)
self.assertIs(listener.dispatcher.endpoints, endpoints)
- self.assertEqual('blocking', listener.executor_type)
- deprecate.assert_called_once_with(
- 'blocking executor is deprecated. Executor default will be '
- 'removed. Use explicitly threading or eventlet instead',
- removal_version='rocky', version='pike',
- category=FutureWarning)
+ self.assertEqual('threading', listener.executor_type)
def test_no_target_topic(self):
transport = msg_notifier.get_notification_transport(
diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py
index b4ec519..c993a87 100644
--- a/oslo_messaging/tests/rpc/test_server.py
+++ b/oslo_messaging/tests/rpc/test_server.py
@@ -14,7 +14,6 @@
# under the License.
import threading
-import warnings
import eventlet
import fixtures
@@ -120,51 +119,62 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges',
new_value={}))
- @mock.patch('warnings.warn')
- def test_constructor(self, warn):
+ def test_constructor(self):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo', server='bar')
endpoints = [object()]
serializer = object()
access_policy = dispatcher.DefaultRPCAccessPolicy
- warnings.simplefilter("always", FutureWarning)
server = oslo_messaging.get_rpc_server(transport,
target,
endpoints,
serializer=serializer,
- access_policy=access_policy)
+ access_policy=access_policy,
+ executor='threading')
self.assertIs(server.conf, self.conf)
self.assertIs(server.transport, transport)
self.assertIsInstance(server.dispatcher, oslo_messaging.RPCDispatcher)
self.assertIs(server.dispatcher.endpoints, endpoints)
self.assertIs(server.dispatcher.serializer, serializer)
- self.assertEqual('blocking', server.executor_type)
- self.assertEqual([
- mock.call("blocking executor is deprecated. Executor default will "
- "be removed. Use explicitly threading or eventlet "
- "instead in version 'pike' and will be removed in "
- "version 'rocky'",
- category=FutureWarning, stacklevel=3)
- ], warn.mock_calls)
-
- @mock.patch('warnings.warn')
- def test_constructor_without_explicit_RPCAccessPolicy(self, warn):
+ self.assertEqual('threading', server.executor_type)
+
+ def test_constructor_with_eventlet_executor(self):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo', server='bar')
endpoints = [object()]
serializer = object()
+ access_policy = dispatcher.DefaultRPCAccessPolicy
- warnings.simplefilter("always", FutureWarning)
- oslo_messaging.get_rpc_server(transport, target,
- endpoints, serializer=serializer)
- self.assertEqual([
- mock.call("blocking executor is deprecated. Executor default will "
- "be removed. Use explicitly threading or eventlet "
- "instead in version 'pike' and will be removed in "
- "version 'rocky'",
- category=FutureWarning, stacklevel=3)
- ], warn.mock_calls)
+ server = oslo_messaging.get_rpc_server(transport,
+ target,
+ endpoints,
+ serializer=serializer,
+ access_policy=access_policy,
+ executor='eventlet')
+ self.assertIs(server.conf, self.conf)
+ self.assertIs(server.transport, transport)
+ self.assertIsInstance(server.dispatcher, oslo_messaging.RPCDispatcher)
+ self.assertIs(server.dispatcher.endpoints, endpoints)
+ self.assertIs(server.dispatcher.serializer, serializer)
+ self.assertEqual('eventlet', server.executor_type)
+
+ def test_constructor_with_unrecognized_executor(self):
+ transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
+ target = oslo_messaging.Target(topic='foo', server='bar')
+ endpoints = [object()]
+ serializer = object()
+ access_policy = dispatcher.DefaultRPCAccessPolicy
+
+ self.assertRaises(
+ server_module.ExecutorLoadFailure,
+ oslo_messaging.get_rpc_server,
+ transport=transport,
+ target=target,
+ endpoints=endpoints,
+ serializer=serializer,
+ access_policy=access_policy,
+ executor='boom')
def test_server_wait_method(self):
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
diff --git a/releasenotes/notes/blocking-executor-support-dropped-a3bc74c6825863f0.yaml b/releasenotes/notes/blocking-executor-support-dropped-a3bc74c6825863f0.yaml
new file mode 100644
index 0000000..878c5fb
--- /dev/null
+++ b/releasenotes/notes/blocking-executor-support-dropped-a3bc74c6825863f0.yaml
@@ -0,0 +1,8 @@
+---
+upgrade:
+ - |
+ The blocking executor has been deprecated for removal in Rocky and support
+ is now dropped in Ussuri. Its usage was never recommended for applications,
+ and it has no test coverage.
+ Applications should choose the appropriate threading model that maps to
+ their usage instead.
diff --git a/setup.cfg b/setup.cfg
index ecacb87..116ebff 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -51,7 +51,6 @@ oslo.messaging.drivers =
fake = oslo_messaging._drivers.impl_fake:FakeDriver
oslo.messaging.executors =
- blocking = futurist:SynchronousExecutor
eventlet = futurist:GreenThreadPoolExecutor
threading = futurist:ThreadPoolExecutor