summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Giusti <kgiusti@gmail.com>2016-11-07 15:57:45 -0500
committerKenneth Giusti <kgiusti@gmail.com>2016-11-22 08:11:22 -0500
commitadf4cd9d058e6eb269aa3a6706b93cff42409206 (patch)
tree333616d5e71c06a4c9aac82f856e5fb1c2b2be0c
parent3f393e521c9b5b23dc7e23f620570a21955b680b (diff)
downloadoslo-messaging-adf4cd9d058e6eb269aa3a6706b93cff42409206.tar.gz
Document the transport backend driver interface
Add detailed documentation to the driver API to help driver developers create drivers that behave consistently. Specifically prescribes a set of operational characteristics that a driver must conform to in order to provide consistent behavior across different implementations. Change-Id: Icb251ee724f9a0ac4fede702a367910de4ba95e3
-rw-r--r--doc/source/driver-dev-guide.rst75
-rw-r--r--doc/source/index.rst1
-rw-r--r--oslo_messaging/_drivers/base.py433
3 files changed, 447 insertions, 62 deletions
diff --git a/doc/source/driver-dev-guide.rst b/doc/source/driver-dev-guide.rst
new file mode 100644
index 0000000..0062575
--- /dev/null
+++ b/doc/source/driver-dev-guide.rst
@@ -0,0 +1,75 @@
+---------------------------------------
+Guide for Transport Driver Implementors
+---------------------------------------
+
+.. currentmodule:: oslo_messaging
+
+.. automodule:: oslo_messaging._drivers.base
+
+============
+Introduction
+============
+
+This document is a *best practices* guide for the developer interested
+in creating a new transport driver for Oslo.Messaging. It should also
+be used by maintainers as a reference for proper driver behavior.
+This document will describe the driver interface and prescribe the
+expected behavior of any driver implemented to this interface.
+
+**Note well:** The API described in this document is internal to the
+oslo.messaging library and therefore **private**. Under no
+circumstances should this API be referenced by code external to the
+oslo.messaging library.
+
+================
+Driver Interface
+================
+
+The driver interface is defined by a set of abstract base classes. The
+developer creates a driver by defining concrete classes from these
+bases. The derived classes embody the logic that is specific for the
+messaging back-end that is to be supported.
+
+These base classes are defined in the *base.py* file in the *_drivers*
+subdirectory.
+
+===============
+IncomingMessage
+===============
+
+.. autoclass:: IncomingMessage
+ :members:
+
+==================
+RpcIncomingMessage
+==================
+
+.. autoclass:: RpcIncomingMessage
+ :members:
+
+========
+Listener
+========
+
+.. autoclass:: Listener
+ :members:
+
+=================
+PollStyleListener
+=================
+
+.. autoclass:: PollStyleListener
+ :members:
+
+==========
+BaseDriver
+==========
+
+.. autoclass:: BaseDriver
+ :members:
+
+
+
+
+
+
diff --git a/doc/source/index.rst b/doc/source/index.rst
index b497820..8125127 100644
--- a/doc/source/index.rst
+++ b/doc/source/index.rst
@@ -27,6 +27,7 @@ Contents
AMQP1.0
pika_driver
zmq_driver
+ driver-dev-guide
FAQ
contributing
diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py
index 24d703f..f7c1ba4 100644
--- a/oslo_messaging/_drivers/base.py
+++ b/oslo_messaging/_drivers/base.py
@@ -1,4 +1,3 @@
-
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -38,8 +37,9 @@ base_opts = [
def batch_poll_helper(func):
"""Decorator to poll messages in batch
- This decorator helps driver that polls message one by one,
- to returns a list of message.
+ This decorator is used to add message batching support to a
+ :py:meth:`PollStyleListener.poll` implementation that only polls for a
+ single message per call.
"""
def wrapper(in_self, timeout=None, batch_size=1, batch_timeout=None):
incomings = []
@@ -78,70 +78,173 @@ class TransportDriverError(exceptions.MessagingException):
@six.add_metaclass(abc.ABCMeta)
class IncomingMessage(object):
+ """The IncomingMessage class represents a single message received from the
+ messaging backend. Instances of this class are passed to up a server's
+ messaging processing logic. The backend driver must provide a concrete
+ derivation of this class which provides the backend specific logic for its
+ public methods.
+
+ :param ctxt: Context metadata provided by sending application.
+ :type ctxt: dict
+ :param message: The message as provided by the sending application.
+ :type message: dict
+ """
def __init__(self, ctxt, message):
self.ctxt = ctxt
self.message = message
def acknowledge(self):
- """Acknowledge the message."""
+ """Called by the server to acknowledge receipt of the message. When
+ this is called the driver must notify the backend of the
+ acknowledgment. This call should block at least until the driver has
+ processed the acknowledgment request locally. It may unblock before the
+ acknowledgment state has been acted upon by the backend.
+
+ If the acknowledge operation fails this method must issue a log message
+ describing the reason for the failure.
+
+ :raises: Does not raise an exception
+ """
@abc.abstractmethod
def requeue(self):
- """Requeue the message."""
+ """Called by the server to return the message to the backend so it may
+ be made available for consumption by another server. This call should
+ block at least until the driver has processed the requeue request
+ locally. It may unblock before the backend makes the requeued message
+ available for consumption.
+
+ If the requeue operation fails this method must issue a log message
+ describing the reason for the failure.
+
+ Support for this method is _optional_. The
+ :py:meth:`BaseDriver.require_features` method should indicate whether
+ or not support for requeue is available.
+
+ :raises: Does not raise an exception
+ """
@six.add_metaclass(abc.ABCMeta)
class RpcIncomingMessage(IncomingMessage):
+ """The RpcIncomingMessage represents an RPC request message received from
+ the backend. This class must be used for RPC calls that return a value to
+ the caller.
+ """
@abc.abstractmethod
def reply(self, reply=None, failure=None):
- """Send a reply or failure back to the client."""
+ """Called by the server to send an RPC reply message or an exception
+ back to the calling client.
+
+ If an exception is passed via *failure* the driver must convert it to
+ a form that can be sent as a message and properly converted back to the
+ exception at the remote.
+
+ The driver must provide a way to determine the destination address for
+ the reply. For example the driver may use the *reply-to* field from the
+ corresponding incoming message. Often a driver will also need to set a
+ correlation identifier in the reply to help the remote route the reply
+ to the correct RPCClient.
+
+ The driver should provide an *at-most-once* delivery guarantee for
+ reply messages. This call should block at least until the reply message
+ has been handed off to the backend - there is no need to confirm that
+ the reply has been delivered.
+
+ If the reply operation fails this method must issue a log message
+ describing the reason for the failure.
+
+ See :py:meth:`BaseDriver.send` for details regarding how the received
+ reply is processed.
+
+ :param reply: reply message body
+ :type reply: dict
+ :param failure: an exception thrown by the RPC call
+ :type failure: Exception
+ :raises: Does not raise an exception
+ """
@six.add_metaclass(abc.ABCMeta)
class PollStyleListener(object):
+ """A PollStyleListener is used to transfer received messages to a server
+ for processing. A polling pattern is used to retrieve messages. A
+ PollStyleListener uses a separate thread to run the polling loop. A
+ :py:class:`PollStyleListenerAdapter` can be used to create a
+ :py:class:`Listener` from a PollStyleListener.
+
+ :param prefetch_size: The number of messages that should be pulled from the
+ backend per receive transaction. May not be honored by all backend
+ implementations.
+ :type prefetch_size: int
+ """
+
def __init__(self, prefetch_size=-1):
self.prefetch_size = prefetch_size
@abc.abstractmethod
def poll(self, timeout=None, batch_size=1, batch_timeout=None):
- """Blocking until 'batch_size' message is pending and return
- [IncomingMessage].
- Waits for first message. Then waits for next batch_size-1 messages
- during batch window defined by batch_timeout
- This method block current thread until message comes, stop() is
- executed by another thread or timemout is elapsed.
+ """poll is called by the server to retrieve incoming messages. It
+ blocks until 'batch_size' incoming messages are available, a timeout
+ occurs, or the poll is interrupted by a call to the :py:meth:`stop`
+ method.
+
+ If 'batch_size' is > 1 poll must block until 'batch_size' messages are
+ available or at least one message is available and batch_timeout
+ expires
+
+ :param timeout: Block up to 'timeout' seconds waiting for a message
+ :type timeout: float
+ :param batch_size: Block until this number of messages are received.
+ :type batch_size: int
+ :param batch_timeout: Time to wait in seconds for a full batch to
+ arrive. A timer is started when the first message in a batch is
+ received. If a full batch's worth of messages is not received when
+ the timer expires then :py:meth:`poll` returns all messages
+ received thus far.
+ :type batch_timeout: float
+ :raises: Does not raise an exception.
+ :return: A list of up to batch_size IncomingMessage objects.
"""
def stop(self):
- """Stop listener.
- Stop the listener message polling
+ """Stop the listener from polling for messages. This method must cause
+ the :py:meth:`poll` call to unblock and return whatever messages are
+ currently available. This method is called from a different thread
+ than the poller so it must be thread-safe.
"""
pass
def cleanup(self):
- """Cleanup listener.
- Close connection (socket) used by listener if any.
- As this is listener specific method, overwrite it in to derived class
- if cleanup of listener required.
+ """Cleanup all resources held by the listener. This method should block
+ until the cleanup is completed.
"""
pass
@six.add_metaclass(abc.ABCMeta)
class Listener(object):
+ """A Listener is used to transfer incoming messages from the driver to a
+ server for processing. A callback is used by the driver to transfer the
+ messages.
+
+ :param batch_size: desired number of messages passed to
+ single on_incoming_callback notification
+ :type batch_size: int
+ :param batch_timeout: defines how long should we wait in seconds for
+ batch_size messages if we already have some messages waiting for
+ processing
+ :type batch_timeout: float
+ :param prefetch_size: defines how many messages we want to prefetch
+ from the messaging backend in a single request. May not be honored by
+ all backend implementations.
+ :type prefetch_size: int
+ """
def __init__(self, batch_size, batch_timeout,
prefetch_size=-1):
- """Init Listener
-
- :param batch_size: desired number of messages passed to
- single on_incoming_callback notification
- :param batch_timeout: defines how long should we wait for batch_size
- messages if we already have some messages waiting for processing
- :param prefetch_size: defines how many massages we want to prefetch
- from backend (depend on driver type) by single request
- """
+
self.on_incoming_callback = None
self.batch_timeout = batch_timeout
self.prefetch_size = prefetch_size
@@ -150,31 +253,34 @@ class Listener(object):
self.batch_size = batch_size
def start(self, on_incoming_callback):
- """Start listener.
- Start the listener message polling
-
- :param on_incoming_callback: callback function to be executed when
- listener received messages. Messages should be processed and
- acked/nacked by callback
+ """Start receiving messages. This should cause the driver to start
+ receiving messages from the backend. When message(s) arrive the driver
+ must invoke 'on_incoming_callback' passing it the received messages as
+ a list of IncomingMessages.
+
+ :param on_incoming_callback: callback function to be executed when
+ listener receives messages.
+ :type on_incoming_callback: func
"""
self.on_incoming_callback = on_incoming_callback
def stop(self):
- """Stop listener.
- Stop the listener message polling
+ """Stop receiving messages. The driver must no longer invoke
+ the callback.
"""
self.on_incoming_callback = None
@abc.abstractmethod
def cleanup(self):
- """Cleanup listener.
- Close connection (socket) used by listener if any.
- As this is listener specific method, overwrite it in to derived class
- if cleanup of listener required.
+ """Cleanup all resources held by the listener. This method should block
+ until the cleanup is completed.
"""
class PollStyleListenerAdapter(Listener):
+ """A Listener that uses a PollStyleListener for message transfer. A
+ dedicated thread is created to do message polling.
+ """
def __init__(self, poll_style_listener, batch_size, batch_timeout):
super(PollStyleListenerAdapter, self).__init__(
batch_size, batch_timeout, poll_style_listener.prefetch_size
@@ -185,13 +291,6 @@ class PollStyleListenerAdapter(Listener):
self._started = False
def start(self, on_incoming_callback):
- """Start listener.
- Start the listener message polling
-
- :param on_incoming_callback: callback function to be executed when
- listener received messages. Messages should be processed and
- acked/nacked by callback
- """
super(PollStyleListenerAdapter, self).start(on_incoming_callback)
self._started = True
self._listen_thread.start()
@@ -216,25 +315,33 @@ class PollStyleListenerAdapter(Listener):
self.on_incoming_callback(incoming)
def stop(self):
- """Stop listener.
- Stop the listener message polling
- """
self._started = False
self._poll_style_listener.stop()
self._listen_thread.join()
super(PollStyleListenerAdapter, self).stop()
def cleanup(self):
- """Cleanup listener.
- Close connection (socket) used by listener if any.
- As this is listener specific method, overwrite it in to derived class
- if cleanup of listener required.
- """
self._poll_style_listener.cleanup()
@six.add_metaclass(abc.ABCMeta)
class BaseDriver(object):
+ """Defines the backend driver interface. Each backend driver implementation
+ must provide a concrete derivation of this class implementing the backend
+ specific logic for its public methods.
+
+ :param conf: The configuration settings provided by the user.
+ :type conf: ConfigOpts
+ :param url: The network address of the messaging backend(s).
+ :type url: TransportURL
+ :param default_exchange: The exchange to use if no exchange is specified in
+ a Target.
+ :type default_exchange: str
+ :param allowed_remote_exmods: whitelist of those exception modules which
+ are permitted to be re-raised if an exception is returned in response
+ to an RPC call.
+ :type allowed_remote_exmods: list
+ """
prefetch_size = 0
def __init__(self, conf, url,
@@ -245,30 +352,232 @@ class BaseDriver(object):
self._allowed_remote_exmods = allowed_remote_exmods or []
def require_features(self, requeue=False):
+ """The driver must raise a 'NotImplementedError' if any of the feature
+ flags passed as True are not supported.
+ """
if requeue:
raise NotImplementedError('Message requeueing not supported by '
'this transport driver')
@abc.abstractmethod
def send(self, target, ctxt, message,
- wait_for_reply=None, timeout=None, envelope=False):
- """Send a message to the given target."""
+ wait_for_reply=None, timeout=None, envelope=False, retry=None):
+ """Send a message to the given target and optionally wait for a reply.
+ This method is used by the RPC client when sending RPC requests to a
+ server.
+
+ The driver must use the *topic*, *exchange*, and *server* (if present)
+ attributes of the *target* to construct the backend-native message
+ address. The message address must match the format used by
+ subscription(s) created by the :py:meth:`BaseDriver.listen` method.
+
+ If the *target's* *fanout* attribute is set, a copy of the message must
+ be sent to all subscriptions using the *exchange* and *topic*
+ values. If *fanout* is not set, then only one subscriber should receive
+ the message. In the case of multiple subscribers to the same address,
+ only one copy of the message is delivered. In this case the driver
+ should implement a delivery pattern that distributes messages in a
+ balanced fashion across the multiple subscribers.
+
+ This method must block the caller until one of the following events
+ occur:
+
+ * the send operation completes successfully
+ * *timeout* seconds elapse (if specified)
+ * *retry* count is reached (if specified)
+
+ The *wait_for_reply* parameter determines whether or not the caller
+ expects a response to the RPC request. If True, this method must block
+ until a response message is received. This method then returns the
+ response message to the caller. The driver must implement a mechanism
+ for routing incoming responses back to their corresponding send
+ request. How this is done may vary based on the type of messaging
+ backend, but typically it involves having the driver create an internal
+ subscription for reply messages and setting the request message's
+ *reply-to* header to the subscription address. The driver may also
+ need to supply a correlation identifier for mapping the response back
+ to the sender. See :py:meth:`RpcIncomingMessage.reply`
+
+ If *wait_for_reply* is False this method will block until the message
+ has been handed off to the backend - there is no need to confirm that
+ the message has been delivered. Once the handoff completes this method
+ returns.
+
+ The driver may attempt to retry sending the message should a
+ recoverable error occur that prevents the message from being passed to
+ the backend. The *retry* parameter specifies how many attempts to
+ re-send the message the driver may make before raising a
+ :py:exc:`MessageDeliveryFailure` exception. A value of None or -1 means
+ unlimited retries. 0 means no retry is attempted. N means attempt at
+ most N retries before failing. **Note well:** the driver MUST guarantee
+ that the message is not duplicated by the retry process.
+
+ :param target: The message's destination address
+ :type target: Target
+ :param ctxt: Context metadata provided by sending application which
+ must transfered along with the message.
+ :type ctxt: dict
+ :param message: message provided by the caller
+ :type message: dict
+ :param wait_for_reply: If True block until a reply message is received.
+ :type wait_for_reply: bool
+ :param timeout: Maximum time in seconds to block waiting for the send
+ operation to complete. Should this expire the :py:meth:`send` must
+ raise a :py:exc:`MessagingTimeout` exception
+ :type timeout: float
+ :param retry: maximum message send attempts permitted
+ :type retry: int
+ :returns: A reply message or None if no reply expected
+ :raises: :py:exc:`MessagingException`, any exception thrown by the
+ remote server when executing the RPC call.
+ """
@abc.abstractmethod
- def send_notification(self, target, ctxt, message, version):
- """Send a notification message to the given target."""
+ def send_notification(self, target, ctxt, message, version, retry):
+ """Send a notification message to the given target. This method is used
+ by the Notifier to send notification messages to a Listener.
+
+ Notifications use a *store and forward* delivery pattern. The driver
+ must allow for delivery in the case where the intended recipient is
+ not present at the time the notification is published. Typically this
+ requires a messaging backend that has the ability to store messages
+ until a consumer is present.
+
+ Therefore this method must block at least until the backend accepts
+ ownership of the message. This method does not guarantee that the
+ message has or will be processed by the intended recipient.
+
+ The driver must use the *topic* and *exchange* attributes of the
+ *target* to construct the backend-native message address. The message
+ address must match the format used by subscription(s) created by the
+ :py:meth:`BaseDriver.listen_for_notifications` method. Only one copy of
+ the message is delivered in the case of multiple subscribers to the
+ same address. In this case the driver should implement a delivery
+ pattern that distributes messages in a balanced fashion across the
+ multiple subscribers.
+
+ There is an exception to the single delivery semantics described above:
+ the *pool* parameter to the
+ :py:meth:`BaseDriver.listen_for_notifications` method may be used to
+ set up shared subscriptions. See
+ :py:meth:`BaseDriver.listen_for_notifications` for details.
+
+ This method must also honor the *retry* parameter. See
+ :py:meth:`BaseDriver.send` for details regarding implementing the
+ *retry* process.
+
+ *version* indicates whether or not the message should be encapsulated
+ in an envelope. A value < 2.0 should not envelope the message. See
+ :py:func:`common.serialize_msg` for more detail.
+
+ :param target: The message's destination address
+ :type target: Target
+ :param ctxt: Context metadata provided by sending application which
+ must transfered along with the message.
+ :type ctxt: dict
+ :param message: message provided by the caller
+ :type message: dict
+ :param version: determines the envelope for the message
+ :type version: float
+ :param retry: maximum message send attempts permitted
+ :type retry: int
+ :returns: None
+ :raises: :py:exc:`MessagingException`
+ """
@abc.abstractmethod
def listen(self, target, batch_size, batch_timeout):
- """Construct a Listener for the given target."""
+ """Construct a listener for the given target. The listener may be
+ either a :py:class:`Listener` or :py:class:`PollStyleListener`
+ depending on the driver's preference. This method is used by the RPC
+ server.
+
+ The driver must create subscriptions to the address provided in
+ *target*. These subscriptions must then be associated with a
+ :py:class:`Listener` or :py:class:`PollStyleListener` which is returned
+ by this method. See :py:meth:`BaseDriver.send` for more detail
+ regarding message addressing.
+
+ The driver must support receiving messages sent to the following
+ addresses derived from the values in *target*:
+
+ * all messages sent to the exchange and topic given in the target.
+ This includes messages sent using a fanout pattern.
+ * if the server attribute of the target is set then the driver must
+ also subscribe to messages sent to the exchange, topic, and server
+
+ For example, given a target with exchange 'my-exchange', topic
+ 'my-topic', and server 'my-server', the driver would create
+ subscriptions for:
+
+ * all messages sent to my-exchange and my-topic (including fanout)
+ * all messages sent to my-exchange, my-topic, and my-server
+
+ The driver must pass messages arriving from these subscriptions to the
+ listener. For :py:class:`PollStyleListener` the driver should trigger
+ the :py:meth:`PollStyleListener.poll` method to unblock and return the
+ incoming messages. For :py:class:`Listener` the driver should invoke
+ the callback with the incoming messages.
+
+ This method only blocks long enough to establish the subscription(s)
+ and construct the listener. In the case of failover, the driver must
+ restore the subscription(s). Subscriptions should remain active until
+ the listener is stopped.
+
+ :param target: The address(es) to subscribe to.
+ :type target: Target
+ :param batch_size: passed to the listener
+ :type batch_size: int
+ :param batch_timeout: passed to the listener
+ :type batch_timeout: float
+ :returns: None
+ :raises: :py:exc:`MessagingException`
+ """
@abc.abstractmethod
def listen_for_notifications(self, targets_and_priorities, pool,
batch_size, batch_timeout):
- """Construct a notification Listener for the given list of
- tuple of (target, priority).
+ """Construct a notification listener for the given list of
+ tuples of (target, priority) addresses.
+
+ The driver must create a subscription for each (*target*, *priority*)
+ pair. The topic for the subscription is created for each pair using the
+ format `"%s.%s" % (target.topic, priority)`. This format is used by
+ the caller of the :py:meth:`BaseDriver.send_notification` when setting
+ the topic member of the target parameter.
+
+ Only the *exchange* and *topic* must be considered when creating
+ subscriptions. *server* and *fanout* must be ignored.
+
+ The *pool* parameter, if specified, should cause the driver to create a
+ subscription that is shared with other subscribers using the same pool
+ identifier. Each pool gets a single copy of the message. For example if
+ there is a subscriber pool with identifier **foo** and another pool
+ **bar**, then one **foo** subscriber and one **bar** subscriber will
+ each receive a copy of the message. The driver should implement a
+ delivery pattern that distributes message in a balanced fashion across
+ the subscribers in a pool.
+
+ The driver must raise a :py:exc:`NotImplementedError` if pooling is not
+ supported and a pool identifier is passed in.
+
+ Refer to the description of :py:meth:`BaseDriver.send_notification` for
+ further details regarding implementation.
+
+ :param targets_and_priorities: List of (target, priority) pairs
+ :type targets_and_priorities: list
+ :param pool: pool identifier
+ :type pool: str
+ :param batch_size: passed to the listener
+ :type batch_size: int
+ :param batch_timeout: passed to the listener
+ :type batch_timeout: float
+ :returns: None
+ :raises: :py:exc:`MessagingException`, :py:exc:`NotImplementedError`
"""
@abc.abstractmethod
def cleanup(self):
- """Release all resources."""
+ """Release all resources used by the driver. This method must block
+ until the cleanup is complete.
+ """