summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-12-10 11:15:48 +0000
committerGerrit Code Review <review@openstack.org>2015-12-10 11:15:48 +0000
commit4b6144a3db4b2d10b0b4fa7e437ed264e7460df1 (patch)
tree4a269ca96ce1a941c17a8bb76503650d1b265aa4
parent8e792a5f703628235710b4ee06d599b854e46111 (diff)
parentbdf287e847024368e20f5f806380e97070c9561c (diff)
downloadoslo-messaging-4b6144a3db4b2d10b0b4fa7e437ed264e7460df1.tar.gz
Merge "creates a dispatcher abstraction"
-rw-r--r--oslo_messaging/_utils.py51
-rw-r--r--oslo_messaging/dispatcher.py105
-rw-r--r--oslo_messaging/notify/dispatcher.py6
-rw-r--r--oslo_messaging/rpc/dispatcher.py5
-rw-r--r--oslo_messaging/tests/executors/test_executor.py7
5 files changed, 114 insertions, 60 deletions
diff --git a/oslo_messaging/_utils.py b/oslo_messaging/_utils.py
index cec94bb..021fea2 100644
--- a/oslo_messaging/_utils.py
+++ b/oslo_messaging/_utils.py
@@ -46,57 +46,6 @@ def version_is_compatible(imp_version, version):
return True
-class DispatcherExecutorContext(object):
- """Dispatcher executor context helper
-
- A dispatcher can have work to do before and after the dispatch of the
- request in the main server thread while the dispatcher itself can be
- done in its own thread.
-
- The executor can use the helper like this:
-
- callback = dispatcher(incoming)
- callback.prepare()
- thread = MyWhateverThread()
- thread.on_done(callback.done)
- thread.run(callback.run)
-
- """
- def __init__(self, incoming, dispatch, executor_callback=None,
- post=None):
- self._result = None
- self._incoming = incoming
- self._dispatch = dispatch
- self._post = post
- self._executor_callback = executor_callback
-
- def run(self):
- """The incoming message dispath itself
-
- Can be run in an other thread/greenlet/corotine if the executor is
- able to do it.
- """
- try:
- self._result = self._dispatch(self._incoming,
- self._executor_callback)
- except Exception:
- msg = 'The dispatcher method must catches all exceptions'
- LOG.exception(msg)
- raise RuntimeError(msg)
-
- def done(self):
- """Callback after the incoming message have been dispathed
-
- Should be ran in the main executor thread/greenlet/corotine
- """
- # FIXME(sileht): this is not currently true, this works only because
- # the driver connection used for polling write on the wire only to
- # ack/requeue message, but what if one day, the driver do something
- # else
- if self._post is not None:
- self._post(self._incoming, self._result)
-
-
def fetch_current_thread_functor():
# Until https://github.com/eventlet/eventlet/issues/172 is resolved
# or addressed we have to use complicated workaround to get a object
diff --git a/oslo_messaging/dispatcher.py b/oslo_messaging/dispatcher.py
new file mode 100644
index 0000000..5cdd147
--- /dev/null
+++ b/oslo_messaging/dispatcher.py
@@ -0,0 +1,105 @@
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import logging
+
+import six
+
+__all__ = [
+ "DispatcherBase",
+ "DispatcherExecutorContext"
+]
+
+LOG = logging.getLogger(__name__)
+
+
+class DispatcherExecutorContext(object):
+ """Dispatcher executor context helper
+
+ A dispatcher can have work to do before and after the dispatch of the
+ request in the main server thread while the dispatcher itself can be
+ done in its own thread.
+
+ The executor can use the helper like this:
+
+ callback = dispatcher(incoming)
+ callback.prepare()
+ thread = MyWhateverThread()
+ thread.on_done(callback.done)
+ thread.run(callback.run)
+
+ """
+ def __init__(self, incoming, dispatch, executor_callback=None,
+ post=None):
+ self._result = None
+ self._incoming = incoming
+ self._dispatch = dispatch
+ self._post = post
+ self._executor_callback = executor_callback
+
+ def run(self):
+ """The incoming message dispath itself
+
+ Can be run in an other thread/greenlet/corotine if the executor is
+ able to do it.
+ """
+ try:
+ self._result = self._dispatch(self._incoming,
+ self._executor_callback)
+ except Exception:
+ msg = 'The dispatcher method must catches all exceptions'
+ LOG.exception(msg)
+ raise RuntimeError(msg)
+
+ def done(self):
+ """Callback after the incoming message have been dispathed
+
+ Should be ran in the main executor thread/greenlet/corotine
+ """
+ # FIXME(sileht): this is not currently true, this works only because
+ # the driver connection used for polling write on the wire only to
+ # ack/requeue message, but what if one day, the driver do something
+ # else
+ if self._post is not None:
+ self._post(self._incoming, self._result)
+
+
+@six.add_metaclass(abc.ABCMeta)
+class DispatcherBase(object):
+ "Base class for dispatcher"
+
+ @abc.abstractmethod
+ def _listen(self, transport):
+ """Initiate the driver Listener
+
+ Usualy the driver Listener is start with the transport helper methods:
+
+ * transport._listen()
+ * transport._listen_for_notifications()
+
+ :param transport: the transport object
+ :type transport: oslo_messaging.transport.Transport
+ :returns: a driver Listener object
+ :rtype: oslo_messaging._drivers.base.Listener
+ """
+
+ @abc.abstractmethod
+ def __call__(self, incoming, executor_callback=None):
+ """Called by the executor to get the DispatcherExecutorContext
+
+ :param incoming: message or list of messages
+ :type incoming: oslo_messging._drivers.base.IncomingMessage
+ :returns: DispatcherExecutorContext
+ :rtype: DispatcherExecutorContext
+ """
diff --git a/oslo_messaging/notify/dispatcher.py b/oslo_messaging/notify/dispatcher.py
index 46d5303..33aeea8 100644
--- a/oslo_messaging/notify/dispatcher.py
+++ b/oslo_messaging/notify/dispatcher.py
@@ -18,7 +18,7 @@ import itertools
import logging
import sys
-from oslo_messaging import _utils as utils
+from oslo_messaging import dispatcher
from oslo_messaging import localcontext
from oslo_messaging import serializer as msg_serializer
@@ -33,7 +33,7 @@ class NotificationResult(object):
REQUEUE = 'requeue'
-class NotificationDispatcher(object):
+class NotificationDispatcher(dispatcher.DispatcherBase):
"""A message dispatcher which understands Notification messages.
A MessageHandlingServer is constructed by passing a callable dispatcher
@@ -69,7 +69,7 @@ class NotificationDispatcher(object):
pool=self.pool)
def __call__(self, incoming, executor_callback=None):
- return utils.DispatcherExecutorContext(
+ return dispatcher.DispatcherExecutorContext(
incoming, self._dispatch_and_handle_error,
executor_callback=executor_callback,
post=self._post_dispatch)
diff --git a/oslo_messaging/rpc/dispatcher.py b/oslo_messaging/rpc/dispatcher.py
index 87dd409..5b6d087 100644
--- a/oslo_messaging/rpc/dispatcher.py
+++ b/oslo_messaging/rpc/dispatcher.py
@@ -31,6 +31,7 @@ import six
from oslo_messaging._i18n import _LE
from oslo_messaging import _utils as utils
+from oslo_messaging import dispatcher
from oslo_messaging import localcontext
from oslo_messaging import serializer as msg_serializer
from oslo_messaging import server as msg_server
@@ -75,7 +76,7 @@ class UnsupportedVersion(RPCDispatcherError):
self.method = method
-class RPCDispatcher(object):
+class RPCDispatcher(dispatcher.DispatcherBase):
"""A message dispatcher which understands RPC messages.
A MessageHandlingServer is constructed by passing a callable dispatcher
@@ -131,7 +132,7 @@ class RPCDispatcher(object):
def __call__(self, incoming, executor_callback=None):
incoming.acknowledge()
- return utils.DispatcherExecutorContext(
+ return dispatcher.DispatcherExecutorContext(
incoming, self._dispatch_and_reply,
executor_callback=executor_callback)
diff --git a/oslo_messaging/tests/executors/test_executor.py b/oslo_messaging/tests/executors/test_executor.py
index 1e175fd..cd338a4 100644
--- a/oslo_messaging/tests/executors/test_executor.py
+++ b/oslo_messaging/tests/executors/test_executor.py
@@ -44,7 +44,7 @@ try:
except ImportError:
impl_eventlet = None
from oslo_messaging._executors import impl_thread
-from oslo_messaging import _utils as utils
+from oslo_messaging import dispatcher as dispatcher_base
from oslo_messaging.tests import utils as test_utils
from six.moves import mock
@@ -151,9 +151,8 @@ class TestExecutor(test_utils.BaseTestCase):
return result
def __call__(self, incoming, executor_callback=None):
- return utils.DispatcherExecutorContext(incoming,
- self.callback,
- executor_callback)
+ return dispatcher_base.DispatcherExecutorContext(
+ incoming, self.callback, executor_callback)
return Dispatcher(endpoint), endpoint, event, run_executor