diff options
30 files changed, 13 insertions, 4551 deletions
@@ -12,13 +12,6 @@ bindep_profile: kafka - job: - name: oslo.messaging-tox-py27-func-pika - parent: openstack-tox-py27 - vars: - tox_envlist: py27-func-pika - bindep_profile: pika - -- job: name: oslo.messaging-tox-py27-func-rabbit parent: openstack-tox-py27 vars: @@ -77,17 +70,6 @@ - openstack/oslo.messaging - job: - name: oslo.messaging-src-dsvm-full-pika-default - parent: legacy-dsvm-base - run: playbooks/oslo.messaging-src-dsvm-full-pika-default/run.yaml - post-run: playbooks/oslo.messaging-src-dsvm-full-pika-default/post.yaml - timeout: 10800 - required-projects: - - openstack-infra/devstack-gate - - openstack/devstack-plugin-pika - - openstack/oslo.messaging - -- job: name: oslo.messaging-src-dsvm-full-amqp1-dual-centos-7 parent: legacy-dsvm-base run: playbooks/oslo.messaging-src-dsvm-full-amqp1-dual-centos-7/run.yaml @@ -212,24 +194,6 @@ - openstack/diskimage-builder - job: - name: oslo.messaging-telemetry-dsvm-integration-pika - parent: legacy-dsvm-base - run: playbooks/oslo.messaging-telemetry-dsvm-integration-pika/run.yaml - post-run: playbooks/oslo.messaging-telemetry-dsvm-integration-pika/post.yaml - timeout: 4200 - required-projects: - - openstack-infra/devstack-gate - - openstack/aodh - - openstack/ceilometer - - openstack/devstack-plugin-pika - - openstack/oslo.messaging - - openstack/panko - # following are required when DEVSTACK_GATE_HEAT, which this - # job turns on - - openstack/dib-utils - - openstack/diskimage-builder - -- job: name: oslo.messaging-telemetry-dsvm-integration-zmq parent: legacy-dsvm-base run: playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/run.yaml @@ -305,19 +269,6 @@ - openstack/tempest - job: - name: oslo.messaging-tempest-neutron-dsvm-src-pika-default - parent: legacy-dsvm-base - run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/run.yaml - post-run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/post.yaml - timeout: 7800 - required-projects: - - openstack-infra/devstack-gate - - openstack/devstack-plugin-pika - - openstack/neutron - - openstack/oslo.messaging - - openstack/tempest - -- job: name: oslo.messaging-tempest-neutron-dsvm-src-zmq-default parent: legacy-dsvm-base run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-zmq-default/run.yaml @@ -338,7 +289,6 @@ voting: false - oslo.messaging-tox-py27-func-kafka: voting: false - - oslo.messaging-tox-py27-func-pika - oslo.messaging-tox-py27-func-rabbit - oslo.messaging-tox-py27-func-zmq-proxy: voting: false @@ -364,8 +314,6 @@ voting: false - oslo.messaging-src-dsvm-full-kafka-default: voting: false - - oslo.messaging-src-dsvm-full-pika-default: - voting: false - oslo.messaging-src-dsvm-full-zmq-default: voting: false @@ -379,8 +327,6 @@ voting: false - oslo.messaging-telemetry-dsvm-integration-kafka: voting: false - - oslo.messaging-telemetry-dsvm-integration-pika: - voting: false - oslo.messaging-telemetry-dsvm-integration-zmq: voting: false @@ -390,15 +336,12 @@ branches: ^(?!stable/ocata).*$ - oslo.messaging-tempest-neutron-dsvm-src-kafka-default: voting: false - - oslo.messaging-tempest-neutron-dsvm-src-pika-default: - voting: false - oslo.messaging-tempest-neutron-dsvm-src-zmq-default: voting: false gate: jobs: - oslo.messaging-tox-py27-func-rabbit - - oslo.messaging-tox-py27-func-pika - oslo.messaging-telemetry-dsvm-integration-rabbit - oslo.messaging-src-dsvm-full-rabbit-default - oslo.messaging-tempest-neutron-dsvm-src-rabbit-default @@ -10,11 +10,9 @@ make [platform:rpm] pkgconfig [platform:rpm] libffi-devel [platform:rpm] -# kombu/pika dpkg -rabbitmq-server [platform:dpkg rabbit pika] - -# kombu/pika rpm -rabbitmq-server [platform:rpm rabbit pika] +# RabbitMQ message broker +rabbitmq-server [platform:dpkg rabbit] +rabbitmq-server [platform:rpm rabbit] # zmq redis [platform:rpm zmq] diff --git a/doc/source/admin/index.rst b/doc/source/admin/index.rst index d032aaf..63104bb 100644 --- a/doc/source/admin/index.rst +++ b/doc/source/admin/index.rst @@ -7,5 +7,4 @@ Deployment Guide drivers AMQP1.0 - pika_driver zmq_driver diff --git a/doc/source/admin/pika_driver.rst b/doc/source/admin/pika_driver.rst deleted file mode 100644 index b89fabd..0000000 --- a/doc/source/admin/pika_driver.rst +++ /dev/null @@ -1,160 +0,0 @@ ------------------------------- -Pika Driver Deployment Guide ------------------------------- - -.. currentmodule:: oslo_messaging - -.. warning:: the Pika driver is no longer maintained and will be - removed from Oslo.Messaging at a future date. It is recommended that - all users of the Pika driver transition to using the Rabbit driver. - -============ -Introduction -============ - -Pika is a pure-Python implementation of the AMQP 0-9-1 protocol including -RabbitMQ's extensions. It is very actively supported and recommended by -RabbitMQ developers - -======== -Abstract -======== - -PikaDriver is one of oslo.messaging backend drivers. It supports RPC and Notify -patterns. Currently it could be the only oslo.messaging driver across the -OpenStack cluster. This document provides deployment information for this -driver in oslo_messaging. - -This driver is able to work with single instance of RabbitMQ server or -RabbitMQ cluster. - - -============= -Configuration -============= - -Enabling (mandatory) --------------------- - -To enable the driver, in the section [DEFAULT] of the conf file, -the 'transport_url' parameter should be set to -`pika://user:pass@host1:port[,hostN:portN]` - - [DEFAULT] - transport_url = pika://guest:guest@localhost:5672 - - -Connection options (optional) ------------------------------ - -In section [oslo_messaging_pika]: -#. channel_max - Maximum number of channels to allow, - -#. frame_max (default - pika default value): The maximum byte size for - an AMQP frame, - -#. heartbeat_interval (default=1): How often to send heartbeats for - consumer's connections in seconds. If 0 - disable heartbeats, - -#. ssl (default=False): Enable SSL if True, - -#. ssl_options (default=None): Arguments passed to ssl.wrap_socket, - -#. socket_timeout (default=0.25): Set timeout for opening new connection's - socket, - -#. tcp_user_timeout (default=0.25): Set TCP_USER_TIMEOUT in seconds for - connection's socket, - -#. host_connection_reconnect_delay (default=0.25): Set delay for reconnection - to some host after connection error - - -Connection pool options (optional) ----------------------------------- - -In section [oslo_messaging_pika]: - -#. pool_max_size (default=10): Maximum number of connections to keep queued, - -#. pool_max_overflow (default=0): Maximum number of connections to create above - `pool_max_size`, - -#. pool_timeout (default=30): Default number of seconds to wait for a - connections to available, - -#. pool_recycle (default=600): Lifetime of a connection (since creation) in - seconds or None for no recycling. Expired connections are closed on acquire, - -#. pool_stale (default=60): Threshold at which inactive (since release) - connections are considered stale in seconds or None for no staleness. - Stale connections are closed on acquire.") - -RPC related options (optional) ------------------------------- - -In section [oslo_messaging_pika]: - -#. rpc_queue_expiration (default=60): Time to live for rpc queues without - consumers in seconds, - -#. default_rpc_exchange (default="${control_exchange}_rpc"): Exchange name for - sending RPC messages, - -#. rpc_reply_exchange', default=("${control_exchange}_rpc_reply"): Exchange - name for receiving RPC replies, - -#. rpc_listener_prefetch_count (default=100): Max number of not acknowledged - message which RabbitMQ can send to rpc listener, - -#. rpc_reply_listener_prefetch_count (default=100): Max number of not - acknowledged message which RabbitMQ can send to rpc reply listener, - -#. rpc_reply_retry_attempts (default=-1): Reconnecting retry count in case of - connectivity problem during sending reply. -1 means infinite retry during - rpc_timeout, - -#. rpc_reply_retry_delay (default=0.25) Reconnecting retry delay in case of - connectivity problem during sending reply, - -#. default_rpc_retry_attempts (default=-1): Reconnecting retry count in case of - connectivity problem during sending RPC message, -1 means infinite retry. If - actual retry attempts in not 0 the rpc request could be processed more than - one time, - -#. rpc_retry_delay (default=0.25): Reconnecting retry delay in case of - connectivity problem during sending RPC message - -$control_exchange in this code is value of [DEFAULT].control_exchange option, -which is "openstack" by default - -Notification related options (optional) ---------------------------------------- - -In section [oslo_messaging_pika]: - -#. notification_persistence (default=False): Persist notification messages, - -#. default_notification_exchange (default="${control_exchange}_notification"): - Exchange name for sending notifications, - -#. notification_listener_prefetch_count (default=100): Max number of not - acknowledged message which RabbitMQ can send to notification listener, - -#. default_notification_retry_attempts (default=-1): Reconnecting retry count - in case of connectivity problem during sending notification, -1 means - infinite retry, - -#. notification_retry_delay (default=0.25): Reconnecting retry delay in case of - connectivity problem during sending notification message - -$control_exchange in this code is value of [DEFAULT].control_exchange option, -which is "openstack" by default - -DevStack Support ----------------- - -Pika driver is supported by DevStack. To enable it you should edit -local.conf [localrc] section and add next there: - - enable_plugin pika https://git.openstack.org/openstack/devstack-plugin-pika diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py deleted file mode 100644 index 65e7b48..0000000 --- a/oslo_messaging/_drivers/impl_pika.py +++ /dev/null @@ -1,366 +0,0 @@ -# Copyright 2011 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from debtcollector import deprecate -from oslo_config import cfg -from oslo_log import log as logging -from oslo_utils import timeutils -import pika_pool -import tenacity - -from oslo_messaging._drivers import base -from oslo_messaging._drivers import common -from oslo_messaging._drivers.pika_driver import (pika_connection_factory as - pika_drv_conn_factory) -from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns -from oslo_messaging._drivers.pika_driver import pika_engine as pika_drv_engine -from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc -from oslo_messaging._drivers.pika_driver import pika_listener as pika_drv_lstnr -from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg -from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller -from oslo_messaging import exceptions - -LOG = logging.getLogger(__name__) - -pika_pool_opts = [ - cfg.IntOpt('pool_max_size', default=30, - help="Maximum number of connections to keep queued."), - cfg.IntOpt('pool_max_overflow', default=0, - help="Maximum number of connections to create above " - "`pool_max_size`."), - cfg.IntOpt('pool_timeout', default=30, - help="Default number of seconds to wait for a connections to " - "available"), - cfg.IntOpt('pool_recycle', default=600, - help="Lifetime of a connection (since creation) in seconds " - "or None for no recycling. Expired connections are " - "closed on acquire."), - cfg.IntOpt('pool_stale', default=60, - help="Threshold at which inactive (since release) connections " - "are considered stale in seconds or None for no " - "staleness. Stale connections are closed on acquire.") -] - -message_opts = [ - cfg.StrOpt('default_serializer_type', default='json', - choices=('json', 'msgpack'), - help="Default serialization mechanism for " - "serializing/deserializing outgoing/incoming messages") -] - -notification_opts = [ - cfg.BoolOpt('notification_persistence', default=False, - help="Persist notification messages."), - cfg.StrOpt('default_notification_exchange', - default="${control_exchange}_notification", - help="Exchange name for sending notifications"), - cfg.IntOpt( - 'notification_listener_prefetch_count', default=100, - help="Max number of not acknowledged message which RabbitMQ can send " - "to notification listener." - ), - cfg.IntOpt( - 'default_notification_retry_attempts', default=-1, - help="Reconnecting retry count in case of connectivity problem during " - "sending notification, -1 means infinite retry." - ), - cfg.FloatOpt( - 'notification_retry_delay', default=0.25, - help="Reconnecting retry delay in case of connectivity problem during " - "sending notification message" - ) -] - -rpc_opts = [ - cfg.IntOpt('rpc_queue_expiration', default=60, - help="Time to live for rpc queues without consumers in " - "seconds."), - cfg.StrOpt('default_rpc_exchange', default="${control_exchange}_rpc", - help="Exchange name for sending RPC messages"), - cfg.StrOpt('rpc_reply_exchange', default="${control_exchange}_rpc_reply", - help="Exchange name for receiving RPC replies"), - cfg.IntOpt( - 'rpc_listener_prefetch_count', default=100, - help="Max number of not acknowledged message which RabbitMQ can send " - "to rpc listener." - ), - cfg.IntOpt( - 'rpc_reply_listener_prefetch_count', default=100, - help="Max number of not acknowledged message which RabbitMQ can send " - "to rpc reply listener." - ), - cfg.IntOpt( - 'rpc_reply_retry_attempts', default=-1, - help="Reconnecting retry count in case of connectivity problem during " - "sending reply. -1 means infinite retry during rpc_timeout" - ), - cfg.FloatOpt( - 'rpc_reply_retry_delay', default=0.25, - help="Reconnecting retry delay in case of connectivity problem during " - "sending reply." - ), - cfg.IntOpt( - 'default_rpc_retry_attempts', default=-1, - help="Reconnecting retry count in case of connectivity problem during " - "sending RPC message, -1 means infinite retry. If actual " - "retry attempts in not 0 the rpc request could be processed more " - "than one time" - ), - cfg.FloatOpt( - 'rpc_retry_delay', default=0.25, - help="Reconnecting retry delay in case of connectivity problem during " - "sending RPC message" - ) -] - - -class PikaDriver(base.BaseDriver): - """Pika Driver - - **Warning**: The ``pika`` driver has been deprecated and will be removed in - a future release. It is recommended that all users of the ``pika`` driver - transition to using the ``rabbit`` driver. - """ - - def __init__(self, conf, url, default_exchange=None, - allowed_remote_exmods=None): - - deprecate("The pika driver is no longer maintained. It has been" - " deprecated", - message="It is recommended that all users of the pika driver" - " transition to using the rabbit driver.", - version="pike", removal_version="rocky") - - opt_group = cfg.OptGroup(name='oslo_messaging_pika', - title='Pika driver options') - conf.register_group(opt_group) - conf.register_opts(pika_drv_conn_factory.pika_opts, group=opt_group) - conf.register_opts(pika_pool_opts, group=opt_group) - conf.register_opts(message_opts, group=opt_group) - conf.register_opts(rpc_opts, group=opt_group) - conf.register_opts(notification_opts, group=opt_group) - conf = common.ConfigOptsProxy(conf, url, opt_group.name) - - self._pika_engine = pika_drv_engine.PikaEngine( - conf, url, default_exchange, allowed_remote_exmods - ) - self._reply_listener = pika_drv_lstnr.RpcReplyPikaListener( - self._pika_engine - ) - super(PikaDriver, self).__init__(conf, url, default_exchange, - allowed_remote_exmods) - - def require_features(self, requeue=False): - pass - - def _declare_rpc_exchange(self, exchange, stopwatch): - timeout = stopwatch.leftover(return_none=True) - with (self._pika_engine.connection_without_confirmation_pool - .acquire(timeout=timeout)) as conn: - try: - self._pika_engine.declare_exchange_by_channel( - conn.channel, - self._pika_engine.get_rpc_exchange_name( - exchange - ), "direct", False - ) - except pika_pool.Timeout as e: - raise exceptions.MessagingTimeout( - "Timeout for current operation was expired. {}.".format( - str(e) - ) - ) - - def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, - retry=None): - with timeutils.StopWatch(duration=timeout) as stopwatch: - if retry is None: - retry = self._pika_engine.default_rpc_retry_attempts - - exchange = self._pika_engine.get_rpc_exchange_name( - target.exchange - ) - - def on_exception(ex): - if isinstance(ex, pika_drv_exc.ExchangeNotFoundException): - # it is desired to create exchange because if we sent to - # exchange which is not exists, we get ChannelClosed - # exception and need to reconnect - try: - self._declare_rpc_exchange(exchange, stopwatch) - except pika_drv_exc.ConnectionException as e: - LOG.warning("Problem during declaring exchange. %s", e) - return True - elif isinstance(ex, (pika_drv_exc.ConnectionException, - exceptions.MessageDeliveryFailure)): - LOG.warning("Problem during message sending. %s", ex) - return True - else: - return False - - if retry: - retrier = tenacity.retry( - stop=(tenacity.stop_never if retry == -1 else - tenacity.stop_after_attempt(retry)), - retry=tenacity.retry_if_exception(on_exception), - wait=tenacity.wait_fixed(self._pika_engine.rpc_retry_delay) - ) - else: - retrier = None - - if target.fanout: - return self.cast_all_workers( - exchange, target.topic, ctxt, message, stopwatch, retrier - ) - - routing_key = self._pika_engine.get_rpc_queue_name( - target.topic, target.server, retrier is None - ) - - msg = pika_drv_msg.RpcPikaOutgoingMessage(self._pika_engine, - message, ctxt) - try: - reply = msg.send( - exchange=exchange, - routing_key=routing_key, - reply_listener=( - self._reply_listener if wait_for_reply else None - ), - stopwatch=stopwatch, - retrier=retrier - ) - except pika_drv_exc.ExchangeNotFoundException as ex: - try: - self._declare_rpc_exchange(exchange, stopwatch) - except pika_drv_exc.ConnectionException as e: - LOG.warning("Problem during declaring exchange. %s", e) - raise ex - - if reply is not None: - if reply.failure is not None: - raise reply.failure - - return reply.result - - def cast_all_workers(self, exchange, topic, ctxt, message, stopwatch, - retrier=None): - msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message, - ctxt) - try: - msg.send( - exchange=exchange, - routing_key=self._pika_engine.get_rpc_queue_name( - topic, "all_workers", retrier is None - ), - mandatory=False, - stopwatch=stopwatch, - retrier=retrier - ) - except pika_drv_exc.ExchangeNotFoundException: - try: - self._declare_rpc_exchange(exchange, stopwatch) - except pika_drv_exc.ConnectionException as e: - LOG.warning("Problem during declaring exchange. %s", e) - - def _declare_notification_queue_binding( - self, target, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH): - if stopwatch.expired(): - raise exceptions.MessagingTimeout( - "Timeout for current operation was expired." - ) - try: - timeout = stopwatch.leftover(return_none=True) - with (self._pika_engine.connection_without_confirmation_pool - .acquire)(timeout=timeout) as conn: - self._pika_engine.declare_queue_binding_by_channel( - conn.channel, - exchange=( - target.exchange or - self._pika_engine.default_notification_exchange - ), - queue=target.topic, - routing_key=target.topic, - exchange_type='direct', - queue_expiration=None, - durable=self._pika_engine.notification_persistence, - ) - except pika_pool.Timeout as e: - raise exceptions.MessagingTimeout( - "Timeout for current operation was expired. {}.".format(str(e)) - ) - - def send_notification(self, target, ctxt, message, version, retry=None): - if retry is None: - retry = self._pika_engine.default_notification_retry_attempts - - def on_exception(ex): - if isinstance(ex, (pika_drv_exc.ExchangeNotFoundException, - pika_drv_exc.RoutingException)): - LOG.warning("Problem during sending notification. %s", ex) - try: - self._declare_notification_queue_binding(target) - except pika_drv_exc.ConnectionException as e: - LOG.warning("Problem during declaring notification queue " - "binding. %s", e) - return True - elif isinstance(ex, (pika_drv_exc.ConnectionException, - pika_drv_exc.MessageRejectedException)): - LOG.warning("Problem during sending notification. %s", ex) - return True - else: - return False - - if retry: - retrier = tenacity.retry( - stop=(tenacity.stop_never if retry == -1 else - tenacity.stop_after_attempt(retry)), - retry=tenacity.retry_if_exception(on_exception), - wait=tenacity.wait_fixed( - self._pika_engine.notification_retry_delay - ) - ) - else: - retrier = None - - msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message, - ctxt) - return msg.send( - exchange=( - target.exchange or - self._pika_engine.default_notification_exchange - ), - routing_key=target.topic, - confirm=True, - mandatory=True, - persistent=self._pika_engine.notification_persistence, - retrier=retrier - ) - - def listen(self, target, batch_size, batch_timeout): - return pika_drv_poller.RpcServicePikaPoller( - self._pika_engine, target, batch_size, batch_timeout, - self._pika_engine.rpc_listener_prefetch_count - ) - - def listen_for_notifications(self, targets_and_priorities, pool, - batch_size, batch_timeout): - return pika_drv_poller.NotificationPikaPoller( - self._pika_engine, targets_and_priorities, batch_size, - batch_timeout, - self._pika_engine.notification_listener_prefetch_count, pool - ) - - def cleanup(self): - self._reply_listener.cleanup() - self._pika_engine.cleanup() diff --git a/oslo_messaging/_drivers/pika_driver/__init__.py b/oslo_messaging/_drivers/pika_driver/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/oslo_messaging/_drivers/pika_driver/__init__.py +++ /dev/null diff --git a/oslo_messaging/_drivers/pika_driver/pika_commons.py b/oslo_messaging/_drivers/pika_driver/pika_commons.py deleted file mode 100644 index f5e9086..0000000 --- a/oslo_messaging/_drivers/pika_driver/pika_commons.py +++ /dev/null @@ -1,40 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import select -import socket - -from oslo_serialization.serializer import json_serializer -from oslo_serialization.serializer import msgpack_serializer -from oslo_utils import timeutils -from pika import exceptions as pika_exceptions -import six - - -PIKA_CONNECTIVITY_ERRORS = ( - pika_exceptions.AMQPConnectionError, - pika_exceptions.ConnectionClosed, - pika_exceptions.ChannelClosed, - socket.timeout, - select.error -) - -EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins' - -INFINITE_STOP_WATCH = timeutils.StopWatch(duration=None).start() - -MESSAGE_SERIALIZERS = { - 'application/json': json_serializer.JSONSerializer(), - 'application/msgpack': msgpack_serializer.MessagePackSerializer() -} diff --git a/oslo_messaging/_drivers/pika_driver/pika_connection.py b/oslo_messaging/_drivers/pika_driver/pika_connection.py deleted file mode 100644 index f0dca5a..0000000 --- a/oslo_messaging/_drivers/pika_driver/pika_connection.py +++ /dev/null @@ -1,542 +0,0 @@ -# Copyright 2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import collections -import logging -import os -import threading - -import futurist -from pika.adapters import select_connection -from pika import exceptions as pika_exceptions -from pika import spec as pika_spec - -from oslo_utils import eventletutils - -current_thread = eventletutils.fetch_current_thread_functor() - -LOG = logging.getLogger(__name__) - - -class ThreadSafePikaConnection(object): - def __init__(self, parameters=None, - _impl_class=select_connection.SelectConnection): - self.params = parameters - self._connection_lock = threading.Lock() - self._evt_closed = threading.Event() - self._task_queue = collections.deque() - self._pending_connection_futures = set() - - create_connection_future = self._register_pending_future() - - def on_open_error(conn, err): - create_connection_future.set_exception( - pika_exceptions.AMQPConnectionError(err) - ) - - self._impl = _impl_class( - parameters=parameters, - on_open_callback=create_connection_future.set_result, - on_open_error_callback=on_open_error, - on_close_callback=self._on_connection_close, - stop_ioloop_on_close=False, - ) - self._interrupt_pipein, self._interrupt_pipeout = os.pipe() - self._impl.ioloop.add_handler(self._interrupt_pipein, - self._impl.ioloop.read_interrupt, - select_connection.READ) - - self._thread = threading.Thread(target=self._process_io) - self._thread.daemon = True - self._thread_id = None - self._thread.start() - - create_connection_future.result() - - def _check_called_not_from_event_loop(self): - if current_thread() == self._thread_id: - raise RuntimeError("This call is not allowed from ioloop thread") - - def _execute_task(self, func, *args, **kwargs): - if current_thread() == self._thread_id: - return func(*args, **kwargs) - - future = futurist.Future() - self._task_queue.append((func, args, kwargs, future)) - - if self._evt_closed.is_set(): - self._notify_all_futures_connection_close() - elif self._interrupt_pipeout is not None: - os.write(self._interrupt_pipeout, b'X') - - return future.result() - - def _register_pending_future(self): - future = futurist.Future() - self._pending_connection_futures.add(future) - - def on_done_callback(fut): - try: - self._pending_connection_futures.remove(fut) - except KeyError: - pass - - future.add_done_callback(on_done_callback) - - if self._evt_closed.is_set(): - self._notify_all_futures_connection_close() - return future - - def _notify_all_futures_connection_close(self): - while self._task_queue: - try: - method_res_future = self._task_queue.pop()[3] - except KeyError: - break - else: - method_res_future.set_exception( - pika_exceptions.ConnectionClosed() - ) - - while self._pending_connection_futures: - try: - pending_connection_future = ( - self._pending_connection_futures.pop() - ) - except KeyError: - break - else: - pending_connection_future.set_exception( - pika_exceptions.ConnectionClosed() - ) - - def _on_connection_close(self, conn, reply_code, reply_text): - self._evt_closed.set() - self._notify_all_futures_connection_close() - if self._interrupt_pipeout: - os.close(self._interrupt_pipeout) - os.close(self._interrupt_pipein) - - def add_on_close_callback(self, callback): - return self._execute_task(self._impl.add_on_close_callback, callback) - - def _do_process_io(self): - while self._task_queue: - func, args, kwargs, future = self._task_queue.pop() - try: - res = func(*args, **kwargs) - except BaseException as e: - LOG.exception(e) - future.set_exception(e) - else: - future.set_result(res) - - self._impl.ioloop.poll() - self._impl.ioloop.process_timeouts() - - def _process_io(self): - self._thread_id = current_thread() - while not self._evt_closed.is_set(): - try: - self._do_process_io() - except BaseException: - LOG.exception("Error during processing connection's IO") - - def close(self, *args, **kwargs): - self._check_called_not_from_event_loop() - - res = self._execute_task(self._impl.close, *args, **kwargs) - - self._evt_closed.wait() - self._thread.join() - return res - - def channel(self, channel_number=None): - self._check_called_not_from_event_loop() - - channel_opened_future = self._register_pending_future() - - impl_channel = self._execute_task( - self._impl.channel, - on_open_callback=channel_opened_future.set_result, - channel_number=channel_number - ) - - # Create our proxy channel - channel = ThreadSafePikaChannel(impl_channel, self) - - # Link implementation channel with our proxy channel - impl_channel._set_cookie(channel) - - channel_opened_future.result() - return channel - - def add_timeout(self, timeout, callback): - return self._execute_task(self._impl.add_timeout, timeout, callback) - - def remove_timeout(self, timeout_id): - return self._execute_task(self._impl.remove_timeout, timeout_id) - - @property - def is_closed(self): - return self._impl.is_closed - - @property - def is_closing(self): - return self._impl.is_closing - - @property - def is_open(self): - return self._impl.is_open - - -class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902 - - def __init__(self, channel_impl, connection): - self._impl = channel_impl - self._connection = connection - - self._delivery_confirmation = False - - self._message_returned = False - self._current_future = None - - self._evt_closed = threading.Event() - - self.add_on_close_callback(self._on_channel_close) - - def _execute_task(self, func, *args, **kwargs): - return self._connection._execute_task(func, *args, **kwargs) - - def _on_channel_close(self, channel, reply_code, reply_text): - self._evt_closed.set() - - if self._current_future: - self._current_future.set_exception( - pika_exceptions.ChannelClosed(reply_code, reply_text)) - - def _on_message_confirmation(self, frame): - self._current_future.set_result(frame) - - def add_on_close_callback(self, callback): - self._execute_task(self._impl.add_on_close_callback, callback) - - def add_on_cancel_callback(self, callback): - self._execute_task(self._impl.add_on_cancel_callback, callback) - - def __int__(self): - return self.channel_number - - @property - def channel_number(self): - return self._impl.channel_number - - @property - def is_closed(self): - return self._impl.is_closed - - @property - def is_closing(self): - return self._impl.is_closing - - @property - def is_open(self): - return self._impl.is_open - - def close(self, reply_code=0, reply_text="Normal Shutdown"): - self._impl.close(reply_code=reply_code, reply_text=reply_text) - self._evt_closed.wait() - - def _check_called_not_from_event_loop(self): - self._connection._check_called_not_from_event_loop() - - def flow(self, active): - self._check_called_not_from_event_loop() - - self._current_future = futurist.Future() - self._execute_task( - self._impl.flow, callback=self._current_future.set_result, - active=active - ) - - return self._current_future.result() - - def basic_consume(self, # pylint: disable=R0913 - consumer_callback, - queue, - no_ack=False, - exclusive=False, - consumer_tag=None, - arguments=None): - - self._check_called_not_from_event_loop() - - self._current_future = futurist.Future() - self._execute_task( - self._impl.add_callback, self._current_future.set_result, - replies=[pika_spec.Basic.ConsumeOk], one_shot=True - ) - - self._impl.add_callback(self._current_future.set_result, - replies=[pika_spec.Basic.ConsumeOk], - one_shot=True) - tag = self._execute_task( - self._impl.basic_consume, - consumer_callback=consumer_callback, - queue=queue, - no_ack=no_ack, - exclusive=exclusive, - consumer_tag=consumer_tag, - arguments=arguments - ) - - self._current_future.result() - return tag - - def basic_cancel(self, consumer_tag): - self._check_called_not_from_event_loop() - - self._current_future = futurist.Future() - self._execute_task( - self._impl.basic_cancel, - callback=self._current_future.set_result, - consumer_tag=consumer_tag, - nowait=False) - self._current_future.result() - - def basic_ack(self, delivery_tag=0, multiple=False): - return self._execute_task( - self._impl.basic_ack, delivery_tag=delivery_tag, multiple=multiple) - - def basic_nack(self, delivery_tag=None, multiple=False, requeue=True): - return self._execute_task( - self._impl.basic_nack, delivery_tag=delivery_tag, - multiple=multiple, requeue=requeue - ) - - def publish(self, exchange, routing_key, body, # pylint: disable=R0913 - properties=None, mandatory=False, immediate=False): - - if self._delivery_confirmation: - self._check_called_not_from_event_loop() - - # In publisher-acknowledgments mode - self._message_returned = False - self._current_future = futurist.Future() - - self._execute_task(self._impl.basic_publish, - exchange=exchange, - routing_key=routing_key, - body=body, - properties=properties, - mandatory=mandatory, - immediate=immediate) - - conf_method = self._current_future.result().method - - if isinstance(conf_method, pika_spec.Basic.Nack): - raise pika_exceptions.NackError((None,)) - else: - assert isinstance(conf_method, pika_spec.Basic.Ack), ( - conf_method) - - if self._message_returned: - raise pika_exceptions.UnroutableError((None,)) - else: - # In non-publisher-acknowledgments mode - self._execute_task(self._impl.basic_publish, - exchange=exchange, - routing_key=routing_key, - body=body, - properties=properties, - mandatory=mandatory, - immediate=immediate) - - def basic_qos(self, prefetch_size=0, prefetch_count=0, all_channels=False): - self._check_called_not_from_event_loop() - - self._current_future = futurist.Future() - self._execute_task(self._impl.basic_qos, - callback=self._current_future.set_result, - prefetch_size=prefetch_size, - prefetch_count=prefetch_count, - all_channels=all_channels) - self._current_future.result() - - def basic_recover(self, requeue=False): - self._check_called_not_from_event_loop() - - self._current_future = futurist.Future() - self._execute_task( - self._impl.basic_recover, - callback=lambda: self._current_future.set_result(None), - requeue=requeue - ) - self._current_future.result() - - def basic_reject(self, delivery_tag=None, requeue=True): - self._execute_task(self._impl.basic_reject, - delivery_tag=delivery_tag, - requeue=requeue) - - def _on_message_returned(self, *args, **kwargs): - self._message_returned = True - - def confirm_delivery(self): - self._check_called_not_from_event_loop() - - self._current_future = futurist.Future() - self._execute_task(self._impl.add_callback, - callback=self._current_future.set_result, - replies=[pika_spec.Confirm.SelectOk], - one_shot=True) - self._execute_task(self._impl.confirm_delivery, - callback=self._on_message_confirmation, - nowait=False) - self._current_future.result() - - self._delivery_confirmation = True - self._execute_task(self._impl.add_on_return_callback, - self._on_message_returned) - - def exchange_declare(self, exchange=None, # pylint: disable=R0913 - exchange_type='direct', passive=False, durable=False, - auto_delete=False, internal=False, - arguments=None, **kwargs): - self._check_called_not_from_event_loop() - - self._current_future = futurist.Future() - self._execute_task(self._impl.exchange_declare, - callback=self._current_future.set_result, - exchange=exchange, - exchange_type=exchange_type, - passive=passive, - durable=durable, - auto_delete=auto_delete, - internal=internal, - nowait=False, - arguments=arguments, - type=kwargs["type"] if kwargs else None) - - return self._current_future.result() - - def exchange_delete(self, exchange=None, if_unused=False): - self._check_called_not_from_event_loop() - - self._current_future = futurist.Future() - self._execute_task(self._impl.exchange_delete, - callback=self._current_future.set_result, - exchange=exchange, - if_unused=if_unused, - nowait=False) - - return self._current_future.result() - - def exchange_bind(self, destination=None, source=None, routing_key='', - arguments=None): - self._check_called_not_from_event_loop() - - self._current_future = futurist.Future() - self._execute_task(self._impl.exchange_bind, - callback=self._current_future.set_result, - destination=destination, - source=source, - routing_key=routing_key, - nowait=False, - arguments=arguments) - - return self._current_future.result() - - def exchange_unbind(self, destination=None, source=None, routing_key='', - arguments=None): - self._check_called_not_from_event_loop() - - self._current_future = futurist.Future() - self._execute_task(self._impl.exchange_unbind, - callback=self._current_future.set_result, - destination=destination, - source=source, - routing_key=routing_key, - nowait=False, - arguments=arguments) - - return self._current_future.result() - - def queue_declare(self, queue='', passive=False, durable=False, - exclusive=False, auto_delete=False, - arguments=None): - self._check_called_not_from_event_loop() - - self._current_future = futurist.Future() - self._execute_task(self._impl.queue_declare, - callback=self._current_future.set_result, - queue=queue, - passive=passive, - durable=durable, - exclusive=exclusive, - auto_delete=auto_delete, - nowait=False, - arguments=arguments) - - return self._current_future.result() - - def queue_delete(self, queue='', if_unused=False, if_empty=False): - self._check_called_not_from_event_loop() - - self._current_future = futurist.Future() - self._execute_task(self._impl.queue_delete, - callback=self._current_future.set_result, - queue=queue, - if_unused=if_unused, - if_empty=if_empty, - nowait=False) - - return self._current_future.result() - - def queue_purge(self, queue=''): - self._check_called_not_from_event_loop() - - self._current_future = futurist.Future() - self._execute_task(self._impl.queue_purge, - callback=self._current_future.set_result, - queue=queue, - nowait=False) - return self._current_future.result() - - def queue_bind(self, queue, exchange, routing_key=None, - arguments=None): - self._check_called_not_from_event_loop() - - self._current_future = futurist.Future() - self._execute_task(self._impl.queue_bind, - callback=self._current_future.set_result, - queue=queue, - exchange=exchange, - routing_key=routing_key, - nowait=False, - arguments=arguments) - return self._current_future.result() - - def queue_unbind(self, queue='', exchange=None, routing_key=None, - arguments=None): - self._check_called_not_from_event_loop() - - self._current_future = futurist.Future() - self._execute_task(self._impl.queue_unbind, - callback=self._current_future.set_result, - queue=queue, - exchange=exchange, - routing_key=routing_key, - arguments=arguments) - return self._current_future.result() diff --git a/oslo_messaging/_drivers/pika_driver/pika_connection_factory.py b/oslo_messaging/_drivers/pika_driver/pika_connection_factory.py deleted file mode 100644 index a78c55e..0000000 --- a/oslo_messaging/_drivers/pika_driver/pika_connection_factory.py +++ /dev/null @@ -1,307 +0,0 @@ -# Copyright 2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -import logging -import random -import socket -import threading -import time - -from oslo_config import cfg -import pika -from pika import credentials as pika_credentials - -from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns -from oslo_messaging._drivers.pika_driver import pika_connection -from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc - -LOG = logging.getLogger(__name__) - -# constant for setting tcp_user_timeout socket option -# (it should be defined in 'select' module of standard library in future) -TCP_USER_TIMEOUT = 18 - -# constants for creating connection statistics -HOST_CONNECTION_LAST_TRY_TIME = "last_try_time" -HOST_CONNECTION_LAST_SUCCESS_TRY_TIME = "last_success_try_time" - -pika_opts = [ - cfg.IntOpt('channel_max', - help='Maximum number of channels to allow'), - cfg.IntOpt('frame_max', - help='The maximum byte size for an AMQP frame'), - cfg.IntOpt('heartbeat_interval', default=3, - help="How often to send heartbeats for consumer's connections"), - cfg.BoolOpt('ssl', - help='Enable SSL'), - cfg.DictOpt('ssl_options', - help='Arguments passed to ssl.wrap_socket'), - cfg.FloatOpt('socket_timeout', default=0.25, - help="Set socket timeout in seconds for connection's socket"), - cfg.FloatOpt('tcp_user_timeout', default=0.25, - help="Set TCP_USER_TIMEOUT in seconds for connection's " - "socket"), - cfg.FloatOpt('host_connection_reconnect_delay', default=0.25, - help="Set delay for reconnection to some host which has " - "connection error"), - cfg.StrOpt('connection_factory', default="single", - choices=["new", "single", "read_write"], - help='Connection factory implementation') -] - - -class PikaConnectionFactory(object): - - def __init__(self, url, conf): - self._url = url - self._conf = conf - - self._connection_lock = threading.RLock() - - if not url.hosts: - raise ValueError("You should provide at least one RabbitMQ host") - - # initializing connection parameters for configured RabbitMQ hosts - self._common_pika_params = { - 'virtual_host': url.virtual_host, - 'channel_max': conf.oslo_messaging_pika.channel_max, - 'frame_max': conf.oslo_messaging_pika.frame_max, - 'ssl': conf.oslo_messaging_pika.ssl, - 'ssl_options': conf.oslo_messaging_pika.ssl_options, - 'socket_timeout': conf.oslo_messaging_pika.socket_timeout - } - - self._host_list = url.hosts - self._heartbeat_interval = conf.oslo_messaging_pika.heartbeat_interval - self._host_connection_reconnect_delay = ( - conf.oslo_messaging_pika.host_connection_reconnect_delay - ) - self._tcp_user_timeout = conf.oslo_messaging_pika.tcp_user_timeout - - self._connection_host_status = {} - - self._cur_connection_host_num = random.randint( - 0, len(url.hosts) - 1 - ) - - def cleanup(self): - pass - - def create_connection(self, for_listening=False): - """Create and return connection to any available host. - - :return: created connection - :raise: ConnectionException if all hosts are not reachable - """ - - with self._connection_lock: - - host_count = len(self._host_list) - connection_attempts = host_count - - while connection_attempts > 0: - self._cur_connection_host_num += 1 - self._cur_connection_host_num %= host_count - try: - return self._create_host_connection( - self._cur_connection_host_num, for_listening - ) - except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e: - LOG.warning("Can't establish connection to host. %s", e) - except pika_drv_exc.HostConnectionNotAllowedException as e: - LOG.warning("Connection to host is not allowed. %s", e) - - connection_attempts -= 1 - - raise pika_drv_exc.EstablishConnectionException( - "Can not establish connection to any configured RabbitMQ " - "host: " + str(self._host_list) - ) - - def _set_tcp_user_timeout(self, s): - if not self._tcp_user_timeout: - return - try: - s.setsockopt( - socket.IPPROTO_TCP, TCP_USER_TIMEOUT, - int(self._tcp_user_timeout * 1000) - ) - except socket.error: - LOG.warning( - "Whoops, this kernel doesn't seem to support TCP_USER_TIMEOUT." - ) - - def _create_host_connection(self, host_index, for_listening): - """Create new connection to host #host_index - - :param host_index: Integer, number of host for connection establishing - :param for_listening: Boolean, creates connection for listening - if True - :return: New connection - """ - host = self._host_list[host_index] - - cur_time = time.time() - - host_connection_status = self._connection_host_status.get(host) - - if host_connection_status is None: - host_connection_status = { - HOST_CONNECTION_LAST_SUCCESS_TRY_TIME: 0, - HOST_CONNECTION_LAST_TRY_TIME: 0 - } - self._connection_host_status[host] = host_connection_status - - last_success_time = host_connection_status[ - HOST_CONNECTION_LAST_SUCCESS_TRY_TIME - ] - last_time = host_connection_status[ - HOST_CONNECTION_LAST_TRY_TIME - ] - - # raise HostConnectionNotAllowedException if we tried to establish - # connection in last 'host_connection_reconnect_delay' and got - # failure - if (last_time != last_success_time and - cur_time - last_time < - self._host_connection_reconnect_delay): - raise pika_drv_exc.HostConnectionNotAllowedException( - "Connection to host #{} is not allowed now because of " - "previous failure".format(host_index) - ) - - try: - connection = self._do_create_host_connection( - host, for_listening - ) - self._connection_host_status[host][ - HOST_CONNECTION_LAST_SUCCESS_TRY_TIME - ] = cur_time - - return connection - finally: - self._connection_host_status[host][ - HOST_CONNECTION_LAST_TRY_TIME - ] = cur_time - - def _do_create_host_connection(self, host, for_listening): - connection_params = pika.ConnectionParameters( - host=host.hostname, - port=host.port, - credentials=pika_credentials.PlainCredentials( - host.username, host.password - ), - heartbeat_interval=( - self._heartbeat_interval if for_listening else None - ), - **self._common_pika_params - ) - if for_listening: - connection = pika_connection.ThreadSafePikaConnection( - parameters=connection_params - ) - else: - connection = pika.BlockingConnection( - parameters=connection_params - ) - connection.params = connection_params - - self._set_tcp_user_timeout(connection._impl.socket) - return connection - - -class NotClosableConnection(object): - def __init__(self, connection): - self._connection = connection - - def __getattr__(self, item): - return getattr(self._connection, item) - - def close(self): - pass - - -class SinglePikaConnectionFactory(PikaConnectionFactory): - def __init__(self, url, conf): - super(SinglePikaConnectionFactory, self).__init__(url, conf) - self._connection = None - - def create_connection(self, for_listening=False): - with self._connection_lock: - if self._connection is None or not self._connection.is_open: - self._connection = ( - super(SinglePikaConnectionFactory, self).create_connection( - True - ) - ) - return NotClosableConnection(self._connection) - - def cleanup(self): - with self._connection_lock: - if self._connection is not None and self._connection.is_open: - try: - self._connection.close() - except Exception: - LOG.warning( - "Unexpected exception during connection closing", - exc_info=True - ) - self._connection = None - - -class ReadWritePikaConnectionFactory(PikaConnectionFactory): - def __init__(self, url, conf): - super(ReadWritePikaConnectionFactory, self).__init__(url, conf) - self._read_connection = None - self._write_connection = None - - def create_connection(self, for_listening=False): - with self._connection_lock: - if for_listening: - if (self._read_connection is None or - not self._read_connection.is_open): - self._read_connection = super( - ReadWritePikaConnectionFactory, self - ).create_connection(True) - return NotClosableConnection(self._read_connection) - else: - if (self._write_connection is None or - not self._write_connection.is_open): - self._write_connection = super( - ReadWritePikaConnectionFactory, self - ).create_connection(True) - return NotClosableConnection(self._write_connection) - - def cleanup(self): - with self._connection_lock: - if (self._read_connection is not None and - self._read_connection.is_open): - try: - self._read_connection.close() - except Exception: - LOG.warning( - "Unexpected exception during connection closing", - exc_info=True - ) - self._read_connection = None - - if (self._write_connection is not None and - self._write_connection.is_open): - try: - self._write_connection.close() - except Exception: - LOG.warning( - "Unexpected exception during connection closing", - exc_info=True - ) - self._write_connection = None diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py deleted file mode 100644 index 157c499..0000000 --- a/oslo_messaging/_drivers/pika_driver/pika_engine.py +++ /dev/null @@ -1,303 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -import logging -import os -import threading -import uuid - -from oslo_utils import eventletutils -import pika_pool -from stevedore import driver - -from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns -from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc - -LOG = logging.getLogger(__name__) - - -class _PooledConnectionWithConfirmations(pika_pool.Connection): - """Derived from 'pika_pool.Connection' and extends its logic - adds - 'confirm_delivery' call after channel creation to enable delivery - confirmation for channel - """ - @property - def channel(self): - if self.fairy.channel is None: - self.fairy.channel = self.fairy.cxn.channel() - self.fairy.channel.confirm_delivery() - return self.fairy.channel - - -class PikaEngine(object): - """Used for shared functionality between other pika driver modules, like - connection factory, connection pools, processing and holding configuration, - etc. - """ - - def __init__(self, conf, url, default_exchange=None, - allowed_remote_exmods=None): - self.conf = conf - self.url = url - - self._connection_factory_type = ( - self.conf.oslo_messaging_pika.connection_factory - ) - - self._connection_factory = None - self._connection_without_confirmation_pool = None - self._connection_with_confirmation_pool = None - self._pid = None - self._init_lock = threading.Lock() - - self.host_connection_reconnect_delay = ( - conf.oslo_messaging_pika.host_connection_reconnect_delay - ) - - # processing rpc options - self.default_rpc_exchange = ( - conf.oslo_messaging_pika.default_rpc_exchange - ) - self.rpc_reply_exchange = ( - conf.oslo_messaging_pika.rpc_reply_exchange - ) - - self.allowed_remote_exmods = [pika_drv_cmns.EXCEPTIONS_MODULE] - if allowed_remote_exmods: - self.allowed_remote_exmods.extend(allowed_remote_exmods) - - self.rpc_listener_prefetch_count = ( - conf.oslo_messaging_pika.rpc_listener_prefetch_count - ) - - self.default_rpc_retry_attempts = ( - conf.oslo_messaging_pika.default_rpc_retry_attempts - ) - - self.rpc_retry_delay = ( - conf.oslo_messaging_pika.rpc_retry_delay - ) - if self.rpc_retry_delay < 0: - raise ValueError("rpc_retry_delay should be non-negative integer") - - self.rpc_reply_listener_prefetch_count = ( - conf.oslo_messaging_pika.rpc_listener_prefetch_count - ) - - self.rpc_reply_retry_attempts = ( - conf.oslo_messaging_pika.rpc_reply_retry_attempts - ) - self.rpc_reply_retry_delay = ( - conf.oslo_messaging_pika.rpc_reply_retry_delay - ) - if self.rpc_reply_retry_delay < 0: - raise ValueError("rpc_reply_retry_delay should be non-negative " - "integer") - - self.rpc_queue_expiration = ( - self.conf.oslo_messaging_pika.rpc_queue_expiration - ) - - # processing notification options - self.default_notification_exchange = ( - conf.oslo_messaging_pika.default_notification_exchange - ) - - self.notification_persistence = ( - conf.oslo_messaging_pika.notification_persistence - ) - - self.notification_listener_prefetch_count = ( - conf.oslo_messaging_pika.notification_listener_prefetch_count - ) - - self.default_notification_retry_attempts = ( - conf.oslo_messaging_pika.default_notification_retry_attempts - ) - if self.default_notification_retry_attempts is None: - raise ValueError("default_notification_retry_attempts should be " - "an integer") - self.notification_retry_delay = ( - conf.oslo_messaging_pika.notification_retry_delay - ) - if (self.notification_retry_delay is None or - self.notification_retry_delay < 0): - raise ValueError("notification_retry_delay should be non-negative " - "integer") - - self.default_content_type = ( - 'application/' + conf.oslo_messaging_pika.default_serializer_type - ) - - def _init_if_needed(self): - cur_pid = os.getpid() - - if self._pid == cur_pid: - return - - with self._init_lock: - if self._pid == cur_pid: - return - - if self._pid: - LOG.warning("New pid is detected. Old: %s, new: %s. " - "Cleaning up...", self._pid, cur_pid) - - # Note(dukhlov): we need to force select poller usage in case - # when 'thread' module is monkey patched becase current - # eventlet implementation does not support patching of - # poll/epoll/kqueue - if eventletutils.is_monkey_patched("thread"): - from pika.adapters import select_connection - select_connection.SELECT_TYPE = "select" - - mgr = driver.DriverManager( - 'oslo.messaging.pika.connection_factory', - self._connection_factory_type - ) - - self._connection_factory = mgr.driver(self.url, self.conf) - - # initializing 2 connection pools: 1st for connections without - # confirmations, 2nd - with confirmations - self._connection_without_confirmation_pool = pika_pool.QueuedPool( - create=self.create_connection, - max_size=self.conf.oslo_messaging_pika.pool_max_size, - max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow, - timeout=self.conf.oslo_messaging_pika.pool_timeout, - recycle=self.conf.oslo_messaging_pika.pool_recycle, - stale=self.conf.oslo_messaging_pika.pool_stale, - ) - - self._connection_with_confirmation_pool = pika_pool.QueuedPool( - create=self.create_connection, - max_size=self.conf.oslo_messaging_pika.pool_max_size, - max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow, - timeout=self.conf.oslo_messaging_pika.pool_timeout, - recycle=self.conf.oslo_messaging_pika.pool_recycle, - stale=self.conf.oslo_messaging_pika.pool_stale, - ) - - self._connection_with_confirmation_pool.Connection = ( - _PooledConnectionWithConfirmations - ) - - self._pid = cur_pid - - def create_connection(self, for_listening=False): - self._init_if_needed() - return self._connection_factory.create_connection(for_listening) - - @property - def connection_without_confirmation_pool(self): - self._init_if_needed() - return self._connection_without_confirmation_pool - - @property - def connection_with_confirmation_pool(self): - self._init_if_needed() - return self._connection_with_confirmation_pool - - def cleanup(self): - if self._connection_factory: - self._connection_factory.cleanup() - - def declare_exchange_by_channel(self, channel, exchange, exchange_type, - durable): - """Declare exchange using already created channel, if they don't exist - - :param channel: Channel for communication with RabbitMQ - :param exchange: String, RabbitMQ exchange name - :param exchange_type: String ('direct', 'topic' or 'fanout') - exchange type for exchange to be declared - :param durable: Boolean, creates durable exchange if true - """ - try: - channel.exchange_declare( - exchange, exchange_type, auto_delete=True, durable=durable - ) - except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e: - raise pika_drv_exc.ConnectionException( - "Connectivity problem detected during declaring exchange: " - "exchange:{}, exchange_type: {}, durable: {}. {}".format( - exchange, exchange_type, durable, str(e) - ) - ) - - def declare_queue_binding_by_channel(self, channel, exchange, queue, - routing_key, exchange_type, - queue_expiration, durable): - """Declare exchange, queue and bind them using already created - channel, if they don't exist - - :param channel: Channel for communication with RabbitMQ - :param exchange: String, RabbitMQ exchange name - :param queue: Sting, RabbitMQ queue name - :param routing_key: Sting, RabbitMQ routing key for queue binding - :param exchange_type: String ('direct', 'topic' or 'fanout') - exchange type for exchange to be declared - :param queue_expiration: Integer, time in seconds which queue will - remain existing in RabbitMQ when there no consumers connected - :param durable: Boolean, creates durable exchange and queue if true - """ - try: - channel.exchange_declare( - exchange, exchange_type, auto_delete=True, durable=durable - ) - arguments = {} - - if queue_expiration > 0: - arguments['x-expires'] = queue_expiration * 1000 - - channel.queue_declare(queue, durable=durable, arguments=arguments) - - channel.queue_bind(queue, exchange, routing_key) - except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e: - raise pika_drv_exc.ConnectionException( - "Connectivity problem detected during declaring queue " - "binding: exchange:{}, queue: {}, routing_key: {}, " - "exchange_type: {}, queue_expiration: {}, " - "durable: {}. {}".format( - exchange, queue, routing_key, exchange_type, - queue_expiration, durable, str(e) - ) - ) - - def get_rpc_exchange_name(self, exchange): - """Returns RabbitMQ exchange name for given rpc request - - :param exchange: String, oslo.messaging target's exchange - - :return: String, RabbitMQ exchange name - """ - return exchange or self.default_rpc_exchange - - @staticmethod - def get_rpc_queue_name(topic, server, no_ack, worker=False): - """Returns RabbitMQ queue name for given rpc request - - :param topic: String, oslo.messaging target's topic - :param server: String, oslo.messaging target's server - :param no_ack: Boolean, use message delivery with acknowledges or not - :param worker: Boolean, use queue by single worker only or not - - :return: String, RabbitMQ queue name - """ - queue_parts = ["no_ack" if no_ack else "with_ack", topic] - if server is not None: - queue_parts.append(server) - if worker: - queue_parts.append("worker") - queue_parts.append(uuid.uuid4().hex) - queue = '.'.join(queue_parts) - return queue diff --git a/oslo_messaging/_drivers/pika_driver/pika_exceptions.py b/oslo_messaging/_drivers/pika_driver/pika_exceptions.py deleted file mode 100644 index c32d7e4..0000000 --- a/oslo_messaging/_drivers/pika_driver/pika_exceptions.py +++ /dev/null @@ -1,68 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_messaging import exceptions - - -class ExchangeNotFoundException(exceptions.MessageDeliveryFailure): - """Is raised if specified exchange is not found in RabbitMQ.""" - pass - - -class MessageRejectedException(exceptions.MessageDeliveryFailure): - """Is raised if message which you are trying to send was nacked by RabbitMQ - it may happen if RabbitMQ is not able to process message - """ - pass - - -class RoutingException(exceptions.MessageDeliveryFailure): - """Is raised if message can not be delivered to any queue. Usually it means - that any queue is not binded to given exchange with given routing key. - Raised if 'mandatory' flag specified only - """ - pass - - -class ConnectionException(exceptions.MessagingException): - """Is raised if some operation can not be performed due to connectivity - problem - """ - pass - - -class TimeoutConnectionException(ConnectionException): - """Is raised if socket timeout was expired during network interaction""" - pass - - -class EstablishConnectionException(ConnectionException): - """Is raised if we have some problem during establishing connection - procedure - """ - pass - - -class HostConnectionNotAllowedException(EstablishConnectionException): - """Is raised in case of try to establish connection to temporary - not allowed host (because of reconnection policy for example) - """ - pass - - -class UnsupportedDriverVersion(exceptions.MessagingException): - """Is raised when message is received but was sent by different, - not supported driver version - """ - pass diff --git a/oslo_messaging/_drivers/pika_driver/pika_listener.py b/oslo_messaging/_drivers/pika_driver/pika_listener.py deleted file mode 100644 index 1942fcc..0000000 --- a/oslo_messaging/_drivers/pika_driver/pika_listener.py +++ /dev/null @@ -1,123 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import threading -import uuid - -from concurrent import futures -from oslo_log import log as logging - -from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller - -LOG = logging.getLogger(__name__) - - -class RpcReplyPikaListener(object): - """Provide functionality for listening RPC replies. Create and handle - reply poller and coroutine for performing polling job - """ - - def __init__(self, pika_engine): - super(RpcReplyPikaListener, self).__init__() - self._pika_engine = pika_engine - - # preparing poller for listening replies - self._reply_queue = None - - self._reply_poller = None - self._reply_waiting_futures = {} - - self._reply_consumer_initialized = False - self._reply_consumer_initialization_lock = threading.Lock() - self._shutdown = False - - def get_reply_qname(self): - """As result return reply queue name, shared for whole process, - but before this check is RPC listener initialized or not and perform - initialization if needed - - :return: String, queue name which hould be used for reply sending - """ - if self._reply_consumer_initialized: - return self._reply_queue - - with self._reply_consumer_initialization_lock: - if self._reply_consumer_initialized: - return self._reply_queue - - # generate reply queue name if needed - if self._reply_queue is None: - self._reply_queue = "reply.{}.{}.{}".format( - self._pika_engine.conf.project, - self._pika_engine.conf.prog, uuid.uuid4().hex - ) - - # initialize reply poller if needed - if self._reply_poller is None: - self._reply_poller = pika_drv_poller.RpcReplyPikaPoller( - self._pika_engine, self._pika_engine.rpc_reply_exchange, - self._reply_queue, 1, None, - self._pika_engine.rpc_reply_listener_prefetch_count - ) - - self._reply_poller.start(self._on_incoming) - self._reply_consumer_initialized = True - - return self._reply_queue - - def _on_incoming(self, incoming): - """Reply polling job. Poll replies in infinite loop and notify - registered features - """ - for message in incoming: - try: - message.acknowledge() - future = self._reply_waiting_futures.pop( - message.msg_id, None - ) - if future is not None: - future.set_result(message) - except Exception: - LOG.exception("Unexpected exception during processing" - "reply message") - - def register_reply_waiter(self, msg_id): - """Register reply waiter. Should be called before message sending to - the server - :param msg_id: String, message_id of expected reply - :return future: Future, container for expected reply to be returned - over - """ - future = futures.Future() - self._reply_waiting_futures[msg_id] = future - return future - - def unregister_reply_waiter(self, msg_id): - """Unregister reply waiter. Should be called if client has not got - reply and doesn't want to continue waiting (if timeout_expired for - example) - :param msg_id: - """ - self._reply_waiting_futures.pop(msg_id, None) - - def cleanup(self): - """Stop replies consuming and cleanup resources""" - self._shutdown = True - - if self._reply_poller: - self._reply_poller.stop() - self._reply_poller.cleanup() - self._reply_poller = None - - self._reply_queue = None diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py deleted file mode 100644 index 2f79f05..0000000 --- a/oslo_messaging/_drivers/pika_driver/pika_message.py +++ /dev/null @@ -1,618 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -import socket -import time -import traceback -import uuid - -from concurrent import futures -from oslo_log import log as logging -from oslo_utils import importutils -from oslo_utils import timeutils -from pika import exceptions as pika_exceptions -from pika import spec as pika_spec -import pika_pool -import six -import tenacity - - -import oslo_messaging -from oslo_messaging._drivers import base -from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns -from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc -from oslo_messaging import _utils as utils -from oslo_messaging import exceptions - - -LOG = logging.getLogger(__name__) - -_VERSION_HEADER = "version" -_VERSION = "1.0" - - -class RemoteExceptionMixin(object): - """Used for constructing dynamic exception type during deserialization of - remote exception. It defines unified '__init__' method signature and - exception message format - """ - def __init__(self, module, clazz, message, trace): - """Store serialized data - :param module: String, module name for importing original exception - class of serialized remote exception - :param clazz: String, original class name of serialized remote - exception - :param message: String, original message of serialized remote - exception - :param trace: String, original trace of serialized remote exception - """ - self.module = module - self.clazz = clazz - self.message = message - self.trace = trace - - self._str_msgs = message + "\n" + "\n".join(trace) - - def __str__(self): - return self._str_msgs - - -class PikaIncomingMessage(base.IncomingMessage): - """Driver friendly adapter for received message. Extract message - information from RabbitMQ message and provide access to it - """ - - def __init__(self, pika_engine, channel, method, properties, body): - """Parse RabbitMQ message - - :param pika_engine: PikaEngine, shared object with configuration and - shared driver functionality - :param channel: Channel, RabbitMQ channel which was used for - this message delivery, used for sending ack back. - If None - ack is not required - :param method: Method, RabbitMQ message method - :param properties: Properties, RabbitMQ message properties - :param body: Bytes, RabbitMQ message body - """ - headers = getattr(properties, "headers", {}) - version = headers.get(_VERSION_HEADER, None) - if not utils.version_is_compatible(version, _VERSION): - raise pika_drv_exc.UnsupportedDriverVersion( - "Message's version: {} is not compatible with driver version: " - "{}".format(version, _VERSION)) - - self._pika_engine = pika_engine - self._channel = channel - self._delivery_tag = method.delivery_tag - - self._version = version - - self._content_type = properties.content_type - self.unique_id = properties.message_id - - self.expiration_time = ( - None if properties.expiration is None else - time.time() + float(properties.expiration) / 1000 - ) - - try: - serializer = pika_drv_cmns.MESSAGE_SERIALIZERS[self._content_type] - except KeyError: - raise NotImplementedError( - "Content-type['{}'] is not supported.".format( - self._content_type - ) - ) - - message_dict = serializer.load_from_bytes(body) - - context_dict = {} - - for key in list(message_dict.keys()): - key = six.text_type(key) - if key.startswith('_$_'): - value = message_dict.pop(key) - context_dict[key[3:]] = value - - super(PikaIncomingMessage, self).__init__(context_dict, message_dict) - - def need_ack(self): - return self._channel is not None - - def acknowledge(self): - """Ack the message. Should be called by message processing logic when - it considered as consumed (means that we don't need redelivery of this - message anymore) - """ - if self.need_ack(): - self._channel.basic_ack(delivery_tag=self._delivery_tag) - - def requeue(self): - """Rollback the message. Should be called by message processing logic - when it can not process the message right now and should be redelivered - later if it is possible - """ - if self.need_ack(): - return self._channel.basic_nack(delivery_tag=self._delivery_tag, - requeue=True) - - -class RpcPikaIncomingMessage(PikaIncomingMessage, base.RpcIncomingMessage): - """PikaIncomingMessage implementation for RPC messages. It expects - extra RPC related fields in message body (msg_id and reply_q). Also 'reply' - method added to allow consumer to send RPC reply back to the RPC client - """ - - def __init__(self, pika_engine, channel, method, properties, body): - """Defines default values of msg_id and reply_q fields and just call - super.__init__ method - - :param pika_engine: PikaEngine, shared object with configuration and - shared driver functionality - :param channel: Channel, RabbitMQ channel which was used for - this message delivery, used for sending ack back. - If None - ack is not required - :param method: Method, RabbitMQ message method - :param properties: Properties, RabbitMQ message properties - :param body: Bytes, RabbitMQ message body - """ - super(RpcPikaIncomingMessage, self).__init__( - pika_engine, channel, method, properties, body - ) - self.reply_q = properties.reply_to - self.msg_id = properties.correlation_id - - def reply(self, reply=None, failure=None): - """Send back reply to the RPC client - :param reply: Dictionary, reply. In case of exception should be None - :param failure: Tuple, should be a sys.exc_info() tuple. - Should be None if RPC request was successfully processed. - - :return RpcReplyPikaIncomingMessage: message with reply - """ - - if self.reply_q is None: - return - - reply_outgoing_message = RpcReplyPikaOutgoingMessage( - self._pika_engine, self.msg_id, reply=reply, failure_info=failure, - content_type=self._content_type, - ) - - def on_exception(ex): - if isinstance(ex, pika_drv_exc.ConnectionException): - LOG.warning( - "Connectivity related problem during reply sending. %s", - ex - ) - return True - else: - return False - - if self._pika_engine.rpc_reply_retry_attempts: - retrier = tenacity.retry( - stop=( - tenacity.stop_never - if self._pika_engine.rpc_reply_retry_attempts == -1 else - tenacity.stop_after_attempt( - self._pika_engine.rpc_reply_retry_attempts - ) - ), - retry=tenacity.retry_if_exception(on_exception), - wait=tenacity.wait_fixed( - self._pika_engine.rpc_reply_retry_delay - ) - ) - else: - retrier = None - - try: - timeout = (None if self.expiration_time is None else - max(self.expiration_time - time.time(), 0)) - with timeutils.StopWatch(duration=timeout) as stopwatch: - reply_outgoing_message.send( - reply_q=self.reply_q, - stopwatch=stopwatch, - retrier=retrier - ) - LOG.debug( - "Message [id:'%s'] replied to '%s'.", self.msg_id, self.reply_q - ) - except Exception: - LOG.exception( - "Message [id:'%s'] wasn't replied to : %s", self.msg_id, - self.reply_q - ) - - -class RpcReplyPikaIncomingMessage(PikaIncomingMessage): - """PikaIncomingMessage implementation for RPC reply messages. It expects - extra RPC reply related fields in message body (result and failure). - """ - def __init__(self, pika_engine, channel, method, properties, body): - """Defines default values of result and failure fields, call - super.__init__ method and then construct Exception object if failure is - not None - - :param pika_engine: PikaEngine, shared object with configuration and - shared driver functionality - :param channel: Channel, RabbitMQ channel which was used for - this message delivery, used for sending ack back. - If None - ack is not required - :param method: Method, RabbitMQ message method - :param properties: Properties, RabbitMQ message properties - :param body: Bytes, RabbitMQ message body - """ - super(RpcReplyPikaIncomingMessage, self).__init__( - pika_engine, channel, method, properties, body - ) - - self.msg_id = properties.correlation_id - - self.result = self.message.get("s", None) - self.failure = self.message.get("e", None) - - if self.failure is not None: - trace = self.failure.get('t', []) - message = self.failure.get('s', "") - class_name = self.failure.get('c') - module_name = self.failure.get('m') - - res_exc = None - - if module_name in pika_engine.allowed_remote_exmods: - try: - module = importutils.import_module(module_name) - klass = getattr(module, class_name) - - ex_type = type( - klass.__name__, - (RemoteExceptionMixin, klass), - {} - ) - - res_exc = ex_type(module_name, class_name, message, trace) - except ImportError as e: - LOG.warning( - "Can not deserialize remote exception [module:%s, " - "class:%s]. %s", module_name, class_name, e - ) - - # if we have not processed failure yet, use RemoteError class - if res_exc is None: - res_exc = oslo_messaging.RemoteError( - class_name, message, trace - ) - self.failure = res_exc - - -class PikaOutgoingMessage(object): - """Driver friendly adapter for sending message. Construct RabbitMQ message - and send it - """ - - def __init__(self, pika_engine, message, context, content_type=None): - """Parse RabbitMQ message - - :param pika_engine: PikaEngine, shared object with configuration and - shared driver functionality - :param message: Dictionary, user's message fields - :param context: Dictionary, request context's fields - :param content_type: String, content-type header, defines serialization - mechanism, if None default content-type from pika_engine is used - """ - - self._pika_engine = pika_engine - - self._content_type = ( - content_type if content_type is not None else - self._pika_engine.default_content_type - ) - - try: - self._serializer = pika_drv_cmns.MESSAGE_SERIALIZERS[ - self._content_type - ] - except KeyError: - raise NotImplementedError( - "Content-type['{}'] is not supported.".format( - self._content_type - ) - ) - - self.message = message - self.context = context - - self.unique_id = uuid.uuid4().hex - - def _prepare_message_to_send(self): - """Combine user's message fields an system fields (_unique_id, - context's data etc) - """ - msg = self.message.copy() - - if self.context: - for key, value in self.context.items(): - key = six.text_type(key) - msg['_$_' + key] = value - - props = pika_spec.BasicProperties( - content_type=self._content_type, - headers={_VERSION_HEADER: _VERSION}, - message_id=self.unique_id, - ) - return msg, props - - @staticmethod - def _publish(pool, exchange, routing_key, body, properties, mandatory, - stopwatch): - """Execute pika publish method using connection from connection pool - Also this message catches all pika related exceptions and raise - oslo.messaging specific exceptions - - :param pool: Pool, pika connection pool for connection choosing - :param exchange: String, RabbitMQ exchange name for message sending - :param routing_key: String, RabbitMQ routing key for message routing - :param body: Bytes, RabbitMQ message payload - :param properties: Properties, RabbitMQ message properties - :param mandatory: Boolean, RabbitMQ publish mandatory flag (raise - exception if it is not possible to deliver message to any queue) - :param stopwatch: StopWatch, stopwatch object for calculating - allowed timeouts - """ - if stopwatch.expired(): - raise exceptions.MessagingTimeout( - "Timeout for current operation was expired." - ) - try: - timeout = stopwatch.leftover(return_none=True) - with pool.acquire(timeout=timeout) as conn: - if timeout is not None: - properties.expiration = str(int(timeout * 1000)) - conn.channel.publish( - exchange=exchange, - routing_key=routing_key, - body=body, - properties=properties, - mandatory=mandatory - ) - except pika_exceptions.NackError as e: - raise pika_drv_exc.MessageRejectedException( - "Can not send message: [body: {}], properties: {}] to " - "target [exchange: {}, routing_key: {}]. {}".format( - body, properties, exchange, routing_key, str(e) - ) - ) - except pika_exceptions.UnroutableError as e: - raise pika_drv_exc.RoutingException( - "Can not deliver message:[body:{}, properties: {}] to any " - "queue using target: [exchange:{}, " - "routing_key:{}]. {}".format( - body, properties, exchange, routing_key, str(e) - ) - ) - except pika_pool.Timeout as e: - raise exceptions.MessagingTimeout( - "Timeout for current operation was expired. {}".format(str(e)) - ) - except pika_pool.Connection.connectivity_errors as e: - if (isinstance(e, pika_exceptions.ChannelClosed) - and e.args and e.args[0] == 404): - raise pika_drv_exc.ExchangeNotFoundException( - "Attempt to send message to not existing exchange " - "detected, message: [body:{}, properties: {}], target: " - "[exchange:{}, routing_key:{}]. {}".format( - body, properties, exchange, routing_key, str(e) - ) - ) - - raise pika_drv_exc.ConnectionException( - "Connectivity problem detected during sending the message: " - "[body:{}, properties: {}] to target: [exchange:{}, " - "routing_key:{}]. {}".format( - body, properties, exchange, routing_key, str(e) - ) - ) - except socket.timeout: - raise pika_drv_exc.TimeoutConnectionException( - "Socket timeout exceeded." - ) - - def _do_send(self, exchange, routing_key, msg_dict, msg_props, - confirm=True, mandatory=True, persistent=False, - stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH, retrier=None): - """Send prepared message with configured retrying - - :param exchange: String, RabbitMQ exchange name for message sending - :param routing_key: String, RabbitMQ routing key for message routing - :param msg_dict: Dictionary, message payload - :param msg_props: Properties, message properties - :param confirm: Boolean, enable publisher confirmation if True - :param mandatory: Boolean, RabbitMQ publish mandatory flag (raise - exception if it is not possible to deliver message to any queue) - :param persistent: Boolean, send persistent message if True, works only - for routing into durable queues - :param stopwatch: StopWatch, stopwatch object for calculating - allowed timeouts - :param retrier: tenacity.Retrying, configured retrier object for - sending message, if None no retrying is performed - """ - msg_props.delivery_mode = 2 if persistent else 1 - - pool = (self._pika_engine.connection_with_confirmation_pool - if confirm else - self._pika_engine.connection_without_confirmation_pool) - - body = self._serializer.dump_as_bytes(msg_dict) - - LOG.debug( - "Sending message:[body:%s; properties: %s] to target: " - "[exchange:%s; routing_key:%s]", body, msg_props, exchange, - routing_key - ) - - publish = (self._publish if retrier is None else - retrier(self._publish)) - - return publish(pool, exchange, routing_key, body, msg_props, - mandatory, stopwatch) - - def send(self, exchange, routing_key='', confirm=True, mandatory=True, - persistent=False, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH, - retrier=None): - """Send message with configured retrying - - :param exchange: String, RabbitMQ exchange name for message sending - :param routing_key: String, RabbitMQ routing key for message routing - :param confirm: Boolean, enable publisher confirmation if True - :param mandatory: Boolean, RabbitMQ publish mandatory flag (raise - exception if it is not possible to deliver message to any queue) - :param persistent: Boolean, send persistent message if True, works only - for routing into durable queues - :param stopwatch: StopWatch, stopwatch object for calculating - allowed timeouts - :param retrier: tenacity.Retrying, configured retrier object for - sending message, if None no retrying is performed - """ - msg_dict, msg_props = self._prepare_message_to_send() - - return self._do_send(exchange, routing_key, msg_dict, msg_props, - confirm, mandatory, persistent, - stopwatch, retrier) - - -class RpcPikaOutgoingMessage(PikaOutgoingMessage): - """PikaOutgoingMessage implementation for RPC messages. It adds - possibility to wait and receive RPC reply - """ - def __init__(self, pika_engine, message, context, content_type=None): - super(RpcPikaOutgoingMessage, self).__init__( - pika_engine, message, context, content_type - ) - self.msg_id = None - self.reply_q = None - - def send(self, exchange, routing_key, reply_listener=None, - stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH, retrier=None): - """Send RPC message with configured retrying - - :param exchange: String, RabbitMQ exchange name for message sending - :param routing_key: String, RabbitMQ routing key for message routing - :param reply_listener: RpcReplyPikaListener, listener for waiting - reply. If None - return immediately without reply waiting - :param stopwatch: StopWatch, stopwatch object for calculating - allowed timeouts - :param retrier: tenacity.Retrying, configured retrier object for - sending message, if None no retrying is performed - """ - msg_dict, msg_props = self._prepare_message_to_send() - - if reply_listener: - self.msg_id = uuid.uuid4().hex - msg_props.correlation_id = self.msg_id - LOG.debug('MSG_ID is %s', self.msg_id) - - self.reply_q = reply_listener.get_reply_qname() - msg_props.reply_to = self.reply_q - - future = reply_listener.register_reply_waiter(msg_id=self.msg_id) - - self._do_send( - exchange=exchange, routing_key=routing_key, msg_dict=msg_dict, - msg_props=msg_props, confirm=True, mandatory=True, - persistent=False, stopwatch=stopwatch, retrier=retrier - ) - - try: - return future.result(stopwatch.leftover(return_none=True)) - except BaseException as e: - reply_listener.unregister_reply_waiter(self.msg_id) - if isinstance(e, futures.TimeoutError): - e = exceptions.MessagingTimeout() - raise e - else: - self._do_send( - exchange=exchange, routing_key=routing_key, msg_dict=msg_dict, - msg_props=msg_props, confirm=True, mandatory=True, - persistent=False, stopwatch=stopwatch, retrier=retrier - ) - - -class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage): - """PikaOutgoingMessage implementation for RPC reply messages. It sets - correlation_id AMQP property to link this reply with response - """ - def __init__(self, pika_engine, msg_id, reply=None, failure_info=None, - content_type=None): - """Initialize with reply information for sending - - :param pika_engine: PikaEngine, shared object with configuration and - shared driver functionality - :param msg_id: String, msg_id of RPC request, which waits for reply - :param reply: Dictionary, reply. In case of exception should be None - :param failure_info: Tuple, should be a sys.exc_info() tuple. - Should be None if RPC request was successfully processed. - :param content_type: String, content-type header, defines serialization - mechanism, if None default content-type from pika_engine is used - """ - self.msg_id = msg_id - - if failure_info is not None: - ex_class = failure_info[0] - ex = failure_info[1] - tb = traceback.format_exception(*failure_info) - if issubclass(ex_class, RemoteExceptionMixin): - failure_data = { - 'c': ex.clazz, - 'm': ex.module, - 's': ex.message, - 't': tb - } - else: - failure_data = { - 'c': six.text_type(ex_class.__name__), - 'm': six.text_type(ex_class.__module__), - 's': six.text_type(ex), - 't': tb - } - - msg = {'e': failure_data} - else: - msg = {'s': reply} - - super(RpcReplyPikaOutgoingMessage, self).__init__( - pika_engine, msg, None, content_type - ) - - def send(self, reply_q, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH, - retrier=None): - """Send RPC message with configured retrying - - :param reply_q: String, queue name for sending reply - :param stopwatch: StopWatch, stopwatch object for calculating - allowed timeouts - :param retrier: tenacity.Retrying, configured retrier object for - sending message, if None no retrying is performed - """ - - msg_dict, msg_props = self._prepare_message_to_send() - msg_props.correlation_id = self.msg_id - - self._do_send( - exchange=self._pika_engine.rpc_reply_exchange, routing_key=reply_q, - msg_dict=msg_dict, msg_props=msg_props, confirm=True, - mandatory=True, persistent=False, stopwatch=stopwatch, - retrier=retrier - ) diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py deleted file mode 100644 index 1c770a3..0000000 --- a/oslo_messaging/_drivers/pika_driver/pika_poller.py +++ /dev/null @@ -1,538 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import threading - -from oslo_log import log as logging -from oslo_service import loopingcall - -from oslo_messaging._drivers import base -from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns -from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc -from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg - -LOG = logging.getLogger(__name__) - - -class PikaPoller(base.Listener): - """Provides user friendly functionality for RabbitMQ message consuming, - handles low level connectivity problems and restore connection if some - connectivity related problem detected - """ - - def __init__(self, pika_engine, batch_size, batch_timeout, prefetch_count, - incoming_message_class): - """Initialize required fields - - :param pika_engine: PikaEngine, shared object with configuration and - shared driver functionality - :param batch_size: desired number of messages passed to - single on_incoming_callback call - :param batch_timeout: defines how long should we wait for batch_size - messages if we already have some messages waiting for processing - :param prefetch_count: Integer, maximum count of unacknowledged - messages which RabbitMQ broker sends to this consumer - :param incoming_message_class: PikaIncomingMessage, wrapper for - consumed RabbitMQ message - """ - super(PikaPoller, self).__init__(batch_size, batch_timeout, - prefetch_count) - self._pika_engine = pika_engine - self._incoming_message_class = incoming_message_class - - self._connection = None - self._channel = None - self._recover_loopingcall = None - self._lock = threading.RLock() - - self._cur_batch_buffer = None - self._cur_batch_timeout_id = None - - self._started = False - self._closing_connection_by_poller = False - - self._queues_to_consume = None - - def _on_connection_close(self, connection, reply_code, reply_text): - self._deliver_cur_batch() - if self._closing_connection_by_poller: - return - with self._lock: - self._connection = None - self._start_recover_consuming_task() - - def _on_channel_close(self, channel, reply_code, reply_text): - if self._cur_batch_buffer: - self._cur_batch_buffer = [ - message for message in self._cur_batch_buffer - if not message.need_ack() - ] - if self._closing_connection_by_poller: - return - with self._lock: - self._channel = None - self._start_recover_consuming_task() - - def _on_consumer_cancel(self, method_frame): - with self._lock: - if self._queues_to_consume: - consumer_tag = method_frame.method.consumer_tag - for queue_info in self._queues_to_consume: - if queue_info["consumer_tag"] == consumer_tag: - queue_info["consumer_tag"] = None - - self._start_recover_consuming_task() - - def _on_message_no_ack_callback(self, unused, method, properties, body): - """Is called by Pika when message was received from queue listened with - no_ack=True mode - """ - incoming_message = self._incoming_message_class( - self._pika_engine, None, method, properties, body - ) - self._on_incoming_message(incoming_message) - - def _on_message_with_ack_callback(self, unused, method, properties, body): - """Is called by Pika when message was received from queue listened with - no_ack=False mode - """ - incoming_message = self._incoming_message_class( - self._pika_engine, self._channel, method, properties, body - ) - self._on_incoming_message(incoming_message) - - def _deliver_cur_batch(self): - if self._cur_batch_timeout_id is not None: - self._connection.remove_timeout(self._cur_batch_timeout_id) - self._cur_batch_timeout_id = None - if self._cur_batch_buffer: - buf_to_send = self._cur_batch_buffer - self._cur_batch_buffer = None - try: - self.on_incoming_callback(buf_to_send) - except Exception: - LOG.exception("Unexpected exception during incoming delivery") - - def _on_incoming_message(self, incoming_message): - if self._cur_batch_buffer is None: - self._cur_batch_buffer = [incoming_message] - else: - self._cur_batch_buffer.append(incoming_message) - - if len(self._cur_batch_buffer) >= self.batch_size: - self._deliver_cur_batch() - return - - if self._cur_batch_timeout_id is None: - self._cur_batch_timeout_id = self._connection.add_timeout( - self.batch_timeout, self._deliver_cur_batch) - - def _start_recover_consuming_task(self): - """Start async job for checking connection to the broker.""" - if self._recover_loopingcall is None and self._started: - self._recover_loopingcall = ( - loopingcall.DynamicLoopingCall( - self._try_recover_consuming - ) - ) - LOG.info("Starting recover consuming job for listener: %s", self) - self._recover_loopingcall.start() - - def _try_recover_consuming(self): - with self._lock: - try: - if self._started: - self._start_or_recover_consuming() - except pika_drv_exc.EstablishConnectionException as e: - LOG.warning( - "Problem during establishing connection for pika " - "poller %s", e, exc_info=True - ) - return self._pika_engine.host_connection_reconnect_delay - except pika_drv_exc.ConnectionException as e: - LOG.warning( - "Connectivity exception during starting/recovering pika " - "poller %s", e, exc_info=True - ) - except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e: - LOG.warning( - "Connectivity exception during starting/recovering pika " - "poller %s", e, exc_info=True - ) - except BaseException: - # NOTE (dukhlov): I preffer to use here BaseException because - # if this method raise such exception LoopingCall stops - # execution Probably it should never happen and Exception - # should be enough but in case of programmer mistake it could - # be and it is potentially hard to catch problem if we will - # stop background task. It is better when it continue to work - # and write a lot of LOG with this error - LOG.exception("Unexpected exception during " - "starting/recovering pika poller") - else: - self._recover_loopingcall = None - LOG.info("Recover consuming job was finished for listener: %s", - self) - raise loopingcall.LoopingCallDone(True) - return 0 - - def _start_or_recover_consuming(self): - """Performs reconnection to the broker. It is unsafe method for - internal use only - """ - if self._connection is None or not self._connection.is_open: - self._connection = self._pika_engine.create_connection( - for_listening=True - ) - self._connection.add_on_close_callback(self._on_connection_close) - self._channel = None - - if self._channel is None or not self._channel.is_open: - if self._queues_to_consume: - for queue_info in self._queues_to_consume: - queue_info["consumer_tag"] = None - - self._channel = self._connection.channel() - self._channel.add_on_close_callback(self._on_channel_close) - self._channel.add_on_cancel_callback(self._on_consumer_cancel) - self._channel.basic_qos(prefetch_count=self.prefetch_size) - - if self._queues_to_consume is None: - self._queues_to_consume = self._declare_queue_binding() - - self._start_consuming() - - def _declare_queue_binding(self): - """Is called by recovering connection logic if target RabbitMQ - exchange and (or) queue do not exist. Should be overridden in child - classes - - :return Dictionary: declared_queue_name -> no_ack_mode - """ - raise NotImplementedError( - "It is base class. Please declare exchanges and queues here" - ) - - def _start_consuming(self): - """Is called by recovering connection logic for starting consumption - of configured RabbitMQ queues - """ - - assert self._queues_to_consume is not None - - try: - for queue_info in self._queues_to_consume: - if queue_info["consumer_tag"] is not None: - continue - no_ack = queue_info["no_ack"] - - on_message_callback = ( - self._on_message_no_ack_callback if no_ack - else self._on_message_with_ack_callback - ) - - queue_info["consumer_tag"] = self._channel.basic_consume( - on_message_callback, queue_info["queue_name"], - no_ack=no_ack - ) - except Exception: - self._queues_to_consume = None - raise - - def _stop_consuming(self): - """Is called by poller's stop logic for stopping consumption - of configured RabbitMQ queues - """ - - assert self._queues_to_consume is not None - - for queue_info in self._queues_to_consume: - consumer_tag = queue_info["consumer_tag"] - if consumer_tag is not None: - self._channel.basic_cancel(consumer_tag) - queue_info["consumer_tag"] = None - - def start(self, on_incoming_callback): - """Starts poller. Should be called before polling to allow message - consuming - - :param on_incoming_callback: callback function to be executed when - listener received messages. Messages should be processed and - acked/nacked by callback - """ - super(PikaPoller, self).start(on_incoming_callback) - - with self._lock: - if self._started: - return - connected = False - try: - self._start_or_recover_consuming() - except pika_drv_exc.EstablishConnectionException as exc: - LOG.warning( - "Can not establish connection during pika poller's " - "start(). %s", exc, exc_info=True - ) - except pika_drv_exc.ConnectionException as exc: - LOG.warning( - "Connectivity problem during pika poller's start(). %s", - exc, exc_info=True - ) - except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc: - LOG.warning( - "Connectivity problem during pika poller's start(). %s", - exc, exc_info=True - ) - else: - connected = True - - self._started = True - if not connected: - self._start_recover_consuming_task() - - def stop(self): - """Stops poller. Should be called when polling is not needed anymore to - stop new message consuming. After that it is necessary to poll already - prefetched messages - """ - super(PikaPoller, self).stop() - - with self._lock: - if not self._started: - return - - if self._recover_loopingcall is not None: - self._recover_loopingcall.stop() - self._recover_loopingcall = None - - if (self._queues_to_consume and self._channel and - self._channel.is_open): - try: - self._stop_consuming() - except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc: - LOG.warning( - "Connectivity problem detected during consumer " - "cancellation. %s", exc, exc_info=True - ) - self._deliver_cur_batch() - self._started = False - - def cleanup(self): - """Cleanup allocated resources (channel, connection, etc).""" - with self._lock: - if self._connection and self._connection.is_open: - try: - self._closing_connection_by_poller = True - self._connection.close() - self._closing_connection_by_poller = False - except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS: - # expected errors - pass - except Exception: - LOG.exception("Unexpected error during closing connection") - finally: - self._channel = None - self._connection = None - - -class RpcServicePikaPoller(PikaPoller): - """PikaPoller implementation for polling RPC messages. Overrides base - functionality according to RPC specific - """ - def __init__(self, pika_engine, target, batch_size, batch_timeout, - prefetch_count): - """Adds target parameter for declaring RPC specific exchanges and - queues - - :param pika_engine: PikaEngine, shared object with configuration and - shared driver functionality - :param target: Target, oslo.messaging Target object which defines RPC - endpoint - :param batch_size: desired number of messages passed to - single on_incoming_callback call - :param batch_timeout: defines how long should we wait for batch_size - messages if we already have some messages waiting for processing - :param prefetch_count: Integer, maximum count of unacknowledged - messages which RabbitMQ broker sends to this consumer - """ - self._target = target - - super(RpcServicePikaPoller, self).__init__( - pika_engine, batch_size, batch_timeout, prefetch_count, - pika_drv_msg.RpcPikaIncomingMessage - ) - - def _declare_queue_binding(self): - """Overrides base method and perform declaration of RabbitMQ exchanges - and queues which correspond to oslo.messaging RPC target - - :return Dictionary: declared_queue_name -> no_ack_mode - """ - queue_expiration = self._pika_engine.rpc_queue_expiration - - exchange = self._pika_engine.get_rpc_exchange_name( - self._target.exchange - ) - - queues_to_consume = [] - - for no_ack in [True, False]: - queue = self._pika_engine.get_rpc_queue_name( - self._target.topic, None, no_ack - ) - self._pika_engine.declare_queue_binding_by_channel( - channel=self._channel, exchange=exchange, queue=queue, - routing_key=queue, exchange_type='direct', durable=False, - queue_expiration=queue_expiration - ) - queues_to_consume.append( - {"queue_name": queue, "no_ack": no_ack, "consumer_tag": None} - ) - - if self._target.server: - server_queue = self._pika_engine.get_rpc_queue_name( - self._target.topic, self._target.server, no_ack - ) - self._pika_engine.declare_queue_binding_by_channel( - channel=self._channel, exchange=exchange, durable=False, - queue=server_queue, routing_key=server_queue, - exchange_type='direct', queue_expiration=queue_expiration - ) - queues_to_consume.append( - {"queue_name": server_queue, "no_ack": no_ack, - "consumer_tag": None} - ) - - worker_queue = self._pika_engine.get_rpc_queue_name( - self._target.topic, self._target.server, no_ack, True - ) - all_workers_routing_key = self._pika_engine.get_rpc_queue_name( - self._target.topic, "all_workers", no_ack - ) - self._pika_engine.declare_queue_binding_by_channel( - channel=self._channel, exchange=exchange, durable=False, - queue=worker_queue, routing_key=all_workers_routing_key, - exchange_type='direct', queue_expiration=queue_expiration - ) - queues_to_consume.append( - {"queue_name": worker_queue, "no_ack": no_ack, - "consumer_tag": None} - ) - - return queues_to_consume - - -class RpcReplyPikaPoller(PikaPoller): - """PikaPoller implementation for polling RPC reply messages. Overrides - base functionality according to RPC reply specific - """ - def __init__(self, pika_engine, exchange, queue, batch_size, batch_timeout, - prefetch_count): - """Adds exchange and queue parameter for declaring exchange and queue - used for RPC reply delivery - - :param pika_engine: PikaEngine, shared object with configuration and - shared driver functionality - :param exchange: String, exchange name used for RPC reply delivery - :param queue: String, queue name used for RPC reply delivery - :param batch_size: desired number of messages passed to - single on_incoming_callback call - :param batch_timeout: defines how long should we wait for batch_size - messages if we already have some messages waiting for processing - :param prefetch_count: Integer, maximum count of unacknowledged - messages which RabbitMQ broker sends to this consumer - """ - self._exchange = exchange - self._queue = queue - - super(RpcReplyPikaPoller, self).__init__( - pika_engine, batch_size, batch_timeout, prefetch_count, - pika_drv_msg.RpcReplyPikaIncomingMessage - ) - - def _declare_queue_binding(self): - """Overrides base method and perform declaration of RabbitMQ exchange - and queue used for RPC reply delivery - - :return Dictionary: declared_queue_name -> no_ack_mode - """ - self._pika_engine.declare_queue_binding_by_channel( - channel=self._channel, - exchange=self._exchange, queue=self._queue, - routing_key=self._queue, exchange_type='direct', - queue_expiration=self._pika_engine.rpc_queue_expiration, - durable=False - ) - - return [{"queue_name": self._queue, "no_ack": False, - "consumer_tag": None}] - - -class NotificationPikaPoller(PikaPoller): - """PikaPoller implementation for polling Notification messages. Overrides - base functionality according to Notification specific - """ - def __init__(self, pika_engine, targets_and_priorities, - batch_size, batch_timeout, prefetch_count, queue_name=None): - """Adds targets_and_priorities and queue_name parameter - for declaring exchanges and queues used for notification delivery - - :param pika_engine: PikaEngine, shared object with configuration and - shared driver functionality - :param targets_and_priorities: list of (target, priority), defines - default queue names for corresponding notification types - :param batch_size: desired number of messages passed to - single on_incoming_callback call - :param batch_timeout: defines how long should we wait for batch_size - messages if we already have some messages waiting for processing - :param prefetch_count: Integer, maximum count of unacknowledged - messages which RabbitMQ broker sends to this consumer - :param queue: String, alternative queue name used for this poller - instead of default queue name - """ - self._targets_and_priorities = targets_and_priorities - self._queue_name = queue_name - - super(NotificationPikaPoller, self).__init__( - pika_engine, batch_size, batch_timeout, prefetch_count, - pika_drv_msg.PikaIncomingMessage - ) - - def _declare_queue_binding(self): - """Overrides base method and perform declaration of RabbitMQ exchanges - and queues used for notification delivery - - :return Dictionary: declared_queue_name -> no_ack_mode - """ - queues_to_consume = [] - for target, priority in self._targets_and_priorities: - routing_key = '%s.%s' % (target.topic, priority) - queue = self._queue_name or routing_key - self._pika_engine.declare_queue_binding_by_channel( - channel=self._channel, - exchange=( - target.exchange or - self._pika_engine.default_notification_exchange - ), - queue=queue, - routing_key=routing_key, - exchange_type='direct', - queue_expiration=None, - durable=self._pika_engine.notification_persistence, - ) - queues_to_consume.append( - {"queue_name": queue, "no_ack": False, "consumer_tag": None} - ) - - return queues_to_consume diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py index b766bb3..3181ae4 100644 --- a/oslo_messaging/opts.py +++ b/oslo_messaging/opts.py @@ -23,11 +23,9 @@ import itertools from oslo_messaging._drivers import amqp from oslo_messaging._drivers.amqp1_driver import opts as amqp_opts from oslo_messaging._drivers import base as drivers_base -from oslo_messaging._drivers import impl_pika from oslo_messaging._drivers import impl_rabbit from oslo_messaging._drivers.impl_zmq import zmq_options from oslo_messaging._drivers.kafka_driver import kafka_options -from oslo_messaging._drivers.pika_driver import pika_connection_factory from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_redis from oslo_messaging.notify import notifier from oslo_messaging.rpc import client @@ -50,10 +48,7 @@ _opts = [ ('oslo_messaging_amqp', amqp_opts.amqp1_opts), ('oslo_messaging_notifications', notifier._notifier_opts), ('oslo_messaging_rabbit', list( - itertools.chain(amqp.amqp_opts, impl_rabbit.rabbit_opts, - pika_connection_factory.pika_opts, - impl_pika.pika_pool_opts, impl_pika.message_opts, - impl_pika.notification_opts, impl_pika.rpc_opts))), + itertools.chain(amqp.amqp_opts, impl_rabbit.rabbit_opts))), ('oslo_messaging_kafka', kafka_options.KAFKA_OPTS), ] diff --git a/oslo_messaging/tests/drivers/pika/__init__.py b/oslo_messaging/tests/drivers/pika/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/oslo_messaging/tests/drivers/pika/__init__.py +++ /dev/null diff --git a/oslo_messaging/tests/drivers/pika/test_message.py b/oslo_messaging/tests/drivers/pika/test_message.py deleted file mode 100644 index 3722f87..0000000 --- a/oslo_messaging/tests/drivers/pika/test_message.py +++ /dev/null @@ -1,615 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import functools -import unittest - -from concurrent import futures -from oslo_serialization import jsonutils -from oslo_utils import timeutils -import pika -from six.moves import mock - -import oslo_messaging -from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns -from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg - - -class PikaIncomingMessageTestCase(unittest.TestCase): - def setUp(self): - self._pika_engine = mock.Mock() - self._channel = mock.Mock() - - self._delivery_tag = 12345 - - self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag) - self._properties = pika.BasicProperties( - content_type="application/json", - headers={"version": "1.0"}, - ) - self._body = ( - b'{"_$_key_context":"context_value",' - b'"payload_key": "payload_value"}' - ) - - def test_message_body_parsing(self): - message = pika_drv_msg.PikaIncomingMessage( - self._pika_engine, self._channel, self._method, self._properties, - self._body - ) - - self.assertEqual("context_value", - message.ctxt.get("key_context", None)) - self.assertEqual("payload_value", - message.message.get("payload_key", None)) - - def test_message_acknowledge(self): - message = pika_drv_msg.PikaIncomingMessage( - self._pika_engine, self._channel, self._method, self._properties, - self._body - ) - - message.acknowledge() - - self.assertEqual(1, self._channel.basic_ack.call_count) - self.assertEqual({"delivery_tag": self._delivery_tag}, - self._channel.basic_ack.call_args[1]) - - def test_message_acknowledge_no_ack(self): - message = pika_drv_msg.PikaIncomingMessage( - self._pika_engine, None, self._method, self._properties, - self._body - ) - - message.acknowledge() - - self.assertEqual(0, self._channel.basic_ack.call_count) - - def test_message_requeue(self): - message = pika_drv_msg.PikaIncomingMessage( - self._pika_engine, self._channel, self._method, self._properties, - self._body - ) - - message.requeue() - - self.assertEqual(1, self._channel.basic_nack.call_count) - self.assertEqual({"delivery_tag": self._delivery_tag, 'requeue': True}, - self._channel.basic_nack.call_args[1]) - - def test_message_requeue_no_ack(self): - message = pika_drv_msg.PikaIncomingMessage( - self._pika_engine, None, self._method, self._properties, - self._body - ) - - message.requeue() - - self.assertEqual(0, self._channel.basic_nack.call_count) - - -class RpcPikaIncomingMessageTestCase(unittest.TestCase): - def setUp(self): - self._pika_engine = mock.Mock() - self._pika_engine.rpc_reply_retry_attempts = 3 - self._pika_engine.rpc_reply_retry_delay = 0.25 - - self._channel = mock.Mock() - - self._delivery_tag = 12345 - - self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag) - self._body = ( - b'{"_$_key_context":"context_value",' - b'"payload_key":"payload_value"}' - ) - self._properties = pika.BasicProperties( - content_type="application/json", - headers={"version": "1.0"}, - ) - - def test_call_message_body_parsing(self): - self._properties.correlation_id = 123456789 - self._properties.reply_to = "reply_queue" - - message = pika_drv_msg.RpcPikaIncomingMessage( - self._pika_engine, self._channel, self._method, self._properties, - self._body - ) - - self.assertEqual("context_value", - message.ctxt.get("key_context", None)) - self.assertEqual(123456789, message.msg_id) - self.assertEqual("reply_queue", message.reply_q) - - self.assertEqual("payload_value", - message.message.get("payload_key", None)) - - def test_cast_message_body_parsing(self): - message = pika_drv_msg.RpcPikaIncomingMessage( - self._pika_engine, self._channel, self._method, self._properties, - self._body - ) - - self.assertEqual("context_value", - message.ctxt.get("key_context", None)) - self.assertIsNone(message.msg_id) - self.assertIsNone(message.reply_q) - - self.assertEqual("payload_value", - message.message.get("payload_key", None)) - - @mock.patch(("oslo_messaging._drivers.pika_driver.pika_message." - "PikaOutgoingMessage.send")) - def test_reply_for_cast_message(self, send_reply_mock): - message = pika_drv_msg.RpcPikaIncomingMessage( - self._pika_engine, self._channel, self._method, self._properties, - self._body - ) - - self.assertEqual("context_value", - message.ctxt.get("key_context", None)) - self.assertIsNone(message.msg_id) - self.assertIsNone(message.reply_q) - - self.assertEqual("payload_value", - message.message.get("payload_key", None)) - - message.reply(reply=object()) - - self.assertEqual(0, send_reply_mock.call_count) - - @mock.patch("oslo_messaging._drivers.pika_driver.pika_message." - "RpcReplyPikaOutgoingMessage") - @mock.patch("tenacity.retry") - def test_positive_reply_for_call_message(self, - retry_mock, - outgoing_message_mock): - self._properties.correlation_id = 123456789 - self._properties.reply_to = "reply_queue" - - message = pika_drv_msg.RpcPikaIncomingMessage( - self._pika_engine, self._channel, self._method, self._properties, - self._body - ) - - self.assertEqual("context_value", - message.ctxt.get("key_context", None)) - self.assertEqual(123456789, message.msg_id) - self.assertEqual("reply_queue", message.reply_q) - - self.assertEqual("payload_value", - message.message.get("payload_key", None)) - reply = "all_fine" - message.reply(reply=reply) - - outgoing_message_mock.assert_called_once_with( - self._pika_engine, 123456789, failure_info=None, reply='all_fine', - content_type='application/json' - ) - outgoing_message_mock().send.assert_called_once_with( - reply_q='reply_queue', stopwatch=mock.ANY, retrier=mock.ANY - ) - retry_mock.assert_called_once_with( - stop=mock.ANY, retry=mock.ANY, wait=mock.ANY - ) - - @mock.patch("oslo_messaging._drivers.pika_driver.pika_message." - "RpcReplyPikaOutgoingMessage") - @mock.patch("tenacity.retry") - def test_negative_reply_for_call_message(self, - retry_mock, - outgoing_message_mock): - self._properties.correlation_id = 123456789 - self._properties.reply_to = "reply_queue" - - message = pika_drv_msg.RpcPikaIncomingMessage( - self._pika_engine, self._channel, self._method, self._properties, - self._body - ) - - self.assertEqual("context_value", - message.ctxt.get("key_context", None)) - self.assertEqual(123456789, message.msg_id) - self.assertEqual("reply_queue", message.reply_q) - - self.assertEqual("payload_value", - message.message.get("payload_key", None)) - - failure_info = object() - message.reply(failure=failure_info) - - outgoing_message_mock.assert_called_once_with( - self._pika_engine, 123456789, - failure_info=failure_info, - reply=None, - content_type='application/json' - ) - outgoing_message_mock().send.assert_called_once_with( - reply_q='reply_queue', stopwatch=mock.ANY, retrier=mock.ANY - ) - retry_mock.assert_called_once_with( - stop=mock.ANY, retry=mock.ANY, wait=mock.ANY - ) - - -class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase): - def setUp(self): - self._pika_engine = mock.Mock() - self._pika_engine.allowed_remote_exmods = [ - pika_drv_cmns.EXCEPTIONS_MODULE, "oslo_messaging.exceptions" - ] - - self._channel = mock.Mock() - - self._delivery_tag = 12345 - - self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag) - - self._properties = pika.BasicProperties( - content_type="application/json", - headers={"version": "1.0"}, - correlation_id=123456789 - ) - - def test_positive_reply_message_body_parsing(self): - - body = b'{"s": "all fine"}' - - message = pika_drv_msg.RpcReplyPikaIncomingMessage( - self._pika_engine, self._channel, self._method, self._properties, - body - ) - - self.assertEqual(123456789, message.msg_id) - self.assertIsNone(message.failure) - self.assertEqual("all fine", message.result) - - def test_negative_reply_message_body_parsing(self): - - body = (b'{' - b' "e": {' - b' "s": "Error message",' - b' "t": ["TRACE HERE"],' - b' "c": "MessagingException",' - b' "m": "oslo_messaging.exceptions"' - b' }' - b'}') - - message = pika_drv_msg.RpcReplyPikaIncomingMessage( - self._pika_engine, self._channel, self._method, self._properties, - body - ) - - self.assertEqual(123456789, message.msg_id) - self.assertIsNone(message.result) - self.assertEqual( - 'Error message\n' - 'TRACE HERE', - str(message.failure) - ) - self.assertIsInstance(message.failure, - oslo_messaging.MessagingException) - - -class PikaOutgoingMessageTestCase(unittest.TestCase): - def setUp(self): - self._pika_engine = mock.MagicMock() - self._pika_engine.default_content_type = "application/json" - self._exchange = "it is exchange" - self._routing_key = "it is routing key" - self._expiration = 1 - self._stopwatch = ( - timeutils.StopWatch(duration=self._expiration).start() - ) - self._mandatory = object() - - self._message = {"msg_type": 1, "msg_str": "hello"} - self._context = {"request_id": 555, "token": "it is a token"} - - @mock.patch("oslo_serialization.jsonutils.dump_as_bytes", - new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True)) - def test_send_with_confirmation(self): - message = pika_drv_msg.PikaOutgoingMessage( - self._pika_engine, self._message, self._context - ) - - message.send( - exchange=self._exchange, - routing_key=self._routing_key, - confirm=True, - mandatory=self._mandatory, - persistent=True, - stopwatch=self._stopwatch, - retrier=None - ) - - self._pika_engine.connection_with_confirmation_pool.acquire( - ).__enter__().channel.publish.assert_called_once_with( - body=mock.ANY, - exchange=self._exchange, mandatory=self._mandatory, - properties=mock.ANY, - routing_key=self._routing_key - ) - - body = self._pika_engine.connection_with_confirmation_pool.acquire( - ).__enter__().channel.publish.call_args[1]["body"] - - self.assertEqual( - b'{"_$_request_id": 555, "_$_token": "it is a token", ' - b'"msg_str": "hello", "msg_type": 1}', - body - ) - - props = self._pika_engine.connection_with_confirmation_pool.acquire( - ).__enter__().channel.publish.call_args[1]["properties"] - - self.assertEqual('application/json', props.content_type) - self.assertEqual(2, props.delivery_mode) - self.assertTrue(self._expiration * 1000 - float(props.expiration) < - 100) - self.assertEqual({'version': '1.0'}, props.headers) - self.assertTrue(props.message_id) - - @mock.patch("oslo_serialization.jsonutils.dump_as_bytes", - new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True)) - def test_send_without_confirmation(self): - message = pika_drv_msg.PikaOutgoingMessage( - self._pika_engine, self._message, self._context - ) - - message.send( - exchange=self._exchange, - routing_key=self._routing_key, - confirm=False, - mandatory=self._mandatory, - persistent=False, - stopwatch=self._stopwatch, - retrier=None - ) - - self._pika_engine.connection_without_confirmation_pool.acquire( - ).__enter__().channel.publish.assert_called_once_with( - body=mock.ANY, - exchange=self._exchange, mandatory=self._mandatory, - properties=mock.ANY, - routing_key=self._routing_key - ) - - body = self._pika_engine.connection_without_confirmation_pool.acquire( - ).__enter__().channel.publish.call_args[1]["body"] - - self.assertEqual( - b'{"_$_request_id": 555, "_$_token": "it is a token", ' - b'"msg_str": "hello", "msg_type": 1}', - body - ) - - props = self._pika_engine.connection_without_confirmation_pool.acquire( - ).__enter__().channel.publish.call_args[1]["properties"] - - self.assertEqual('application/json', props.content_type) - self.assertEqual(1, props.delivery_mode) - self.assertTrue(self._expiration * 1000 - float(props.expiration) - < 100) - self.assertEqual({'version': '1.0'}, props.headers) - self.assertTrue(props.message_id) - - -class RpcPikaOutgoingMessageTestCase(unittest.TestCase): - def setUp(self): - self._exchange = "it is exchange" - self._routing_key = "it is routing key" - - self._pika_engine = mock.MagicMock() - self._pika_engine.get_rpc_exchange_name.return_value = self._exchange - self._pika_engine.get_rpc_queue_name.return_value = self._routing_key - self._pika_engine.default_content_type = "application/json" - - self._message = {"msg_type": 1, "msg_str": "hello"} - self._context = {"request_id": 555, "token": "it is a token"} - - @mock.patch("oslo_serialization.jsonutils.dump_as_bytes", - new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True)) - def test_send_cast_message(self): - message = pika_drv_msg.RpcPikaOutgoingMessage( - self._pika_engine, self._message, self._context - ) - - expiration = 1 - stopwatch = timeutils.StopWatch(duration=expiration).start() - - message.send( - exchange=self._exchange, - routing_key=self._routing_key, - reply_listener=None, - stopwatch=stopwatch, - retrier=None - ) - - self._pika_engine.connection_with_confirmation_pool.acquire( - ).__enter__().channel.publish.assert_called_once_with( - body=mock.ANY, - exchange=self._exchange, mandatory=True, - properties=mock.ANY, - routing_key=self._routing_key - ) - - body = self._pika_engine.connection_with_confirmation_pool.acquire( - ).__enter__().channel.publish.call_args[1]["body"] - - self.assertEqual( - b'{"_$_request_id": 555, "_$_token": "it is a token", ' - b'"msg_str": "hello", "msg_type": 1}', - body - ) - - props = self._pika_engine.connection_with_confirmation_pool.acquire( - ).__enter__().channel.publish.call_args[1]["properties"] - - self.assertEqual('application/json', props.content_type) - self.assertEqual(1, props.delivery_mode) - self.assertTrue(expiration * 1000 - float(props.expiration) < 100) - self.assertEqual({'version': '1.0'}, props.headers) - self.assertIsNone(props.correlation_id) - self.assertIsNone(props.reply_to) - self.assertTrue(props.message_id) - - @mock.patch("oslo_serialization.jsonutils.dump_as_bytes", - new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True)) - def test_send_call_message(self): - message = pika_drv_msg.RpcPikaOutgoingMessage( - self._pika_engine, self._message, self._context - ) - - expiration = 1 - stopwatch = timeutils.StopWatch(duration=expiration).start() - - result = "it is a result" - reply_queue_name = "reply_queue_name" - - future = futures.Future() - future.set_result(result) - reply_listener = mock.Mock() - reply_listener.register_reply_waiter.return_value = future - reply_listener.get_reply_qname.return_value = reply_queue_name - - res = message.send( - exchange=self._exchange, - routing_key=self._routing_key, - reply_listener=reply_listener, - stopwatch=stopwatch, - retrier=None - ) - - self.assertEqual(result, res) - - self._pika_engine.connection_with_confirmation_pool.acquire( - ).__enter__().channel.publish.assert_called_once_with( - body=mock.ANY, - exchange=self._exchange, mandatory=True, - properties=mock.ANY, - routing_key=self._routing_key - ) - - body = self._pika_engine.connection_with_confirmation_pool.acquire( - ).__enter__().channel.publish.call_args[1]["body"] - - self.assertEqual( - b'{"_$_request_id": 555, "_$_token": "it is a token", ' - b'"msg_str": "hello", "msg_type": 1}', - body - ) - - props = self._pika_engine.connection_with_confirmation_pool.acquire( - ).__enter__().channel.publish.call_args[1]["properties"] - - self.assertEqual('application/json', props.content_type) - self.assertEqual(1, props.delivery_mode) - self.assertTrue(expiration * 1000 - float(props.expiration) < 100) - self.assertEqual({'version': '1.0'}, props.headers) - self.assertEqual(message.msg_id, props.correlation_id) - self.assertEqual(reply_queue_name, props.reply_to) - self.assertTrue(props.message_id) - - -class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase): - def setUp(self): - self._reply_q = "reply_queue_name" - - self._expiration = 1 - self._stopwatch = ( - timeutils.StopWatch(duration=self._expiration).start() - ) - - self._pika_engine = mock.MagicMock() - - self._rpc_reply_exchange = "rpc_reply_exchange" - self._pika_engine.rpc_reply_exchange = self._rpc_reply_exchange - self._pika_engine.default_content_type = "application/json" - - self._msg_id = 12345567 - - @mock.patch("oslo_serialization.jsonutils.dump_as_bytes", - new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True)) - def test_success_message_send(self): - message = pika_drv_msg.RpcReplyPikaOutgoingMessage( - self._pika_engine, self._msg_id, reply="all_fine" - ) - - message.send(self._reply_q, stopwatch=self._stopwatch, retrier=None) - - self._pika_engine.connection_with_confirmation_pool.acquire( - ).__enter__().channel.publish.assert_called_once_with( - body=b'{"s": "all_fine"}', - exchange=self._rpc_reply_exchange, mandatory=True, - properties=mock.ANY, - routing_key=self._reply_q - ) - - props = self._pika_engine.connection_with_confirmation_pool.acquire( - ).__enter__().channel.publish.call_args[1]["properties"] - - self.assertEqual('application/json', props.content_type) - self.assertEqual(1, props.delivery_mode) - self.assertTrue(self._expiration * 1000 - float(props.expiration) < - 100) - self.assertEqual({'version': '1.0'}, props.headers) - self.assertEqual(message.msg_id, props.correlation_id) - self.assertIsNone(props.reply_to) - self.assertTrue(props.message_id) - - @mock.patch("traceback.format_exception", new=lambda x, y, z: z) - @mock.patch("oslo_serialization.jsonutils.dump_as_bytes", - new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True)) - def test_failure_message_send(self): - failure_info = (oslo_messaging.MessagingException, - oslo_messaging.MessagingException("Error message"), - ['It is a trace']) - - message = pika_drv_msg.RpcReplyPikaOutgoingMessage( - self._pika_engine, self._msg_id, failure_info=failure_info - ) - - message.send(self._reply_q, stopwatch=self._stopwatch, retrier=None) - - self._pika_engine.connection_with_confirmation_pool.acquire( - ).__enter__().channel.publish.assert_called_once_with( - body=mock.ANY, - exchange=self._rpc_reply_exchange, - mandatory=True, - properties=mock.ANY, - routing_key=self._reply_q - ) - - body = self._pika_engine.connection_with_confirmation_pool.acquire( - ).__enter__().channel.publish.call_args[1]["body"] - self.assertEqual( - b'{"e": {"c": "MessagingException", ' - b'"m": "oslo_messaging.exceptions", "s": "Error message", ' - b'"t": ["It is a trace"]}}', - body - ) - - props = self._pika_engine.connection_with_confirmation_pool.acquire( - ).__enter__().channel.publish.call_args[1]["properties"] - - self.assertEqual('application/json', props.content_type) - self.assertEqual(1, props.delivery_mode) - self.assertTrue(self._expiration * 1000 - float(props.expiration) < - 100) - self.assertEqual({'version': '1.0'}, props.headers) - self.assertEqual(message.msg_id, props.correlation_id) - self.assertIsNone(props.reply_to) - self.assertTrue(props.message_id) diff --git a/oslo_messaging/tests/drivers/pika/test_poller.py b/oslo_messaging/tests/drivers/pika/test_poller.py deleted file mode 100644 index 7957bec..0000000 --- a/oslo_messaging/tests/drivers/pika/test_poller.py +++ /dev/null @@ -1,482 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import threading -import time -import unittest - -from concurrent import futures -from six.moves import mock - -from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc -from oslo_messaging._drivers.pika_driver import pika_poller - - -class PikaPollerTestCase(unittest.TestCase): - def setUp(self): - self._pika_engine = mock.Mock() - self._poller_connection_mock = mock.Mock() - self._poller_channel_mock = mock.Mock() - self._poller_connection_mock.channel.return_value = ( - self._poller_channel_mock - ) - self._pika_engine.create_connection.return_value = ( - self._poller_connection_mock - ) - - self._executor = futures.ThreadPoolExecutor(1) - - def timer_task(timeout, callback): - time.sleep(timeout) - callback() - - self._poller_connection_mock.add_timeout.side_effect = ( - lambda *args: self._executor.submit(timer_task, *args) - ) - - self._prefetch_count = 123 - - @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller." - "_declare_queue_binding") - def test_start(self, declare_queue_binding_mock): - poller = pika_poller.PikaPoller( - self._pika_engine, 1, None, self._prefetch_count, None - ) - - poller.start(None) - - self.assertTrue(self._pika_engine.create_connection.called) - self.assertTrue(self._poller_connection_mock.channel.called) - self.assertTrue(declare_queue_binding_mock.called) - - def test_start_when_connection_unavailable(self): - poller = pika_poller.PikaPoller( - self._pika_engine, 1, None, self._prefetch_count, None - ) - - self._pika_engine.create_connection.side_effect = ( - pika_drv_exc.EstablishConnectionException - ) - - # start() should not raise socket.timeout exception - poller.start(None) - - # stop is needed to stop reconnection background job - poller.stop() - - @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller." - "_declare_queue_binding") - def test_message_processing(self, declare_queue_binding_mock): - res = [] - - def on_incoming_callback(incoming): - res.append(incoming) - - incoming_message_class_mock = mock.Mock() - poller = pika_poller.PikaPoller( - self._pika_engine, 1, None, self._prefetch_count, - incoming_message_class=incoming_message_class_mock - ) - unused = object() - method = object() - properties = object() - body = object() - - poller.start(on_incoming_callback) - poller._on_message_with_ack_callback( - unused, method, properties, body - ) - - self.assertEqual(1, len(res)) - - self.assertEqual([incoming_message_class_mock.return_value], res[0]) - incoming_message_class_mock.assert_called_once_with( - self._pika_engine, self._poller_channel_mock, method, properties, - body - ) - - self.assertTrue(self._pika_engine.create_connection.called) - self.assertTrue(self._poller_connection_mock.channel.called) - - self.assertTrue(declare_queue_binding_mock.called) - - @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller." - "_declare_queue_binding") - def test_message_processing_batch(self, declare_queue_binding_mock): - incoming_message_class_mock = mock.Mock() - - n = 10 - params = [] - - res = [] - - def on_incoming_callback(incoming): - res.append(incoming) - - poller = pika_poller.PikaPoller( - self._pika_engine, n, None, self._prefetch_count, - incoming_message_class=incoming_message_class_mock - ) - - for i in range(n): - params.append((object(), object(), object(), object())) - - poller.start(on_incoming_callback) - - for i in range(n): - poller._on_message_with_ack_callback( - *params[i] - ) - - self.assertEqual(1, len(res)) - self.assertEqual(10, len(res[0])) - self.assertEqual(n, incoming_message_class_mock.call_count) - - for i in range(n): - self.assertEqual(incoming_message_class_mock.return_value, - res[0][i]) - self.assertEqual( - (self._pika_engine, self._poller_channel_mock) + params[i][1:], - incoming_message_class_mock.call_args_list[i][0] - ) - - self.assertTrue(self._pika_engine.create_connection.called) - self.assertTrue(self._poller_connection_mock.channel.called) - - self.assertTrue(declare_queue_binding_mock.called) - - @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller." - "_declare_queue_binding") - def test_message_processing_batch_with_timeout(self, - declare_queue_binding_mock): - incoming_message_class_mock = mock.Mock() - - n = 10 - timeout = 1 - - res = [] - evt = threading.Event() - - def on_incoming_callback(incoming): - res.append(incoming) - evt.set() - - poller = pika_poller.PikaPoller( - self._pika_engine, n, timeout, self._prefetch_count, - incoming_message_class=incoming_message_class_mock - ) - - params = [] - - success_count = 5 - - poller.start(on_incoming_callback) - - for i in range(n): - params.append((object(), object(), object(), object())) - - for i in range(success_count): - poller._on_message_with_ack_callback( - *params[i] - ) - - self.assertTrue(evt.wait(timeout * 2)) - - self.assertEqual(1, len(res)) - self.assertEqual(success_count, len(res[0])) - self.assertEqual(success_count, incoming_message_class_mock.call_count) - - for i in range(success_count): - self.assertEqual(incoming_message_class_mock.return_value, - res[0][i]) - self.assertEqual( - (self._pika_engine, self._poller_channel_mock) + params[i][1:], - incoming_message_class_mock.call_args_list[i][0] - ) - - self.assertTrue(self._pika_engine.create_connection.called) - self.assertTrue(self._poller_connection_mock.channel.called) - - self.assertTrue(declare_queue_binding_mock.called) - - -class RpcServicePikaPollerTestCase(unittest.TestCase): - def setUp(self): - self._pika_engine = mock.Mock() - self._poller_connection_mock = mock.Mock() - self._poller_channel_mock = mock.Mock() - self._poller_connection_mock.channel.return_value = ( - self._poller_channel_mock - ) - self._pika_engine.create_connection.return_value = ( - self._poller_connection_mock - ) - - self._pika_engine.get_rpc_queue_name.side_effect = ( - lambda topic, server, no_ack, worker=False: - "_".join([topic, str(server), str(no_ack), str(worker)]) - ) - - self._pika_engine.get_rpc_exchange_name.side_effect = ( - lambda exchange: exchange - ) - - self._prefetch_count = 123 - self._target = mock.Mock(exchange="exchange", topic="topic", - server="server") - self._pika_engine.rpc_queue_expiration = 12345 - - @mock.patch("oslo_messaging._drivers.pika_driver.pika_message." - "RpcPikaIncomingMessage") - def test_declare_rpc_queue_bindings(self, rpc_pika_incoming_message_mock): - poller = pika_poller.RpcServicePikaPoller( - self._pika_engine, self._target, 1, None, - self._prefetch_count - ) - - poller.start(None) - - self.assertTrue(self._pika_engine.create_connection.called) - self.assertTrue(self._poller_connection_mock.channel.called) - - declare_queue_binding_by_channel_mock = ( - self._pika_engine.declare_queue_binding_by_channel - ) - - self.assertEqual( - 6, declare_queue_binding_by_channel_mock.call_count - ) - - declare_queue_binding_by_channel_mock.assert_has_calls(( - mock.call( - channel=self._poller_channel_mock, durable=False, - exchange="exchange", - exchange_type='direct', - queue="topic_None_True_False", - queue_expiration=12345, - routing_key="topic_None_True_False" - ), - mock.call( - channel=self._poller_channel_mock, durable=False, - exchange="exchange", - exchange_type='direct', - queue="topic_server_True_False", - queue_expiration=12345, - routing_key="topic_server_True_False" - ), - mock.call( - channel=self._poller_channel_mock, durable=False, - exchange="exchange", - exchange_type='direct', - queue="topic_server_True_True", - queue_expiration=12345, - routing_key="topic_all_workers_True_False" - ), - mock.call( - channel=self._poller_channel_mock, durable=False, - exchange="exchange", - exchange_type='direct', - queue="topic_None_False_False", - queue_expiration=12345, - routing_key="topic_None_False_False" - ), - mock.call( - channel=self._poller_channel_mock, durable=False, - exchange="exchange", - exchange_type='direct', - queue="topic_server_False_False", - queue_expiration=12345, - routing_key='topic_server_False_False' - ), - mock.call( - channel=self._poller_channel_mock, durable=False, - exchange="exchange", - exchange_type='direct', - queue="topic_server_False_True", - queue_expiration=12345, - routing_key='topic_all_workers_False_False' - ) - )) - - -class RpcReplyServicePikaPollerTestCase(unittest.TestCase): - def setUp(self): - self._pika_engine = mock.Mock() - self._poller_connection_mock = mock.Mock() - self._poller_channel_mock = mock.Mock() - self._poller_connection_mock.channel.return_value = ( - self._poller_channel_mock - ) - self._pika_engine.create_connection.return_value = ( - self._poller_connection_mock - ) - - self._prefetch_count = 123 - self._exchange = "rpc_reply_exchange" - self._queue = "rpc_reply_queue" - - self._pika_engine.rpc_reply_retry_delay = 12132543456 - - self._pika_engine.rpc_queue_expiration = 12345 - self._pika_engine.rpc_reply_retry_attempts = 3 - - def test_declare_rpc_reply_queue_binding(self): - poller = pika_poller.RpcReplyPikaPoller( - self._pika_engine, self._exchange, self._queue, 1, None, - self._prefetch_count, - ) - - poller.start(None) - poller.stop() - - declare_queue_binding_by_channel_mock = ( - self._pika_engine.declare_queue_binding_by_channel - ) - - self.assertEqual( - 1, declare_queue_binding_by_channel_mock.call_count - ) - - declare_queue_binding_by_channel_mock.assert_called_once_with( - channel=self._poller_channel_mock, durable=False, - exchange='rpc_reply_exchange', exchange_type='direct', - queue='rpc_reply_queue', queue_expiration=12345, - routing_key='rpc_reply_queue' - ) - - -class NotificationPikaPollerTestCase(unittest.TestCase): - def setUp(self): - self._pika_engine = mock.Mock() - self._poller_connection_mock = mock.Mock() - self._poller_channel_mock = mock.Mock() - self._poller_connection_mock.channel.return_value = ( - self._poller_channel_mock - ) - self._pika_engine.create_connection.return_value = ( - self._poller_connection_mock - ) - - self._prefetch_count = 123 - self._target_and_priorities = ( - ( - mock.Mock(exchange="exchange1", topic="topic1", - server="server1"), 1 - ), - ( - mock.Mock(exchange="exchange1", topic="topic1"), 2 - ), - ( - mock.Mock(exchange="exchange2", topic="topic2",), 1 - ), - ) - self._pika_engine.notification_persistence = object() - - def test_declare_notification_queue_bindings_default_queue(self): - poller = pika_poller.NotificationPikaPoller( - self._pika_engine, self._target_and_priorities, 1, None, - self._prefetch_count, None - ) - - poller.start(None) - - self.assertTrue(self._pika_engine.create_connection.called) - self.assertTrue(self._poller_connection_mock.channel.called) - - declare_queue_binding_by_channel_mock = ( - self._pika_engine.declare_queue_binding_by_channel - ) - - self.assertEqual( - 3, declare_queue_binding_by_channel_mock.call_count - ) - - declare_queue_binding_by_channel_mock.assert_has_calls(( - mock.call( - channel=self._poller_channel_mock, - durable=self._pika_engine.notification_persistence, - exchange="exchange1", - exchange_type='direct', - queue="topic1.1", - queue_expiration=None, - routing_key="topic1.1" - ), - mock.call( - channel=self._poller_channel_mock, - durable=self._pika_engine.notification_persistence, - exchange="exchange1", - exchange_type='direct', - queue="topic1.2", - queue_expiration=None, - routing_key="topic1.2" - ), - mock.call( - channel=self._poller_channel_mock, - durable=self._pika_engine.notification_persistence, - exchange="exchange2", - exchange_type='direct', - queue="topic2.1", - queue_expiration=None, - routing_key="topic2.1" - ) - )) - - def test_declare_notification_queue_bindings_custom_queue(self): - poller = pika_poller.NotificationPikaPoller( - self._pika_engine, self._target_and_priorities, 1, None, - self._prefetch_count, "custom_queue_name" - ) - - poller.start(None) - - self.assertTrue(self._pika_engine.create_connection.called) - self.assertTrue(self._poller_connection_mock.channel.called) - - declare_queue_binding_by_channel_mock = ( - self._pika_engine.declare_queue_binding_by_channel - ) - - self.assertEqual( - 3, declare_queue_binding_by_channel_mock.call_count - ) - - declare_queue_binding_by_channel_mock.assert_has_calls(( - mock.call( - channel=self._poller_channel_mock, - durable=self._pika_engine.notification_persistence, - exchange="exchange1", - exchange_type='direct', - queue="custom_queue_name", - queue_expiration=None, - routing_key="topic1.1" - ), - mock.call( - channel=self._poller_channel_mock, - durable=self._pika_engine.notification_persistence, - exchange="exchange1", - exchange_type='direct', - queue="custom_queue_name", - queue_expiration=None, - routing_key="topic1.2" - ), - mock.call( - channel=self._poller_channel_mock, - durable=self._pika_engine.notification_persistence, - exchange="exchange2", - exchange_type='direct', - queue="custom_queue_name", - queue_expiration=None, - routing_key="topic2.1" - ) - )) diff --git a/oslo_messaging/tests/functional/test_rabbitmq.py b/oslo_messaging/tests/functional/test_rabbitmq.py index ffeaf0a..db06d01 100644 --- a/oslo_messaging/tests/functional/test_rabbitmq.py +++ b/oslo_messaging/tests/functional/test_rabbitmq.py @@ -63,10 +63,6 @@ class RabbitMQFailoverTests(test_utils.BaseTestCase): self.n2 = self.pifpaf.env["PIFPAF_RABBITMQ_NODENAME2"] self.n3 = self.pifpaf.env["PIFPAF_RABBITMQ_NODENAME3"] - # NOTE(gdavoian): additional tweak for pika driver - if self.driver == "pika": - self.url = self.url.replace("rabbit", "pika") - # ensure connections will be establish to the first node self.pifpaf.stop_node(self.n2) self.pifpaf.stop_node(self.n3) @@ -115,19 +111,6 @@ class RabbitMQFailoverTests(test_utils.BaseTestCase): return "callback done" def _check_ports(self, port): - getattr(self, '_check_ports_%s_driver' % self.driver)(port) - - def _check_ports_pika_driver(self, port): - rpc_server = self.servers.servers[0].server - # FIXME(sileht): Check other connections - connections = [ - rpc_server.listener._connection - ] - for conn in connections: - self.assertEqual( - port, conn._impl.socket.getpeername()[1]) - - def _check_ports_rabbit_driver(self, port): rpc_server = self.servers.servers[0].server connection_contexts = [ # rpc server diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index c381ebd..82bdbd9 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -304,8 +304,6 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase): driver = os.environ.get("TRANSPORT_DRIVER") if driver: self.url = os.environ.get('PIFPAF_URL') - if driver == "pika" and self.url: - self.url = self.url.replace("rabbit://", "pika://") else: self.url = os.environ.get('TRANSPORT_URL') diff --git a/playbooks/oslo.messaging-src-dsvm-full-pika-default/post.yaml b/playbooks/oslo.messaging-src-dsvm-full-pika-default/post.yaml deleted file mode 100644 index e07f551..0000000 --- a/playbooks/oslo.messaging-src-dsvm-full-pika-default/post.yaml +++ /dev/null @@ -1,15 +0,0 @@ -- hosts: primary - tasks: - - - name: Copy files from {{ ansible_user_dir }}/workspace/ on node - synchronize: - src: '{{ ansible_user_dir }}/workspace/' - dest: '{{ zuul.executor.log_root }}' - mode: pull - copy_links: true - verify_host: true - rsync_opts: - - --include=/logs/** - - --include=*/ - - --exclude=* - - --prune-empty-dirs diff --git a/playbooks/oslo.messaging-src-dsvm-full-pika-default/run.yaml b/playbooks/oslo.messaging-src-dsvm-full-pika-default/run.yaml deleted file mode 100644 index 572f1d1..0000000 --- a/playbooks/oslo.messaging-src-dsvm-full-pika-default/run.yaml +++ /dev/null @@ -1,42 +0,0 @@ -- hosts: all - name: Autoconverted job legacy-oslo.messaging-src-dsvm-full-pika-default from old - job gate-oslo.messaging-src-dsvm-full-pika-default-ubuntu-xenial-nv - tasks: - - - name: Ensure legacy workspace directory - file: - path: '{{ ansible_user_dir }}/workspace' - state: directory - - - shell: - cmd: | - set -e - set -x - cat > clonemap.yaml << EOF - clonemap: - - name: openstack-infra/devstack-gate - dest: devstack-gate - EOF - /usr/zuul-env/bin/zuul-cloner -m clonemap.yaml --cache-dir /opt/git \ - git://git.openstack.org \ - openstack-infra/devstack-gate - executable: /bin/bash - chdir: '{{ ansible_user_dir }}/workspace' - environment: '{{ zuul | zuul_legacy_vars }}' - - - shell: - cmd: | - set -e - set -x - export PYTHONUNBUFFERED=true - export DEVSTACK_GATE_TEMPEST=1 - export DEVSTACK_GATE_TEMPEST_FULL=1 - export PROJECTS="openstack/devstack-plugin-pika $PROJECTS" - export DEVSTACK_LOCAL_CONFIG="enable_plugin devstack-plugin-pika git://git.openstack.org/openstack/devstack-plugin-pika" - export DEVSTACK_PROJECT_FROM_GIT="oslo.messaging" - - cp devstack-gate/devstack-vm-gate-wrap.sh ./safe-devstack-vm-gate-wrap.sh - ./safe-devstack-vm-gate-wrap.sh - executable: /bin/bash - chdir: '{{ ansible_user_dir }}/workspace' - environment: '{{ zuul | zuul_legacy_vars }}' diff --git a/playbooks/oslo.messaging-telemetry-dsvm-integration-pika/post.yaml b/playbooks/oslo.messaging-telemetry-dsvm-integration-pika/post.yaml deleted file mode 100644 index dac8753..0000000 --- a/playbooks/oslo.messaging-telemetry-dsvm-integration-pika/post.yaml +++ /dev/null @@ -1,80 +0,0 @@ -- hosts: primary - tasks: - - - name: Copy files from {{ ansible_user_dir }}/workspace/ on node - synchronize: - src: '{{ ansible_user_dir }}/workspace/' - dest: '{{ zuul.executor.log_root }}' - mode: pull - copy_links: true - verify_host: true - rsync_opts: - - --include=**/*nose_results.html - - --include=*/ - - --exclude=* - - --prune-empty-dirs - - - name: Copy files from {{ ansible_user_dir }}/workspace/ on node - synchronize: - src: '{{ ansible_user_dir }}/workspace/' - dest: '{{ zuul.executor.log_root }}' - mode: pull - copy_links: true - verify_host: true - rsync_opts: - - --include=**/*testr_results.html.gz - - --include=*/ - - --exclude=* - - --prune-empty-dirs - - - name: Copy files from {{ ansible_user_dir }}/workspace/ on node - synchronize: - src: '{{ ansible_user_dir }}/workspace/' - dest: '{{ zuul.executor.log_root }}' - mode: pull - copy_links: true - verify_host: true - rsync_opts: - - --include=/.testrepository/tmp* - - --include=*/ - - --exclude=* - - --prune-empty-dirs - - - name: Copy files from {{ ansible_user_dir }}/workspace/ on node - synchronize: - src: '{{ ansible_user_dir }}/workspace/' - dest: '{{ zuul.executor.log_root }}' - mode: pull - copy_links: true - verify_host: true - rsync_opts: - - --include=**/*testrepository.subunit.gz - - --include=*/ - - --exclude=* - - --prune-empty-dirs - - - name: Copy files from {{ ansible_user_dir }}/workspace/ on node - synchronize: - src: '{{ ansible_user_dir }}/workspace/' - dest: '{{ zuul.executor.log_root }}/tox' - mode: pull - copy_links: true - verify_host: true - rsync_opts: - - --include=/.tox/*/log/* - - --include=*/ - - --exclude=* - - --prune-empty-dirs - - - name: Copy files from {{ ansible_user_dir }}/workspace/ on node - synchronize: - src: '{{ ansible_user_dir }}/workspace/' - dest: '{{ zuul.executor.log_root }}' - mode: pull - copy_links: true - verify_host: true - rsync_opts: - - --include=/logs/** - - --include=*/ - - --exclude=* - - --prune-empty-dirs diff --git a/playbooks/oslo.messaging-telemetry-dsvm-integration-pika/run.yaml b/playbooks/oslo.messaging-telemetry-dsvm-integration-pika/run.yaml deleted file mode 100644 index 8cad6ef..0000000 --- a/playbooks/oslo.messaging-telemetry-dsvm-integration-pika/run.yaml +++ /dev/null @@ -1,79 +0,0 @@ -- hosts: all - name: Autoconverted job legacy-oslo.messaging-telemetry-dsvm-integration-pika from - old job gate-oslo.messaging-telemetry-dsvm-integration-pika-ubuntu-xenial-nv - tasks: - - - name: Ensure legacy workspace directory - file: - path: '{{ ansible_user_dir }}/workspace' - state: directory - - - shell: - cmd: | - set -e - set -x - cat > clonemap.yaml << EOF - clonemap: - - name: openstack-infra/devstack-gate - dest: devstack-gate - EOF - /usr/zuul-env/bin/zuul-cloner -m clonemap.yaml --cache-dir /opt/git \ - git://git.openstack.org \ - openstack-infra/devstack-gate - executable: /bin/bash - chdir: '{{ ansible_user_dir }}/workspace' - environment: '{{ zuul | zuul_legacy_vars }}' - - - shell: - cmd: | - set -e - set -x - export PYTHONUNBUFFERED=true - - export DEVSTACK_GATE_HEAT=1 - export DEVSTACK_GATE_NEUTRON=1 - export DEVSTACK_GATE_TEMPEST=1 - export DEVSTACK_GATE_EXERCISES=0 - export DEVSTACK_GATE_INSTALL_TESTONLY=1 - - export PROJECTS="openstack/ceilometer $PROJECTS" - export PROJECTS="openstack/aodh $PROJECTS" - export PROJECTS="openstack/devstack-plugin-pika $PROJECTS" - - case "$ZUUL_BRANCH" in - "stable/ocata") - export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin gnocchi git://git.openstack.org/openstack/gnocchi" - export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin panko git://git.openstack.org/openstack/panko" - export OVERRIDE_GNOCCHI_PROJECT_BRANCH="stable/3.1" - export PROJECTS="openstack/panko $PROJECTS openstack/gnocchi" - ;; - *) - export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin panko git://git.openstack.org/openstack/panko" - export PROJECTS="openstack/panko $PROJECTS" - ;; - esac - export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin ceilometer git://git.openstack.org/openstack/ceilometer" - export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin aodh git://git.openstack.org/openstack/aodh" - export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin heat git://git.openstack.org/openstack/heat" - - export DEVSTACK_LOCAL_CONFIG+=$'\n'"CEILOMETER_BACKEND=gnocchi" - - export DEVSTACK_LOCAL_CONFIG+=$'\n'"GNOCCHI_ARCHIVE_POLICY=high" - export DEVSTACK_LOCAL_CONFIG+=$'\n'"CEILOMETER_PIPELINE_INTERVAL=5" - export DEVSTACK_LOCAL_CONFIG+=$'\n'"GNOCCHI_STORAGE_BACKEND=file" - - export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin devstack-plugin-pika git://git.openstack.org/openstack/devstack-plugin-pika" - - export DEVSTACK_PROJECT_FROM_GIT="oslo.messaging" - - function post_test_hook { - cd /opt/stack/new/ceilometer/ceilometer/tests/integration/hooks/ - ./post_test_hook.sh - } - export -f post_test_hook - - cp devstack-gate/devstack-vm-gate-wrap.sh ./safe-devstack-vm-gate-wrap.sh - ./safe-devstack-vm-gate-wrap.sh - executable: /bin/bash - chdir: '{{ ansible_user_dir }}/workspace' - environment: '{{ zuul | zuul_legacy_vars }}' diff --git a/playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/post.yaml b/playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/post.yaml deleted file mode 100644 index e07f551..0000000 --- a/playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/post.yaml +++ /dev/null @@ -1,15 +0,0 @@ -- hosts: primary - tasks: - - - name: Copy files from {{ ansible_user_dir }}/workspace/ on node - synchronize: - src: '{{ ansible_user_dir }}/workspace/' - dest: '{{ zuul.executor.log_root }}' - mode: pull - copy_links: true - verify_host: true - rsync_opts: - - --include=/logs/** - - --include=*/ - - --exclude=* - - --prune-empty-dirs diff --git a/playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/run.yaml b/playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/run.yaml deleted file mode 100644 index 5dd6533..0000000 --- a/playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/run.yaml +++ /dev/null @@ -1,45 +0,0 @@ -- hosts: all - name: Autoconverted job legacy-tempest-neutron-dsvm-src-oslo.messaging-pika-default - from old job gate-tempest-neutron-dsvm-src-oslo.messaging-pika-default-ubuntu-xenial-nv - tasks: - - - name: Ensure legacy workspace directory - file: - path: '{{ ansible_user_dir }}/workspace' - state: directory - - - shell: - cmd: | - set -e - set -x - cat > clonemap.yaml << EOF - clonemap: - - name: openstack-infra/devstack-gate - dest: devstack-gate - EOF - /usr/zuul-env/bin/zuul-cloner -m clonemap.yaml --cache-dir /opt/git \ - git://git.openstack.org \ - openstack-infra/devstack-gate - executable: /bin/bash - chdir: '{{ ansible_user_dir }}/workspace' - environment: '{{ zuul | zuul_legacy_vars }}' - - - shell: - cmd: | - set -e - set -x - export PYTHONUNBUFFERED=true - export DEVSTACK_GATE_TEMPEST=1 - export DEVSTACK_GATE_TEMPEST_FULL=1 - export DEVSTACK_GATE_NEUTRON=1 - - export PROJECTS="openstack/devstack-plugin-pika $PROJECTS" - export DEVSTACK_LOCAL_CONFIG="enable_plugin devstack-plugin-pika git://git.openstack.org/openstack/devstack-plugin-pika" - - export DEVSTACK_PROJECT_FROM_GIT="oslo.messaging" - - cp devstack-gate/devstack-vm-gate-wrap.sh ./safe-devstack-vm-gate-wrap.sh - ./safe-devstack-vm-gate-wrap.sh - executable: /bin/bash - chdir: '{{ ansible_user_dir }}/workspace' - environment: '{{ zuul | zuul_legacy_vars }}' diff --git a/releasenotes/notes/remove-pika-1bae204ced2521a3.yaml b/releasenotes/notes/remove-pika-1bae204ced2521a3.yaml new file mode 100644 index 0000000..0d4e123 --- /dev/null +++ b/releasenotes/notes/remove-pika-1bae204ced2521a3.yaml @@ -0,0 +1,8 @@ +--- +prelude: > + The Pika-based driver for RabbitMQ has been removed. +upgrade: + - | + Users of the Pika-based driver must change the prefix of all the + transport_url configuration options from "pika://..." to + "rabbit://..." to use the default kombu based RabbitMQ driver. diff --git a/requirements.txt b/requirements.txt index c6a0912..2055389 100644 --- a/requirements.txt +++ b/requirements.txt @@ -28,10 +28,8 @@ PyYAML>=3.12 # MIT # we set the amqp version to ensure heartbeat works amqp!=2.1.4,>=2.1.1 # BSD kombu!=4.0.2,>=4.0.0 # BSD -pika>=0.10.0 # BSD -pika-pool>=0.1.3 # BSD -# used by pika and zmq drivers +# used by zmq driver futures>=3.0.0;python_version=='2.7' or python_version=='2.6' # BSD tenacity>=3.2.1 # Apache-2.0 @@ -41,7 +41,6 @@ oslo.messaging.drivers = # This is just for internal testing fake = oslo_messaging._drivers.impl_fake:FakeDriver - pika = oslo_messaging._drivers.impl_pika:PikaDriver oslo.messaging.executors = blocking = futurist:SynchronousExecutor @@ -56,21 +55,6 @@ oslo.messaging.notify.drivers = noop = oslo_messaging.notify._impl_noop:NoOpDriver routing = oslo_messaging.notify._impl_routing:RoutingDriver -oslo.messaging.pika.connection_factory = - # Creates new connection for each create_connection call. Old-style behaviour - # Uses a much more connections then single and read_write factories but still available as - # an option - new = oslo_messaging._drivers.pika_driver.pika_connection_factory:PikaConnectionFactory - - # Creates only one connection for transport and return it for each create connection call - # it is default, but you can not use it with synchronous executor - single = oslo_messaging._drivers.pika_driver.pika_connection_factory:SinglePikaConnectionFactory - - # Create two connections - one for listening and another one for sending and return them - # for each create connection call depending on connection purpose. Creates one more connection - # but you can use it with synchronous executor - read_write = oslo_messaging._drivers.pika_driver.pika_connection_factory:ReadWritePikaConnectionFactory - oslo.messaging.zmq.matchmaker = # Matchmakers for ZeroMQ dummy = oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_base:MatchmakerDummy @@ -44,12 +44,6 @@ setenv = basepython = python3.5 commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}' -[testenv:py27-func-pika] -setenv = - {[testenv]setenv} - TRANSPORT_DRIVER=pika -commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}' - [testenv:py27-func-kafka] setenv = {[testenv]setenv} |