summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.zuul.yaml57
-rw-r--r--bindep.txt8
-rw-r--r--doc/source/admin/index.rst1
-rw-r--r--doc/source/admin/pika_driver.rst160
-rw-r--r--oslo_messaging/_drivers/impl_pika.py366
-rw-r--r--oslo_messaging/_drivers/pika_driver/__init__.py0
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_commons.py40
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_connection.py542
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_connection_factory.py307
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_engine.py303
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_exceptions.py68
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_listener.py123
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_message.py618
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_poller.py538
-rw-r--r--oslo_messaging/opts.py7
-rw-r--r--oslo_messaging/tests/drivers/pika/__init__.py0
-rw-r--r--oslo_messaging/tests/drivers/pika/test_message.py615
-rw-r--r--oslo_messaging/tests/drivers/pika/test_poller.py482
-rw-r--r--oslo_messaging/tests/functional/test_rabbitmq.py17
-rw-r--r--oslo_messaging/tests/functional/utils.py2
-rw-r--r--playbooks/oslo.messaging-src-dsvm-full-pika-default/post.yaml15
-rw-r--r--playbooks/oslo.messaging-src-dsvm-full-pika-default/run.yaml42
-rw-r--r--playbooks/oslo.messaging-telemetry-dsvm-integration-pika/post.yaml80
-rw-r--r--playbooks/oslo.messaging-telemetry-dsvm-integration-pika/run.yaml79
-rw-r--r--playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/post.yaml15
-rw-r--r--playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/run.yaml45
-rw-r--r--releasenotes/notes/remove-pika-1bae204ced2521a3.yaml8
-rw-r--r--requirements.txt4
-rw-r--r--setup.cfg16
-rw-r--r--tox.ini6
30 files changed, 13 insertions, 4551 deletions
diff --git a/.zuul.yaml b/.zuul.yaml
index 110cc89..11fb98b 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -12,13 +12,6 @@
bindep_profile: kafka
- job:
- name: oslo.messaging-tox-py27-func-pika
- parent: openstack-tox-py27
- vars:
- tox_envlist: py27-func-pika
- bindep_profile: pika
-
-- job:
name: oslo.messaging-tox-py27-func-rabbit
parent: openstack-tox-py27
vars:
@@ -77,17 +70,6 @@
- openstack/oslo.messaging
- job:
- name: oslo.messaging-src-dsvm-full-pika-default
- parent: legacy-dsvm-base
- run: playbooks/oslo.messaging-src-dsvm-full-pika-default/run.yaml
- post-run: playbooks/oslo.messaging-src-dsvm-full-pika-default/post.yaml
- timeout: 10800
- required-projects:
- - openstack-infra/devstack-gate
- - openstack/devstack-plugin-pika
- - openstack/oslo.messaging
-
-- job:
name: oslo.messaging-src-dsvm-full-amqp1-dual-centos-7
parent: legacy-dsvm-base
run: playbooks/oslo.messaging-src-dsvm-full-amqp1-dual-centos-7/run.yaml
@@ -212,24 +194,6 @@
- openstack/diskimage-builder
- job:
- name: oslo.messaging-telemetry-dsvm-integration-pika
- parent: legacy-dsvm-base
- run: playbooks/oslo.messaging-telemetry-dsvm-integration-pika/run.yaml
- post-run: playbooks/oslo.messaging-telemetry-dsvm-integration-pika/post.yaml
- timeout: 4200
- required-projects:
- - openstack-infra/devstack-gate
- - openstack/aodh
- - openstack/ceilometer
- - openstack/devstack-plugin-pika
- - openstack/oslo.messaging
- - openstack/panko
- # following are required when DEVSTACK_GATE_HEAT, which this
- # job turns on
- - openstack/dib-utils
- - openstack/diskimage-builder
-
-- job:
name: oslo.messaging-telemetry-dsvm-integration-zmq
parent: legacy-dsvm-base
run: playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/run.yaml
@@ -305,19 +269,6 @@
- openstack/tempest
- job:
- name: oslo.messaging-tempest-neutron-dsvm-src-pika-default
- parent: legacy-dsvm-base
- run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/run.yaml
- post-run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/post.yaml
- timeout: 7800
- required-projects:
- - openstack-infra/devstack-gate
- - openstack/devstack-plugin-pika
- - openstack/neutron
- - openstack/oslo.messaging
- - openstack/tempest
-
-- job:
name: oslo.messaging-tempest-neutron-dsvm-src-zmq-default
parent: legacy-dsvm-base
run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-zmq-default/run.yaml
@@ -338,7 +289,6 @@
voting: false
- oslo.messaging-tox-py27-func-kafka:
voting: false
- - oslo.messaging-tox-py27-func-pika
- oslo.messaging-tox-py27-func-rabbit
- oslo.messaging-tox-py27-func-zmq-proxy:
voting: false
@@ -364,8 +314,6 @@
voting: false
- oslo.messaging-src-dsvm-full-kafka-default:
voting: false
- - oslo.messaging-src-dsvm-full-pika-default:
- voting: false
- oslo.messaging-src-dsvm-full-zmq-default:
voting: false
@@ -379,8 +327,6 @@
voting: false
- oslo.messaging-telemetry-dsvm-integration-kafka:
voting: false
- - oslo.messaging-telemetry-dsvm-integration-pika:
- voting: false
- oslo.messaging-telemetry-dsvm-integration-zmq:
voting: false
@@ -390,15 +336,12 @@
branches: ^(?!stable/ocata).*$
- oslo.messaging-tempest-neutron-dsvm-src-kafka-default:
voting: false
- - oslo.messaging-tempest-neutron-dsvm-src-pika-default:
- voting: false
- oslo.messaging-tempest-neutron-dsvm-src-zmq-default:
voting: false
gate:
jobs:
- oslo.messaging-tox-py27-func-rabbit
- - oslo.messaging-tox-py27-func-pika
- oslo.messaging-telemetry-dsvm-integration-rabbit
- oslo.messaging-src-dsvm-full-rabbit-default
- oslo.messaging-tempest-neutron-dsvm-src-rabbit-default
diff --git a/bindep.txt b/bindep.txt
index 96788ae..d9728bb 100644
--- a/bindep.txt
+++ b/bindep.txt
@@ -10,11 +10,9 @@ make [platform:rpm]
pkgconfig [platform:rpm]
libffi-devel [platform:rpm]
-# kombu/pika dpkg
-rabbitmq-server [platform:dpkg rabbit pika]
-
-# kombu/pika rpm
-rabbitmq-server [platform:rpm rabbit pika]
+# RabbitMQ message broker
+rabbitmq-server [platform:dpkg rabbit]
+rabbitmq-server [platform:rpm rabbit]
# zmq
redis [platform:rpm zmq]
diff --git a/doc/source/admin/index.rst b/doc/source/admin/index.rst
index d032aaf..63104bb 100644
--- a/doc/source/admin/index.rst
+++ b/doc/source/admin/index.rst
@@ -7,5 +7,4 @@ Deployment Guide
drivers
AMQP1.0
- pika_driver
zmq_driver
diff --git a/doc/source/admin/pika_driver.rst b/doc/source/admin/pika_driver.rst
deleted file mode 100644
index b89fabd..0000000
--- a/doc/source/admin/pika_driver.rst
+++ /dev/null
@@ -1,160 +0,0 @@
-------------------------------
-Pika Driver Deployment Guide
-------------------------------
-
-.. currentmodule:: oslo_messaging
-
-.. warning:: the Pika driver is no longer maintained and will be
- removed from Oslo.Messaging at a future date. It is recommended that
- all users of the Pika driver transition to using the Rabbit driver.
-
-============
-Introduction
-============
-
-Pika is a pure-Python implementation of the AMQP 0-9-1 protocol including
-RabbitMQ's extensions. It is very actively supported and recommended by
-RabbitMQ developers
-
-========
-Abstract
-========
-
-PikaDriver is one of oslo.messaging backend drivers. It supports RPC and Notify
-patterns. Currently it could be the only oslo.messaging driver across the
-OpenStack cluster. This document provides deployment information for this
-driver in oslo_messaging.
-
-This driver is able to work with single instance of RabbitMQ server or
-RabbitMQ cluster.
-
-
-=============
-Configuration
-=============
-
-Enabling (mandatory)
---------------------
-
-To enable the driver, in the section [DEFAULT] of the conf file,
-the 'transport_url' parameter should be set to
-`pika://user:pass@host1:port[,hostN:portN]`
-
- [DEFAULT]
- transport_url = pika://guest:guest@localhost:5672
-
-
-Connection options (optional)
------------------------------
-
-In section [oslo_messaging_pika]:
-#. channel_max - Maximum number of channels to allow,
-
-#. frame_max (default - pika default value): The maximum byte size for
- an AMQP frame,
-
-#. heartbeat_interval (default=1): How often to send heartbeats for
- consumer's connections in seconds. If 0 - disable heartbeats,
-
-#. ssl (default=False): Enable SSL if True,
-
-#. ssl_options (default=None): Arguments passed to ssl.wrap_socket,
-
-#. socket_timeout (default=0.25): Set timeout for opening new connection's
- socket,
-
-#. tcp_user_timeout (default=0.25): Set TCP_USER_TIMEOUT in seconds for
- connection's socket,
-
-#. host_connection_reconnect_delay (default=0.25): Set delay for reconnection
- to some host after connection error
-
-
-Connection pool options (optional)
-----------------------------------
-
-In section [oslo_messaging_pika]:
-
-#. pool_max_size (default=10): Maximum number of connections to keep queued,
-
-#. pool_max_overflow (default=0): Maximum number of connections to create above
- `pool_max_size`,
-
-#. pool_timeout (default=30): Default number of seconds to wait for a
- connections to available,
-
-#. pool_recycle (default=600): Lifetime of a connection (since creation) in
- seconds or None for no recycling. Expired connections are closed on acquire,
-
-#. pool_stale (default=60): Threshold at which inactive (since release)
- connections are considered stale in seconds or None for no staleness.
- Stale connections are closed on acquire.")
-
-RPC related options (optional)
-------------------------------
-
-In section [oslo_messaging_pika]:
-
-#. rpc_queue_expiration (default=60): Time to live for rpc queues without
- consumers in seconds,
-
-#. default_rpc_exchange (default="${control_exchange}_rpc"): Exchange name for
- sending RPC messages,
-
-#. rpc_reply_exchange', default=("${control_exchange}_rpc_reply"): Exchange
- name for receiving RPC replies,
-
-#. rpc_listener_prefetch_count (default=100): Max number of not acknowledged
- message which RabbitMQ can send to rpc listener,
-
-#. rpc_reply_listener_prefetch_count (default=100): Max number of not
- acknowledged message which RabbitMQ can send to rpc reply listener,
-
-#. rpc_reply_retry_attempts (default=-1): Reconnecting retry count in case of
- connectivity problem during sending reply. -1 means infinite retry during
- rpc_timeout,
-
-#. rpc_reply_retry_delay (default=0.25) Reconnecting retry delay in case of
- connectivity problem during sending reply,
-
-#. default_rpc_retry_attempts (default=-1): Reconnecting retry count in case of
- connectivity problem during sending RPC message, -1 means infinite retry. If
- actual retry attempts in not 0 the rpc request could be processed more than
- one time,
-
-#. rpc_retry_delay (default=0.25): Reconnecting retry delay in case of
- connectivity problem during sending RPC message
-
-$control_exchange in this code is value of [DEFAULT].control_exchange option,
-which is "openstack" by default
-
-Notification related options (optional)
----------------------------------------
-
-In section [oslo_messaging_pika]:
-
-#. notification_persistence (default=False): Persist notification messages,
-
-#. default_notification_exchange (default="${control_exchange}_notification"):
- Exchange name for sending notifications,
-
-#. notification_listener_prefetch_count (default=100): Max number of not
- acknowledged message which RabbitMQ can send to notification listener,
-
-#. default_notification_retry_attempts (default=-1): Reconnecting retry count
- in case of connectivity problem during sending notification, -1 means
- infinite retry,
-
-#. notification_retry_delay (default=0.25): Reconnecting retry delay in case of
- connectivity problem during sending notification message
-
-$control_exchange in this code is value of [DEFAULT].control_exchange option,
-which is "openstack" by default
-
-DevStack Support
-----------------
-
-Pika driver is supported by DevStack. To enable it you should edit
-local.conf [localrc] section and add next there:
-
- enable_plugin pika https://git.openstack.org/openstack/devstack-plugin-pika
diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py
deleted file mode 100644
index 65e7b48..0000000
--- a/oslo_messaging/_drivers/impl_pika.py
+++ /dev/null
@@ -1,366 +0,0 @@
-# Copyright 2011 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from debtcollector import deprecate
-from oslo_config import cfg
-from oslo_log import log as logging
-from oslo_utils import timeutils
-import pika_pool
-import tenacity
-
-from oslo_messaging._drivers import base
-from oslo_messaging._drivers import common
-from oslo_messaging._drivers.pika_driver import (pika_connection_factory as
- pika_drv_conn_factory)
-from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
-from oslo_messaging._drivers.pika_driver import pika_engine as pika_drv_engine
-from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
-from oslo_messaging._drivers.pika_driver import pika_listener as pika_drv_lstnr
-from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
-from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller
-from oslo_messaging import exceptions
-
-LOG = logging.getLogger(__name__)
-
-pika_pool_opts = [
- cfg.IntOpt('pool_max_size', default=30,
- help="Maximum number of connections to keep queued."),
- cfg.IntOpt('pool_max_overflow', default=0,
- help="Maximum number of connections to create above "
- "`pool_max_size`."),
- cfg.IntOpt('pool_timeout', default=30,
- help="Default number of seconds to wait for a connections to "
- "available"),
- cfg.IntOpt('pool_recycle', default=600,
- help="Lifetime of a connection (since creation) in seconds "
- "or None for no recycling. Expired connections are "
- "closed on acquire."),
- cfg.IntOpt('pool_stale', default=60,
- help="Threshold at which inactive (since release) connections "
- "are considered stale in seconds or None for no "
- "staleness. Stale connections are closed on acquire.")
-]
-
-message_opts = [
- cfg.StrOpt('default_serializer_type', default='json',
- choices=('json', 'msgpack'),
- help="Default serialization mechanism for "
- "serializing/deserializing outgoing/incoming messages")
-]
-
-notification_opts = [
- cfg.BoolOpt('notification_persistence', default=False,
- help="Persist notification messages."),
- cfg.StrOpt('default_notification_exchange',
- default="${control_exchange}_notification",
- help="Exchange name for sending notifications"),
- cfg.IntOpt(
- 'notification_listener_prefetch_count', default=100,
- help="Max number of not acknowledged message which RabbitMQ can send "
- "to notification listener."
- ),
- cfg.IntOpt(
- 'default_notification_retry_attempts', default=-1,
- help="Reconnecting retry count in case of connectivity problem during "
- "sending notification, -1 means infinite retry."
- ),
- cfg.FloatOpt(
- 'notification_retry_delay', default=0.25,
- help="Reconnecting retry delay in case of connectivity problem during "
- "sending notification message"
- )
-]
-
-rpc_opts = [
- cfg.IntOpt('rpc_queue_expiration', default=60,
- help="Time to live for rpc queues without consumers in "
- "seconds."),
- cfg.StrOpt('default_rpc_exchange', default="${control_exchange}_rpc",
- help="Exchange name for sending RPC messages"),
- cfg.StrOpt('rpc_reply_exchange', default="${control_exchange}_rpc_reply",
- help="Exchange name for receiving RPC replies"),
- cfg.IntOpt(
- 'rpc_listener_prefetch_count', default=100,
- help="Max number of not acknowledged message which RabbitMQ can send "
- "to rpc listener."
- ),
- cfg.IntOpt(
- 'rpc_reply_listener_prefetch_count', default=100,
- help="Max number of not acknowledged message which RabbitMQ can send "
- "to rpc reply listener."
- ),
- cfg.IntOpt(
- 'rpc_reply_retry_attempts', default=-1,
- help="Reconnecting retry count in case of connectivity problem during "
- "sending reply. -1 means infinite retry during rpc_timeout"
- ),
- cfg.FloatOpt(
- 'rpc_reply_retry_delay', default=0.25,
- help="Reconnecting retry delay in case of connectivity problem during "
- "sending reply."
- ),
- cfg.IntOpt(
- 'default_rpc_retry_attempts', default=-1,
- help="Reconnecting retry count in case of connectivity problem during "
- "sending RPC message, -1 means infinite retry. If actual "
- "retry attempts in not 0 the rpc request could be processed more "
- "than one time"
- ),
- cfg.FloatOpt(
- 'rpc_retry_delay', default=0.25,
- help="Reconnecting retry delay in case of connectivity problem during "
- "sending RPC message"
- )
-]
-
-
-class PikaDriver(base.BaseDriver):
- """Pika Driver
-
- **Warning**: The ``pika`` driver has been deprecated and will be removed in
- a future release. It is recommended that all users of the ``pika`` driver
- transition to using the ``rabbit`` driver.
- """
-
- def __init__(self, conf, url, default_exchange=None,
- allowed_remote_exmods=None):
-
- deprecate("The pika driver is no longer maintained. It has been"
- " deprecated",
- message="It is recommended that all users of the pika driver"
- " transition to using the rabbit driver.",
- version="pike", removal_version="rocky")
-
- opt_group = cfg.OptGroup(name='oslo_messaging_pika',
- title='Pika driver options')
- conf.register_group(opt_group)
- conf.register_opts(pika_drv_conn_factory.pika_opts, group=opt_group)
- conf.register_opts(pika_pool_opts, group=opt_group)
- conf.register_opts(message_opts, group=opt_group)
- conf.register_opts(rpc_opts, group=opt_group)
- conf.register_opts(notification_opts, group=opt_group)
- conf = common.ConfigOptsProxy(conf, url, opt_group.name)
-
- self._pika_engine = pika_drv_engine.PikaEngine(
- conf, url, default_exchange, allowed_remote_exmods
- )
- self._reply_listener = pika_drv_lstnr.RpcReplyPikaListener(
- self._pika_engine
- )
- super(PikaDriver, self).__init__(conf, url, default_exchange,
- allowed_remote_exmods)
-
- def require_features(self, requeue=False):
- pass
-
- def _declare_rpc_exchange(self, exchange, stopwatch):
- timeout = stopwatch.leftover(return_none=True)
- with (self._pika_engine.connection_without_confirmation_pool
- .acquire(timeout=timeout)) as conn:
- try:
- self._pika_engine.declare_exchange_by_channel(
- conn.channel,
- self._pika_engine.get_rpc_exchange_name(
- exchange
- ), "direct", False
- )
- except pika_pool.Timeout as e:
- raise exceptions.MessagingTimeout(
- "Timeout for current operation was expired. {}.".format(
- str(e)
- )
- )
-
- def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
- retry=None):
- with timeutils.StopWatch(duration=timeout) as stopwatch:
- if retry is None:
- retry = self._pika_engine.default_rpc_retry_attempts
-
- exchange = self._pika_engine.get_rpc_exchange_name(
- target.exchange
- )
-
- def on_exception(ex):
- if isinstance(ex, pika_drv_exc.ExchangeNotFoundException):
- # it is desired to create exchange because if we sent to
- # exchange which is not exists, we get ChannelClosed
- # exception and need to reconnect
- try:
- self._declare_rpc_exchange(exchange, stopwatch)
- except pika_drv_exc.ConnectionException as e:
- LOG.warning("Problem during declaring exchange. %s", e)
- return True
- elif isinstance(ex, (pika_drv_exc.ConnectionException,
- exceptions.MessageDeliveryFailure)):
- LOG.warning("Problem during message sending. %s", ex)
- return True
- else:
- return False
-
- if retry:
- retrier = tenacity.retry(
- stop=(tenacity.stop_never if retry == -1 else
- tenacity.stop_after_attempt(retry)),
- retry=tenacity.retry_if_exception(on_exception),
- wait=tenacity.wait_fixed(self._pika_engine.rpc_retry_delay)
- )
- else:
- retrier = None
-
- if target.fanout:
- return self.cast_all_workers(
- exchange, target.topic, ctxt, message, stopwatch, retrier
- )
-
- routing_key = self._pika_engine.get_rpc_queue_name(
- target.topic, target.server, retrier is None
- )
-
- msg = pika_drv_msg.RpcPikaOutgoingMessage(self._pika_engine,
- message, ctxt)
- try:
- reply = msg.send(
- exchange=exchange,
- routing_key=routing_key,
- reply_listener=(
- self._reply_listener if wait_for_reply else None
- ),
- stopwatch=stopwatch,
- retrier=retrier
- )
- except pika_drv_exc.ExchangeNotFoundException as ex:
- try:
- self._declare_rpc_exchange(exchange, stopwatch)
- except pika_drv_exc.ConnectionException as e:
- LOG.warning("Problem during declaring exchange. %s", e)
- raise ex
-
- if reply is not None:
- if reply.failure is not None:
- raise reply.failure
-
- return reply.result
-
- def cast_all_workers(self, exchange, topic, ctxt, message, stopwatch,
- retrier=None):
- msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message,
- ctxt)
- try:
- msg.send(
- exchange=exchange,
- routing_key=self._pika_engine.get_rpc_queue_name(
- topic, "all_workers", retrier is None
- ),
- mandatory=False,
- stopwatch=stopwatch,
- retrier=retrier
- )
- except pika_drv_exc.ExchangeNotFoundException:
- try:
- self._declare_rpc_exchange(exchange, stopwatch)
- except pika_drv_exc.ConnectionException as e:
- LOG.warning("Problem during declaring exchange. %s", e)
-
- def _declare_notification_queue_binding(
- self, target, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH):
- if stopwatch.expired():
- raise exceptions.MessagingTimeout(
- "Timeout for current operation was expired."
- )
- try:
- timeout = stopwatch.leftover(return_none=True)
- with (self._pika_engine.connection_without_confirmation_pool
- .acquire)(timeout=timeout) as conn:
- self._pika_engine.declare_queue_binding_by_channel(
- conn.channel,
- exchange=(
- target.exchange or
- self._pika_engine.default_notification_exchange
- ),
- queue=target.topic,
- routing_key=target.topic,
- exchange_type='direct',
- queue_expiration=None,
- durable=self._pika_engine.notification_persistence,
- )
- except pika_pool.Timeout as e:
- raise exceptions.MessagingTimeout(
- "Timeout for current operation was expired. {}.".format(str(e))
- )
-
- def send_notification(self, target, ctxt, message, version, retry=None):
- if retry is None:
- retry = self._pika_engine.default_notification_retry_attempts
-
- def on_exception(ex):
- if isinstance(ex, (pika_drv_exc.ExchangeNotFoundException,
- pika_drv_exc.RoutingException)):
- LOG.warning("Problem during sending notification. %s", ex)
- try:
- self._declare_notification_queue_binding(target)
- except pika_drv_exc.ConnectionException as e:
- LOG.warning("Problem during declaring notification queue "
- "binding. %s", e)
- return True
- elif isinstance(ex, (pika_drv_exc.ConnectionException,
- pika_drv_exc.MessageRejectedException)):
- LOG.warning("Problem during sending notification. %s", ex)
- return True
- else:
- return False
-
- if retry:
- retrier = tenacity.retry(
- stop=(tenacity.stop_never if retry == -1 else
- tenacity.stop_after_attempt(retry)),
- retry=tenacity.retry_if_exception(on_exception),
- wait=tenacity.wait_fixed(
- self._pika_engine.notification_retry_delay
- )
- )
- else:
- retrier = None
-
- msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message,
- ctxt)
- return msg.send(
- exchange=(
- target.exchange or
- self._pika_engine.default_notification_exchange
- ),
- routing_key=target.topic,
- confirm=True,
- mandatory=True,
- persistent=self._pika_engine.notification_persistence,
- retrier=retrier
- )
-
- def listen(self, target, batch_size, batch_timeout):
- return pika_drv_poller.RpcServicePikaPoller(
- self._pika_engine, target, batch_size, batch_timeout,
- self._pika_engine.rpc_listener_prefetch_count
- )
-
- def listen_for_notifications(self, targets_and_priorities, pool,
- batch_size, batch_timeout):
- return pika_drv_poller.NotificationPikaPoller(
- self._pika_engine, targets_and_priorities, batch_size,
- batch_timeout,
- self._pika_engine.notification_listener_prefetch_count, pool
- )
-
- def cleanup(self):
- self._reply_listener.cleanup()
- self._pika_engine.cleanup()
diff --git a/oslo_messaging/_drivers/pika_driver/__init__.py b/oslo_messaging/_drivers/pika_driver/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/oslo_messaging/_drivers/pika_driver/__init__.py
+++ /dev/null
diff --git a/oslo_messaging/_drivers/pika_driver/pika_commons.py b/oslo_messaging/_drivers/pika_driver/pika_commons.py
deleted file mode 100644
index f5e9086..0000000
--- a/oslo_messaging/_drivers/pika_driver/pika_commons.py
+++ /dev/null
@@ -1,40 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import select
-import socket
-
-from oslo_serialization.serializer import json_serializer
-from oslo_serialization.serializer import msgpack_serializer
-from oslo_utils import timeutils
-from pika import exceptions as pika_exceptions
-import six
-
-
-PIKA_CONNECTIVITY_ERRORS = (
- pika_exceptions.AMQPConnectionError,
- pika_exceptions.ConnectionClosed,
- pika_exceptions.ChannelClosed,
- socket.timeout,
- select.error
-)
-
-EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
-
-INFINITE_STOP_WATCH = timeutils.StopWatch(duration=None).start()
-
-MESSAGE_SERIALIZERS = {
- 'application/json': json_serializer.JSONSerializer(),
- 'application/msgpack': msgpack_serializer.MessagePackSerializer()
-}
diff --git a/oslo_messaging/_drivers/pika_driver/pika_connection.py b/oslo_messaging/_drivers/pika_driver/pika_connection.py
deleted file mode 100644
index f0dca5a..0000000
--- a/oslo_messaging/_drivers/pika_driver/pika_connection.py
+++ /dev/null
@@ -1,542 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import collections
-import logging
-import os
-import threading
-
-import futurist
-from pika.adapters import select_connection
-from pika import exceptions as pika_exceptions
-from pika import spec as pika_spec
-
-from oslo_utils import eventletutils
-
-current_thread = eventletutils.fetch_current_thread_functor()
-
-LOG = logging.getLogger(__name__)
-
-
-class ThreadSafePikaConnection(object):
- def __init__(self, parameters=None,
- _impl_class=select_connection.SelectConnection):
- self.params = parameters
- self._connection_lock = threading.Lock()
- self._evt_closed = threading.Event()
- self._task_queue = collections.deque()
- self._pending_connection_futures = set()
-
- create_connection_future = self._register_pending_future()
-
- def on_open_error(conn, err):
- create_connection_future.set_exception(
- pika_exceptions.AMQPConnectionError(err)
- )
-
- self._impl = _impl_class(
- parameters=parameters,
- on_open_callback=create_connection_future.set_result,
- on_open_error_callback=on_open_error,
- on_close_callback=self._on_connection_close,
- stop_ioloop_on_close=False,
- )
- self._interrupt_pipein, self._interrupt_pipeout = os.pipe()
- self._impl.ioloop.add_handler(self._interrupt_pipein,
- self._impl.ioloop.read_interrupt,
- select_connection.READ)
-
- self._thread = threading.Thread(target=self._process_io)
- self._thread.daemon = True
- self._thread_id = None
- self._thread.start()
-
- create_connection_future.result()
-
- def _check_called_not_from_event_loop(self):
- if current_thread() == self._thread_id:
- raise RuntimeError("This call is not allowed from ioloop thread")
-
- def _execute_task(self, func, *args, **kwargs):
- if current_thread() == self._thread_id:
- return func(*args, **kwargs)
-
- future = futurist.Future()
- self._task_queue.append((func, args, kwargs, future))
-
- if self._evt_closed.is_set():
- self._notify_all_futures_connection_close()
- elif self._interrupt_pipeout is not None:
- os.write(self._interrupt_pipeout, b'X')
-
- return future.result()
-
- def _register_pending_future(self):
- future = futurist.Future()
- self._pending_connection_futures.add(future)
-
- def on_done_callback(fut):
- try:
- self._pending_connection_futures.remove(fut)
- except KeyError:
- pass
-
- future.add_done_callback(on_done_callback)
-
- if self._evt_closed.is_set():
- self._notify_all_futures_connection_close()
- return future
-
- def _notify_all_futures_connection_close(self):
- while self._task_queue:
- try:
- method_res_future = self._task_queue.pop()[3]
- except KeyError:
- break
- else:
- method_res_future.set_exception(
- pika_exceptions.ConnectionClosed()
- )
-
- while self._pending_connection_futures:
- try:
- pending_connection_future = (
- self._pending_connection_futures.pop()
- )
- except KeyError:
- break
- else:
- pending_connection_future.set_exception(
- pika_exceptions.ConnectionClosed()
- )
-
- def _on_connection_close(self, conn, reply_code, reply_text):
- self._evt_closed.set()
- self._notify_all_futures_connection_close()
- if self._interrupt_pipeout:
- os.close(self._interrupt_pipeout)
- os.close(self._interrupt_pipein)
-
- def add_on_close_callback(self, callback):
- return self._execute_task(self._impl.add_on_close_callback, callback)
-
- def _do_process_io(self):
- while self._task_queue:
- func, args, kwargs, future = self._task_queue.pop()
- try:
- res = func(*args, **kwargs)
- except BaseException as e:
- LOG.exception(e)
- future.set_exception(e)
- else:
- future.set_result(res)
-
- self._impl.ioloop.poll()
- self._impl.ioloop.process_timeouts()
-
- def _process_io(self):
- self._thread_id = current_thread()
- while not self._evt_closed.is_set():
- try:
- self._do_process_io()
- except BaseException:
- LOG.exception("Error during processing connection's IO")
-
- def close(self, *args, **kwargs):
- self._check_called_not_from_event_loop()
-
- res = self._execute_task(self._impl.close, *args, **kwargs)
-
- self._evt_closed.wait()
- self._thread.join()
- return res
-
- def channel(self, channel_number=None):
- self._check_called_not_from_event_loop()
-
- channel_opened_future = self._register_pending_future()
-
- impl_channel = self._execute_task(
- self._impl.channel,
- on_open_callback=channel_opened_future.set_result,
- channel_number=channel_number
- )
-
- # Create our proxy channel
- channel = ThreadSafePikaChannel(impl_channel, self)
-
- # Link implementation channel with our proxy channel
- impl_channel._set_cookie(channel)
-
- channel_opened_future.result()
- return channel
-
- def add_timeout(self, timeout, callback):
- return self._execute_task(self._impl.add_timeout, timeout, callback)
-
- def remove_timeout(self, timeout_id):
- return self._execute_task(self._impl.remove_timeout, timeout_id)
-
- @property
- def is_closed(self):
- return self._impl.is_closed
-
- @property
- def is_closing(self):
- return self._impl.is_closing
-
- @property
- def is_open(self):
- return self._impl.is_open
-
-
-class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
-
- def __init__(self, channel_impl, connection):
- self._impl = channel_impl
- self._connection = connection
-
- self._delivery_confirmation = False
-
- self._message_returned = False
- self._current_future = None
-
- self._evt_closed = threading.Event()
-
- self.add_on_close_callback(self._on_channel_close)
-
- def _execute_task(self, func, *args, **kwargs):
- return self._connection._execute_task(func, *args, **kwargs)
-
- def _on_channel_close(self, channel, reply_code, reply_text):
- self._evt_closed.set()
-
- if self._current_future:
- self._current_future.set_exception(
- pika_exceptions.ChannelClosed(reply_code, reply_text))
-
- def _on_message_confirmation(self, frame):
- self._current_future.set_result(frame)
-
- def add_on_close_callback(self, callback):
- self._execute_task(self._impl.add_on_close_callback, callback)
-
- def add_on_cancel_callback(self, callback):
- self._execute_task(self._impl.add_on_cancel_callback, callback)
-
- def __int__(self):
- return self.channel_number
-
- @property
- def channel_number(self):
- return self._impl.channel_number
-
- @property
- def is_closed(self):
- return self._impl.is_closed
-
- @property
- def is_closing(self):
- return self._impl.is_closing
-
- @property
- def is_open(self):
- return self._impl.is_open
-
- def close(self, reply_code=0, reply_text="Normal Shutdown"):
- self._impl.close(reply_code=reply_code, reply_text=reply_text)
- self._evt_closed.wait()
-
- def _check_called_not_from_event_loop(self):
- self._connection._check_called_not_from_event_loop()
-
- def flow(self, active):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(
- self._impl.flow, callback=self._current_future.set_result,
- active=active
- )
-
- return self._current_future.result()
-
- def basic_consume(self, # pylint: disable=R0913
- consumer_callback,
- queue,
- no_ack=False,
- exclusive=False,
- consumer_tag=None,
- arguments=None):
-
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(
- self._impl.add_callback, self._current_future.set_result,
- replies=[pika_spec.Basic.ConsumeOk], one_shot=True
- )
-
- self._impl.add_callback(self._current_future.set_result,
- replies=[pika_spec.Basic.ConsumeOk],
- one_shot=True)
- tag = self._execute_task(
- self._impl.basic_consume,
- consumer_callback=consumer_callback,
- queue=queue,
- no_ack=no_ack,
- exclusive=exclusive,
- consumer_tag=consumer_tag,
- arguments=arguments
- )
-
- self._current_future.result()
- return tag
-
- def basic_cancel(self, consumer_tag):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(
- self._impl.basic_cancel,
- callback=self._current_future.set_result,
- consumer_tag=consumer_tag,
- nowait=False)
- self._current_future.result()
-
- def basic_ack(self, delivery_tag=0, multiple=False):
- return self._execute_task(
- self._impl.basic_ack, delivery_tag=delivery_tag, multiple=multiple)
-
- def basic_nack(self, delivery_tag=None, multiple=False, requeue=True):
- return self._execute_task(
- self._impl.basic_nack, delivery_tag=delivery_tag,
- multiple=multiple, requeue=requeue
- )
-
- def publish(self, exchange, routing_key, body, # pylint: disable=R0913
- properties=None, mandatory=False, immediate=False):
-
- if self._delivery_confirmation:
- self._check_called_not_from_event_loop()
-
- # In publisher-acknowledgments mode
- self._message_returned = False
- self._current_future = futurist.Future()
-
- self._execute_task(self._impl.basic_publish,
- exchange=exchange,
- routing_key=routing_key,
- body=body,
- properties=properties,
- mandatory=mandatory,
- immediate=immediate)
-
- conf_method = self._current_future.result().method
-
- if isinstance(conf_method, pika_spec.Basic.Nack):
- raise pika_exceptions.NackError((None,))
- else:
- assert isinstance(conf_method, pika_spec.Basic.Ack), (
- conf_method)
-
- if self._message_returned:
- raise pika_exceptions.UnroutableError((None,))
- else:
- # In non-publisher-acknowledgments mode
- self._execute_task(self._impl.basic_publish,
- exchange=exchange,
- routing_key=routing_key,
- body=body,
- properties=properties,
- mandatory=mandatory,
- immediate=immediate)
-
- def basic_qos(self, prefetch_size=0, prefetch_count=0, all_channels=False):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.basic_qos,
- callback=self._current_future.set_result,
- prefetch_size=prefetch_size,
- prefetch_count=prefetch_count,
- all_channels=all_channels)
- self._current_future.result()
-
- def basic_recover(self, requeue=False):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(
- self._impl.basic_recover,
- callback=lambda: self._current_future.set_result(None),
- requeue=requeue
- )
- self._current_future.result()
-
- def basic_reject(self, delivery_tag=None, requeue=True):
- self._execute_task(self._impl.basic_reject,
- delivery_tag=delivery_tag,
- requeue=requeue)
-
- def _on_message_returned(self, *args, **kwargs):
- self._message_returned = True
-
- def confirm_delivery(self):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.add_callback,
- callback=self._current_future.set_result,
- replies=[pika_spec.Confirm.SelectOk],
- one_shot=True)
- self._execute_task(self._impl.confirm_delivery,
- callback=self._on_message_confirmation,
- nowait=False)
- self._current_future.result()
-
- self._delivery_confirmation = True
- self._execute_task(self._impl.add_on_return_callback,
- self._on_message_returned)
-
- def exchange_declare(self, exchange=None, # pylint: disable=R0913
- exchange_type='direct', passive=False, durable=False,
- auto_delete=False, internal=False,
- arguments=None, **kwargs):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.exchange_declare,
- callback=self._current_future.set_result,
- exchange=exchange,
- exchange_type=exchange_type,
- passive=passive,
- durable=durable,
- auto_delete=auto_delete,
- internal=internal,
- nowait=False,
- arguments=arguments,
- type=kwargs["type"] if kwargs else None)
-
- return self._current_future.result()
-
- def exchange_delete(self, exchange=None, if_unused=False):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.exchange_delete,
- callback=self._current_future.set_result,
- exchange=exchange,
- if_unused=if_unused,
- nowait=False)
-
- return self._current_future.result()
-
- def exchange_bind(self, destination=None, source=None, routing_key='',
- arguments=None):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.exchange_bind,
- callback=self._current_future.set_result,
- destination=destination,
- source=source,
- routing_key=routing_key,
- nowait=False,
- arguments=arguments)
-
- return self._current_future.result()
-
- def exchange_unbind(self, destination=None, source=None, routing_key='',
- arguments=None):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.exchange_unbind,
- callback=self._current_future.set_result,
- destination=destination,
- source=source,
- routing_key=routing_key,
- nowait=False,
- arguments=arguments)
-
- return self._current_future.result()
-
- def queue_declare(self, queue='', passive=False, durable=False,
- exclusive=False, auto_delete=False,
- arguments=None):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.queue_declare,
- callback=self._current_future.set_result,
- queue=queue,
- passive=passive,
- durable=durable,
- exclusive=exclusive,
- auto_delete=auto_delete,
- nowait=False,
- arguments=arguments)
-
- return self._current_future.result()
-
- def queue_delete(self, queue='', if_unused=False, if_empty=False):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.queue_delete,
- callback=self._current_future.set_result,
- queue=queue,
- if_unused=if_unused,
- if_empty=if_empty,
- nowait=False)
-
- return self._current_future.result()
-
- def queue_purge(self, queue=''):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.queue_purge,
- callback=self._current_future.set_result,
- queue=queue,
- nowait=False)
- return self._current_future.result()
-
- def queue_bind(self, queue, exchange, routing_key=None,
- arguments=None):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.queue_bind,
- callback=self._current_future.set_result,
- queue=queue,
- exchange=exchange,
- routing_key=routing_key,
- nowait=False,
- arguments=arguments)
- return self._current_future.result()
-
- def queue_unbind(self, queue='', exchange=None, routing_key=None,
- arguments=None):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.queue_unbind,
- callback=self._current_future.set_result,
- queue=queue,
- exchange=exchange,
- routing_key=routing_key,
- arguments=arguments)
- return self._current_future.result()
diff --git a/oslo_messaging/_drivers/pika_driver/pika_connection_factory.py b/oslo_messaging/_drivers/pika_driver/pika_connection_factory.py
deleted file mode 100644
index a78c55e..0000000
--- a/oslo_messaging/_drivers/pika_driver/pika_connection_factory.py
+++ /dev/null
@@ -1,307 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import logging
-import random
-import socket
-import threading
-import time
-
-from oslo_config import cfg
-import pika
-from pika import credentials as pika_credentials
-
-from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
-from oslo_messaging._drivers.pika_driver import pika_connection
-from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
-
-LOG = logging.getLogger(__name__)
-
-# constant for setting tcp_user_timeout socket option
-# (it should be defined in 'select' module of standard library in future)
-TCP_USER_TIMEOUT = 18
-
-# constants for creating connection statistics
-HOST_CONNECTION_LAST_TRY_TIME = "last_try_time"
-HOST_CONNECTION_LAST_SUCCESS_TRY_TIME = "last_success_try_time"
-
-pika_opts = [
- cfg.IntOpt('channel_max',
- help='Maximum number of channels to allow'),
- cfg.IntOpt('frame_max',
- help='The maximum byte size for an AMQP frame'),
- cfg.IntOpt('heartbeat_interval', default=3,
- help="How often to send heartbeats for consumer's connections"),
- cfg.BoolOpt('ssl',
- help='Enable SSL'),
- cfg.DictOpt('ssl_options',
- help='Arguments passed to ssl.wrap_socket'),
- cfg.FloatOpt('socket_timeout', default=0.25,
- help="Set socket timeout in seconds for connection's socket"),
- cfg.FloatOpt('tcp_user_timeout', default=0.25,
- help="Set TCP_USER_TIMEOUT in seconds for connection's "
- "socket"),
- cfg.FloatOpt('host_connection_reconnect_delay', default=0.25,
- help="Set delay for reconnection to some host which has "
- "connection error"),
- cfg.StrOpt('connection_factory', default="single",
- choices=["new", "single", "read_write"],
- help='Connection factory implementation')
-]
-
-
-class PikaConnectionFactory(object):
-
- def __init__(self, url, conf):
- self._url = url
- self._conf = conf
-
- self._connection_lock = threading.RLock()
-
- if not url.hosts:
- raise ValueError("You should provide at least one RabbitMQ host")
-
- # initializing connection parameters for configured RabbitMQ hosts
- self._common_pika_params = {
- 'virtual_host': url.virtual_host,
- 'channel_max': conf.oslo_messaging_pika.channel_max,
- 'frame_max': conf.oslo_messaging_pika.frame_max,
- 'ssl': conf.oslo_messaging_pika.ssl,
- 'ssl_options': conf.oslo_messaging_pika.ssl_options,
- 'socket_timeout': conf.oslo_messaging_pika.socket_timeout
- }
-
- self._host_list = url.hosts
- self._heartbeat_interval = conf.oslo_messaging_pika.heartbeat_interval
- self._host_connection_reconnect_delay = (
- conf.oslo_messaging_pika.host_connection_reconnect_delay
- )
- self._tcp_user_timeout = conf.oslo_messaging_pika.tcp_user_timeout
-
- self._connection_host_status = {}
-
- self._cur_connection_host_num = random.randint(
- 0, len(url.hosts) - 1
- )
-
- def cleanup(self):
- pass
-
- def create_connection(self, for_listening=False):
- """Create and return connection to any available host.
-
- :return: created connection
- :raise: ConnectionException if all hosts are not reachable
- """
-
- with self._connection_lock:
-
- host_count = len(self._host_list)
- connection_attempts = host_count
-
- while connection_attempts > 0:
- self._cur_connection_host_num += 1
- self._cur_connection_host_num %= host_count
- try:
- return self._create_host_connection(
- self._cur_connection_host_num, for_listening
- )
- except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e:
- LOG.warning("Can't establish connection to host. %s", e)
- except pika_drv_exc.HostConnectionNotAllowedException as e:
- LOG.warning("Connection to host is not allowed. %s", e)
-
- connection_attempts -= 1
-
- raise pika_drv_exc.EstablishConnectionException(
- "Can not establish connection to any configured RabbitMQ "
- "host: " + str(self._host_list)
- )
-
- def _set_tcp_user_timeout(self, s):
- if not self._tcp_user_timeout:
- return
- try:
- s.setsockopt(
- socket.IPPROTO_TCP, TCP_USER_TIMEOUT,
- int(self._tcp_user_timeout * 1000)
- )
- except socket.error:
- LOG.warning(
- "Whoops, this kernel doesn't seem to support TCP_USER_TIMEOUT."
- )
-
- def _create_host_connection(self, host_index, for_listening):
- """Create new connection to host #host_index
-
- :param host_index: Integer, number of host for connection establishing
- :param for_listening: Boolean, creates connection for listening
- if True
- :return: New connection
- """
- host = self._host_list[host_index]
-
- cur_time = time.time()
-
- host_connection_status = self._connection_host_status.get(host)
-
- if host_connection_status is None:
- host_connection_status = {
- HOST_CONNECTION_LAST_SUCCESS_TRY_TIME: 0,
- HOST_CONNECTION_LAST_TRY_TIME: 0
- }
- self._connection_host_status[host] = host_connection_status
-
- last_success_time = host_connection_status[
- HOST_CONNECTION_LAST_SUCCESS_TRY_TIME
- ]
- last_time = host_connection_status[
- HOST_CONNECTION_LAST_TRY_TIME
- ]
-
- # raise HostConnectionNotAllowedException if we tried to establish
- # connection in last 'host_connection_reconnect_delay' and got
- # failure
- if (last_time != last_success_time and
- cur_time - last_time <
- self._host_connection_reconnect_delay):
- raise pika_drv_exc.HostConnectionNotAllowedException(
- "Connection to host #{} is not allowed now because of "
- "previous failure".format(host_index)
- )
-
- try:
- connection = self._do_create_host_connection(
- host, for_listening
- )
- self._connection_host_status[host][
- HOST_CONNECTION_LAST_SUCCESS_TRY_TIME
- ] = cur_time
-
- return connection
- finally:
- self._connection_host_status[host][
- HOST_CONNECTION_LAST_TRY_TIME
- ] = cur_time
-
- def _do_create_host_connection(self, host, for_listening):
- connection_params = pika.ConnectionParameters(
- host=host.hostname,
- port=host.port,
- credentials=pika_credentials.PlainCredentials(
- host.username, host.password
- ),
- heartbeat_interval=(
- self._heartbeat_interval if for_listening else None
- ),
- **self._common_pika_params
- )
- if for_listening:
- connection = pika_connection.ThreadSafePikaConnection(
- parameters=connection_params
- )
- else:
- connection = pika.BlockingConnection(
- parameters=connection_params
- )
- connection.params = connection_params
-
- self._set_tcp_user_timeout(connection._impl.socket)
- return connection
-
-
-class NotClosableConnection(object):
- def __init__(self, connection):
- self._connection = connection
-
- def __getattr__(self, item):
- return getattr(self._connection, item)
-
- def close(self):
- pass
-
-
-class SinglePikaConnectionFactory(PikaConnectionFactory):
- def __init__(self, url, conf):
- super(SinglePikaConnectionFactory, self).__init__(url, conf)
- self._connection = None
-
- def create_connection(self, for_listening=False):
- with self._connection_lock:
- if self._connection is None or not self._connection.is_open:
- self._connection = (
- super(SinglePikaConnectionFactory, self).create_connection(
- True
- )
- )
- return NotClosableConnection(self._connection)
-
- def cleanup(self):
- with self._connection_lock:
- if self._connection is not None and self._connection.is_open:
- try:
- self._connection.close()
- except Exception:
- LOG.warning(
- "Unexpected exception during connection closing",
- exc_info=True
- )
- self._connection = None
-
-
-class ReadWritePikaConnectionFactory(PikaConnectionFactory):
- def __init__(self, url, conf):
- super(ReadWritePikaConnectionFactory, self).__init__(url, conf)
- self._read_connection = None
- self._write_connection = None
-
- def create_connection(self, for_listening=False):
- with self._connection_lock:
- if for_listening:
- if (self._read_connection is None or
- not self._read_connection.is_open):
- self._read_connection = super(
- ReadWritePikaConnectionFactory, self
- ).create_connection(True)
- return NotClosableConnection(self._read_connection)
- else:
- if (self._write_connection is None or
- not self._write_connection.is_open):
- self._write_connection = super(
- ReadWritePikaConnectionFactory, self
- ).create_connection(True)
- return NotClosableConnection(self._write_connection)
-
- def cleanup(self):
- with self._connection_lock:
- if (self._read_connection is not None and
- self._read_connection.is_open):
- try:
- self._read_connection.close()
- except Exception:
- LOG.warning(
- "Unexpected exception during connection closing",
- exc_info=True
- )
- self._read_connection = None
-
- if (self._write_connection is not None and
- self._write_connection.is_open):
- try:
- self._write_connection.close()
- except Exception:
- LOG.warning(
- "Unexpected exception during connection closing",
- exc_info=True
- )
- self._write_connection = None
diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py
deleted file mode 100644
index 157c499..0000000
--- a/oslo_messaging/_drivers/pika_driver/pika_engine.py
+++ /dev/null
@@ -1,303 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import logging
-import os
-import threading
-import uuid
-
-from oslo_utils import eventletutils
-import pika_pool
-from stevedore import driver
-
-from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
-from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
-
-LOG = logging.getLogger(__name__)
-
-
-class _PooledConnectionWithConfirmations(pika_pool.Connection):
- """Derived from 'pika_pool.Connection' and extends its logic - adds
- 'confirm_delivery' call after channel creation to enable delivery
- confirmation for channel
- """
- @property
- def channel(self):
- if self.fairy.channel is None:
- self.fairy.channel = self.fairy.cxn.channel()
- self.fairy.channel.confirm_delivery()
- return self.fairy.channel
-
-
-class PikaEngine(object):
- """Used for shared functionality between other pika driver modules, like
- connection factory, connection pools, processing and holding configuration,
- etc.
- """
-
- def __init__(self, conf, url, default_exchange=None,
- allowed_remote_exmods=None):
- self.conf = conf
- self.url = url
-
- self._connection_factory_type = (
- self.conf.oslo_messaging_pika.connection_factory
- )
-
- self._connection_factory = None
- self._connection_without_confirmation_pool = None
- self._connection_with_confirmation_pool = None
- self._pid = None
- self._init_lock = threading.Lock()
-
- self.host_connection_reconnect_delay = (
- conf.oslo_messaging_pika.host_connection_reconnect_delay
- )
-
- # processing rpc options
- self.default_rpc_exchange = (
- conf.oslo_messaging_pika.default_rpc_exchange
- )
- self.rpc_reply_exchange = (
- conf.oslo_messaging_pika.rpc_reply_exchange
- )
-
- self.allowed_remote_exmods = [pika_drv_cmns.EXCEPTIONS_MODULE]
- if allowed_remote_exmods:
- self.allowed_remote_exmods.extend(allowed_remote_exmods)
-
- self.rpc_listener_prefetch_count = (
- conf.oslo_messaging_pika.rpc_listener_prefetch_count
- )
-
- self.default_rpc_retry_attempts = (
- conf.oslo_messaging_pika.default_rpc_retry_attempts
- )
-
- self.rpc_retry_delay = (
- conf.oslo_messaging_pika.rpc_retry_delay
- )
- if self.rpc_retry_delay < 0:
- raise ValueError("rpc_retry_delay should be non-negative integer")
-
- self.rpc_reply_listener_prefetch_count = (
- conf.oslo_messaging_pika.rpc_listener_prefetch_count
- )
-
- self.rpc_reply_retry_attempts = (
- conf.oslo_messaging_pika.rpc_reply_retry_attempts
- )
- self.rpc_reply_retry_delay = (
- conf.oslo_messaging_pika.rpc_reply_retry_delay
- )
- if self.rpc_reply_retry_delay < 0:
- raise ValueError("rpc_reply_retry_delay should be non-negative "
- "integer")
-
- self.rpc_queue_expiration = (
- self.conf.oslo_messaging_pika.rpc_queue_expiration
- )
-
- # processing notification options
- self.default_notification_exchange = (
- conf.oslo_messaging_pika.default_notification_exchange
- )
-
- self.notification_persistence = (
- conf.oslo_messaging_pika.notification_persistence
- )
-
- self.notification_listener_prefetch_count = (
- conf.oslo_messaging_pika.notification_listener_prefetch_count
- )
-
- self.default_notification_retry_attempts = (
- conf.oslo_messaging_pika.default_notification_retry_attempts
- )
- if self.default_notification_retry_attempts is None:
- raise ValueError("default_notification_retry_attempts should be "
- "an integer")
- self.notification_retry_delay = (
- conf.oslo_messaging_pika.notification_retry_delay
- )
- if (self.notification_retry_delay is None or
- self.notification_retry_delay < 0):
- raise ValueError("notification_retry_delay should be non-negative "
- "integer")
-
- self.default_content_type = (
- 'application/' + conf.oslo_messaging_pika.default_serializer_type
- )
-
- def _init_if_needed(self):
- cur_pid = os.getpid()
-
- if self._pid == cur_pid:
- return
-
- with self._init_lock:
- if self._pid == cur_pid:
- return
-
- if self._pid:
- LOG.warning("New pid is detected. Old: %s, new: %s. "
- "Cleaning up...", self._pid, cur_pid)
-
- # Note(dukhlov): we need to force select poller usage in case
- # when 'thread' module is monkey patched becase current
- # eventlet implementation does not support patching of
- # poll/epoll/kqueue
- if eventletutils.is_monkey_patched("thread"):
- from pika.adapters import select_connection
- select_connection.SELECT_TYPE = "select"
-
- mgr = driver.DriverManager(
- 'oslo.messaging.pika.connection_factory',
- self._connection_factory_type
- )
-
- self._connection_factory = mgr.driver(self.url, self.conf)
-
- # initializing 2 connection pools: 1st for connections without
- # confirmations, 2nd - with confirmations
- self._connection_without_confirmation_pool = pika_pool.QueuedPool(
- create=self.create_connection,
- max_size=self.conf.oslo_messaging_pika.pool_max_size,
- max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow,
- timeout=self.conf.oslo_messaging_pika.pool_timeout,
- recycle=self.conf.oslo_messaging_pika.pool_recycle,
- stale=self.conf.oslo_messaging_pika.pool_stale,
- )
-
- self._connection_with_confirmation_pool = pika_pool.QueuedPool(
- create=self.create_connection,
- max_size=self.conf.oslo_messaging_pika.pool_max_size,
- max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow,
- timeout=self.conf.oslo_messaging_pika.pool_timeout,
- recycle=self.conf.oslo_messaging_pika.pool_recycle,
- stale=self.conf.oslo_messaging_pika.pool_stale,
- )
-
- self._connection_with_confirmation_pool.Connection = (
- _PooledConnectionWithConfirmations
- )
-
- self._pid = cur_pid
-
- def create_connection(self, for_listening=False):
- self._init_if_needed()
- return self._connection_factory.create_connection(for_listening)
-
- @property
- def connection_without_confirmation_pool(self):
- self._init_if_needed()
- return self._connection_without_confirmation_pool
-
- @property
- def connection_with_confirmation_pool(self):
- self._init_if_needed()
- return self._connection_with_confirmation_pool
-
- def cleanup(self):
- if self._connection_factory:
- self._connection_factory.cleanup()
-
- def declare_exchange_by_channel(self, channel, exchange, exchange_type,
- durable):
- """Declare exchange using already created channel, if they don't exist
-
- :param channel: Channel for communication with RabbitMQ
- :param exchange: String, RabbitMQ exchange name
- :param exchange_type: String ('direct', 'topic' or 'fanout')
- exchange type for exchange to be declared
- :param durable: Boolean, creates durable exchange if true
- """
- try:
- channel.exchange_declare(
- exchange, exchange_type, auto_delete=True, durable=durable
- )
- except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e:
- raise pika_drv_exc.ConnectionException(
- "Connectivity problem detected during declaring exchange: "
- "exchange:{}, exchange_type: {}, durable: {}. {}".format(
- exchange, exchange_type, durable, str(e)
- )
- )
-
- def declare_queue_binding_by_channel(self, channel, exchange, queue,
- routing_key, exchange_type,
- queue_expiration, durable):
- """Declare exchange, queue and bind them using already created
- channel, if they don't exist
-
- :param channel: Channel for communication with RabbitMQ
- :param exchange: String, RabbitMQ exchange name
- :param queue: Sting, RabbitMQ queue name
- :param routing_key: Sting, RabbitMQ routing key for queue binding
- :param exchange_type: String ('direct', 'topic' or 'fanout')
- exchange type for exchange to be declared
- :param queue_expiration: Integer, time in seconds which queue will
- remain existing in RabbitMQ when there no consumers connected
- :param durable: Boolean, creates durable exchange and queue if true
- """
- try:
- channel.exchange_declare(
- exchange, exchange_type, auto_delete=True, durable=durable
- )
- arguments = {}
-
- if queue_expiration > 0:
- arguments['x-expires'] = queue_expiration * 1000
-
- channel.queue_declare(queue, durable=durable, arguments=arguments)
-
- channel.queue_bind(queue, exchange, routing_key)
- except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e:
- raise pika_drv_exc.ConnectionException(
- "Connectivity problem detected during declaring queue "
- "binding: exchange:{}, queue: {}, routing_key: {}, "
- "exchange_type: {}, queue_expiration: {}, "
- "durable: {}. {}".format(
- exchange, queue, routing_key, exchange_type,
- queue_expiration, durable, str(e)
- )
- )
-
- def get_rpc_exchange_name(self, exchange):
- """Returns RabbitMQ exchange name for given rpc request
-
- :param exchange: String, oslo.messaging target's exchange
-
- :return: String, RabbitMQ exchange name
- """
- return exchange or self.default_rpc_exchange
-
- @staticmethod
- def get_rpc_queue_name(topic, server, no_ack, worker=False):
- """Returns RabbitMQ queue name for given rpc request
-
- :param topic: String, oslo.messaging target's topic
- :param server: String, oslo.messaging target's server
- :param no_ack: Boolean, use message delivery with acknowledges or not
- :param worker: Boolean, use queue by single worker only or not
-
- :return: String, RabbitMQ queue name
- """
- queue_parts = ["no_ack" if no_ack else "with_ack", topic]
- if server is not None:
- queue_parts.append(server)
- if worker:
- queue_parts.append("worker")
- queue_parts.append(uuid.uuid4().hex)
- queue = '.'.join(queue_parts)
- return queue
diff --git a/oslo_messaging/_drivers/pika_driver/pika_exceptions.py b/oslo_messaging/_drivers/pika_driver/pika_exceptions.py
deleted file mode 100644
index c32d7e4..0000000
--- a/oslo_messaging/_drivers/pika_driver/pika_exceptions.py
+++ /dev/null
@@ -1,68 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging import exceptions
-
-
-class ExchangeNotFoundException(exceptions.MessageDeliveryFailure):
- """Is raised if specified exchange is not found in RabbitMQ."""
- pass
-
-
-class MessageRejectedException(exceptions.MessageDeliveryFailure):
- """Is raised if message which you are trying to send was nacked by RabbitMQ
- it may happen if RabbitMQ is not able to process message
- """
- pass
-
-
-class RoutingException(exceptions.MessageDeliveryFailure):
- """Is raised if message can not be delivered to any queue. Usually it means
- that any queue is not binded to given exchange with given routing key.
- Raised if 'mandatory' flag specified only
- """
- pass
-
-
-class ConnectionException(exceptions.MessagingException):
- """Is raised if some operation can not be performed due to connectivity
- problem
- """
- pass
-
-
-class TimeoutConnectionException(ConnectionException):
- """Is raised if socket timeout was expired during network interaction"""
- pass
-
-
-class EstablishConnectionException(ConnectionException):
- """Is raised if we have some problem during establishing connection
- procedure
- """
- pass
-
-
-class HostConnectionNotAllowedException(EstablishConnectionException):
- """Is raised in case of try to establish connection to temporary
- not allowed host (because of reconnection policy for example)
- """
- pass
-
-
-class UnsupportedDriverVersion(exceptions.MessagingException):
- """Is raised when message is received but was sent by different,
- not supported driver version
- """
- pass
diff --git a/oslo_messaging/_drivers/pika_driver/pika_listener.py b/oslo_messaging/_drivers/pika_driver/pika_listener.py
deleted file mode 100644
index 1942fcc..0000000
--- a/oslo_messaging/_drivers/pika_driver/pika_listener.py
+++ /dev/null
@@ -1,123 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import threading
-import uuid
-
-from concurrent import futures
-from oslo_log import log as logging
-
-from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller
-
-LOG = logging.getLogger(__name__)
-
-
-class RpcReplyPikaListener(object):
- """Provide functionality for listening RPC replies. Create and handle
- reply poller and coroutine for performing polling job
- """
-
- def __init__(self, pika_engine):
- super(RpcReplyPikaListener, self).__init__()
- self._pika_engine = pika_engine
-
- # preparing poller for listening replies
- self._reply_queue = None
-
- self._reply_poller = None
- self._reply_waiting_futures = {}
-
- self._reply_consumer_initialized = False
- self._reply_consumer_initialization_lock = threading.Lock()
- self._shutdown = False
-
- def get_reply_qname(self):
- """As result return reply queue name, shared for whole process,
- but before this check is RPC listener initialized or not and perform
- initialization if needed
-
- :return: String, queue name which hould be used for reply sending
- """
- if self._reply_consumer_initialized:
- return self._reply_queue
-
- with self._reply_consumer_initialization_lock:
- if self._reply_consumer_initialized:
- return self._reply_queue
-
- # generate reply queue name if needed
- if self._reply_queue is None:
- self._reply_queue = "reply.{}.{}.{}".format(
- self._pika_engine.conf.project,
- self._pika_engine.conf.prog, uuid.uuid4().hex
- )
-
- # initialize reply poller if needed
- if self._reply_poller is None:
- self._reply_poller = pika_drv_poller.RpcReplyPikaPoller(
- self._pika_engine, self._pika_engine.rpc_reply_exchange,
- self._reply_queue, 1, None,
- self._pika_engine.rpc_reply_listener_prefetch_count
- )
-
- self._reply_poller.start(self._on_incoming)
- self._reply_consumer_initialized = True
-
- return self._reply_queue
-
- def _on_incoming(self, incoming):
- """Reply polling job. Poll replies in infinite loop and notify
- registered features
- """
- for message in incoming:
- try:
- message.acknowledge()
- future = self._reply_waiting_futures.pop(
- message.msg_id, None
- )
- if future is not None:
- future.set_result(message)
- except Exception:
- LOG.exception("Unexpected exception during processing"
- "reply message")
-
- def register_reply_waiter(self, msg_id):
- """Register reply waiter. Should be called before message sending to
- the server
- :param msg_id: String, message_id of expected reply
- :return future: Future, container for expected reply to be returned
- over
- """
- future = futures.Future()
- self._reply_waiting_futures[msg_id] = future
- return future
-
- def unregister_reply_waiter(self, msg_id):
- """Unregister reply waiter. Should be called if client has not got
- reply and doesn't want to continue waiting (if timeout_expired for
- example)
- :param msg_id:
- """
- self._reply_waiting_futures.pop(msg_id, None)
-
- def cleanup(self):
- """Stop replies consuming and cleanup resources"""
- self._shutdown = True
-
- if self._reply_poller:
- self._reply_poller.stop()
- self._reply_poller.cleanup()
- self._reply_poller = None
-
- self._reply_queue = None
diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py
deleted file mode 100644
index 2f79f05..0000000
--- a/oslo_messaging/_drivers/pika_driver/pika_message.py
+++ /dev/null
@@ -1,618 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-
-import socket
-import time
-import traceback
-import uuid
-
-from concurrent import futures
-from oslo_log import log as logging
-from oslo_utils import importutils
-from oslo_utils import timeutils
-from pika import exceptions as pika_exceptions
-from pika import spec as pika_spec
-import pika_pool
-import six
-import tenacity
-
-
-import oslo_messaging
-from oslo_messaging._drivers import base
-from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
-from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
-from oslo_messaging import _utils as utils
-from oslo_messaging import exceptions
-
-
-LOG = logging.getLogger(__name__)
-
-_VERSION_HEADER = "version"
-_VERSION = "1.0"
-
-
-class RemoteExceptionMixin(object):
- """Used for constructing dynamic exception type during deserialization of
- remote exception. It defines unified '__init__' method signature and
- exception message format
- """
- def __init__(self, module, clazz, message, trace):
- """Store serialized data
- :param module: String, module name for importing original exception
- class of serialized remote exception
- :param clazz: String, original class name of serialized remote
- exception
- :param message: String, original message of serialized remote
- exception
- :param trace: String, original trace of serialized remote exception
- """
- self.module = module
- self.clazz = clazz
- self.message = message
- self.trace = trace
-
- self._str_msgs = message + "\n" + "\n".join(trace)
-
- def __str__(self):
- return self._str_msgs
-
-
-class PikaIncomingMessage(base.IncomingMessage):
- """Driver friendly adapter for received message. Extract message
- information from RabbitMQ message and provide access to it
- """
-
- def __init__(self, pika_engine, channel, method, properties, body):
- """Parse RabbitMQ message
-
- :param pika_engine: PikaEngine, shared object with configuration and
- shared driver functionality
- :param channel: Channel, RabbitMQ channel which was used for
- this message delivery, used for sending ack back.
- If None - ack is not required
- :param method: Method, RabbitMQ message method
- :param properties: Properties, RabbitMQ message properties
- :param body: Bytes, RabbitMQ message body
- """
- headers = getattr(properties, "headers", {})
- version = headers.get(_VERSION_HEADER, None)
- if not utils.version_is_compatible(version, _VERSION):
- raise pika_drv_exc.UnsupportedDriverVersion(
- "Message's version: {} is not compatible with driver version: "
- "{}".format(version, _VERSION))
-
- self._pika_engine = pika_engine
- self._channel = channel
- self._delivery_tag = method.delivery_tag
-
- self._version = version
-
- self._content_type = properties.content_type
- self.unique_id = properties.message_id
-
- self.expiration_time = (
- None if properties.expiration is None else
- time.time() + float(properties.expiration) / 1000
- )
-
- try:
- serializer = pika_drv_cmns.MESSAGE_SERIALIZERS[self._content_type]
- except KeyError:
- raise NotImplementedError(
- "Content-type['{}'] is not supported.".format(
- self._content_type
- )
- )
-
- message_dict = serializer.load_from_bytes(body)
-
- context_dict = {}
-
- for key in list(message_dict.keys()):
- key = six.text_type(key)
- if key.startswith('_$_'):
- value = message_dict.pop(key)
- context_dict[key[3:]] = value
-
- super(PikaIncomingMessage, self).__init__(context_dict, message_dict)
-
- def need_ack(self):
- return self._channel is not None
-
- def acknowledge(self):
- """Ack the message. Should be called by message processing logic when
- it considered as consumed (means that we don't need redelivery of this
- message anymore)
- """
- if self.need_ack():
- self._channel.basic_ack(delivery_tag=self._delivery_tag)
-
- def requeue(self):
- """Rollback the message. Should be called by message processing logic
- when it can not process the message right now and should be redelivered
- later if it is possible
- """
- if self.need_ack():
- return self._channel.basic_nack(delivery_tag=self._delivery_tag,
- requeue=True)
-
-
-class RpcPikaIncomingMessage(PikaIncomingMessage, base.RpcIncomingMessage):
- """PikaIncomingMessage implementation for RPC messages. It expects
- extra RPC related fields in message body (msg_id and reply_q). Also 'reply'
- method added to allow consumer to send RPC reply back to the RPC client
- """
-
- def __init__(self, pika_engine, channel, method, properties, body):
- """Defines default values of msg_id and reply_q fields and just call
- super.__init__ method
-
- :param pika_engine: PikaEngine, shared object with configuration and
- shared driver functionality
- :param channel: Channel, RabbitMQ channel which was used for
- this message delivery, used for sending ack back.
- If None - ack is not required
- :param method: Method, RabbitMQ message method
- :param properties: Properties, RabbitMQ message properties
- :param body: Bytes, RabbitMQ message body
- """
- super(RpcPikaIncomingMessage, self).__init__(
- pika_engine, channel, method, properties, body
- )
- self.reply_q = properties.reply_to
- self.msg_id = properties.correlation_id
-
- def reply(self, reply=None, failure=None):
- """Send back reply to the RPC client
- :param reply: Dictionary, reply. In case of exception should be None
- :param failure: Tuple, should be a sys.exc_info() tuple.
- Should be None if RPC request was successfully processed.
-
- :return RpcReplyPikaIncomingMessage: message with reply
- """
-
- if self.reply_q is None:
- return
-
- reply_outgoing_message = RpcReplyPikaOutgoingMessage(
- self._pika_engine, self.msg_id, reply=reply, failure_info=failure,
- content_type=self._content_type,
- )
-
- def on_exception(ex):
- if isinstance(ex, pika_drv_exc.ConnectionException):
- LOG.warning(
- "Connectivity related problem during reply sending. %s",
- ex
- )
- return True
- else:
- return False
-
- if self._pika_engine.rpc_reply_retry_attempts:
- retrier = tenacity.retry(
- stop=(
- tenacity.stop_never
- if self._pika_engine.rpc_reply_retry_attempts == -1 else
- tenacity.stop_after_attempt(
- self._pika_engine.rpc_reply_retry_attempts
- )
- ),
- retry=tenacity.retry_if_exception(on_exception),
- wait=tenacity.wait_fixed(
- self._pika_engine.rpc_reply_retry_delay
- )
- )
- else:
- retrier = None
-
- try:
- timeout = (None if self.expiration_time is None else
- max(self.expiration_time - time.time(), 0))
- with timeutils.StopWatch(duration=timeout) as stopwatch:
- reply_outgoing_message.send(
- reply_q=self.reply_q,
- stopwatch=stopwatch,
- retrier=retrier
- )
- LOG.debug(
- "Message [id:'%s'] replied to '%s'.", self.msg_id, self.reply_q
- )
- except Exception:
- LOG.exception(
- "Message [id:'%s'] wasn't replied to : %s", self.msg_id,
- self.reply_q
- )
-
-
-class RpcReplyPikaIncomingMessage(PikaIncomingMessage):
- """PikaIncomingMessage implementation for RPC reply messages. It expects
- extra RPC reply related fields in message body (result and failure).
- """
- def __init__(self, pika_engine, channel, method, properties, body):
- """Defines default values of result and failure fields, call
- super.__init__ method and then construct Exception object if failure is
- not None
-
- :param pika_engine: PikaEngine, shared object with configuration and
- shared driver functionality
- :param channel: Channel, RabbitMQ channel which was used for
- this message delivery, used for sending ack back.
- If None - ack is not required
- :param method: Method, RabbitMQ message method
- :param properties: Properties, RabbitMQ message properties
- :param body: Bytes, RabbitMQ message body
- """
- super(RpcReplyPikaIncomingMessage, self).__init__(
- pika_engine, channel, method, properties, body
- )
-
- self.msg_id = properties.correlation_id
-
- self.result = self.message.get("s", None)
- self.failure = self.message.get("e", None)
-
- if self.failure is not None:
- trace = self.failure.get('t', [])
- message = self.failure.get('s', "")
- class_name = self.failure.get('c')
- module_name = self.failure.get('m')
-
- res_exc = None
-
- if module_name in pika_engine.allowed_remote_exmods:
- try:
- module = importutils.import_module(module_name)
- klass = getattr(module, class_name)
-
- ex_type = type(
- klass.__name__,
- (RemoteExceptionMixin, klass),
- {}
- )
-
- res_exc = ex_type(module_name, class_name, message, trace)
- except ImportError as e:
- LOG.warning(
- "Can not deserialize remote exception [module:%s, "
- "class:%s]. %s", module_name, class_name, e
- )
-
- # if we have not processed failure yet, use RemoteError class
- if res_exc is None:
- res_exc = oslo_messaging.RemoteError(
- class_name, message, trace
- )
- self.failure = res_exc
-
-
-class PikaOutgoingMessage(object):
- """Driver friendly adapter for sending message. Construct RabbitMQ message
- and send it
- """
-
- def __init__(self, pika_engine, message, context, content_type=None):
- """Parse RabbitMQ message
-
- :param pika_engine: PikaEngine, shared object with configuration and
- shared driver functionality
- :param message: Dictionary, user's message fields
- :param context: Dictionary, request context's fields
- :param content_type: String, content-type header, defines serialization
- mechanism, if None default content-type from pika_engine is used
- """
-
- self._pika_engine = pika_engine
-
- self._content_type = (
- content_type if content_type is not None else
- self._pika_engine.default_content_type
- )
-
- try:
- self._serializer = pika_drv_cmns.MESSAGE_SERIALIZERS[
- self._content_type
- ]
- except KeyError:
- raise NotImplementedError(
- "Content-type['{}'] is not supported.".format(
- self._content_type
- )
- )
-
- self.message = message
- self.context = context
-
- self.unique_id = uuid.uuid4().hex
-
- def _prepare_message_to_send(self):
- """Combine user's message fields an system fields (_unique_id,
- context's data etc)
- """
- msg = self.message.copy()
-
- if self.context:
- for key, value in self.context.items():
- key = six.text_type(key)
- msg['_$_' + key] = value
-
- props = pika_spec.BasicProperties(
- content_type=self._content_type,
- headers={_VERSION_HEADER: _VERSION},
- message_id=self.unique_id,
- )
- return msg, props
-
- @staticmethod
- def _publish(pool, exchange, routing_key, body, properties, mandatory,
- stopwatch):
- """Execute pika publish method using connection from connection pool
- Also this message catches all pika related exceptions and raise
- oslo.messaging specific exceptions
-
- :param pool: Pool, pika connection pool for connection choosing
- :param exchange: String, RabbitMQ exchange name for message sending
- :param routing_key: String, RabbitMQ routing key for message routing
- :param body: Bytes, RabbitMQ message payload
- :param properties: Properties, RabbitMQ message properties
- :param mandatory: Boolean, RabbitMQ publish mandatory flag (raise
- exception if it is not possible to deliver message to any queue)
- :param stopwatch: StopWatch, stopwatch object for calculating
- allowed timeouts
- """
- if stopwatch.expired():
- raise exceptions.MessagingTimeout(
- "Timeout for current operation was expired."
- )
- try:
- timeout = stopwatch.leftover(return_none=True)
- with pool.acquire(timeout=timeout) as conn:
- if timeout is not None:
- properties.expiration = str(int(timeout * 1000))
- conn.channel.publish(
- exchange=exchange,
- routing_key=routing_key,
- body=body,
- properties=properties,
- mandatory=mandatory
- )
- except pika_exceptions.NackError as e:
- raise pika_drv_exc.MessageRejectedException(
- "Can not send message: [body: {}], properties: {}] to "
- "target [exchange: {}, routing_key: {}]. {}".format(
- body, properties, exchange, routing_key, str(e)
- )
- )
- except pika_exceptions.UnroutableError as e:
- raise pika_drv_exc.RoutingException(
- "Can not deliver message:[body:{}, properties: {}] to any "
- "queue using target: [exchange:{}, "
- "routing_key:{}]. {}".format(
- body, properties, exchange, routing_key, str(e)
- )
- )
- except pika_pool.Timeout as e:
- raise exceptions.MessagingTimeout(
- "Timeout for current operation was expired. {}".format(str(e))
- )
- except pika_pool.Connection.connectivity_errors as e:
- if (isinstance(e, pika_exceptions.ChannelClosed)
- and e.args and e.args[0] == 404):
- raise pika_drv_exc.ExchangeNotFoundException(
- "Attempt to send message to not existing exchange "
- "detected, message: [body:{}, properties: {}], target: "
- "[exchange:{}, routing_key:{}]. {}".format(
- body, properties, exchange, routing_key, str(e)
- )
- )
-
- raise pika_drv_exc.ConnectionException(
- "Connectivity problem detected during sending the message: "
- "[body:{}, properties: {}] to target: [exchange:{}, "
- "routing_key:{}]. {}".format(
- body, properties, exchange, routing_key, str(e)
- )
- )
- except socket.timeout:
- raise pika_drv_exc.TimeoutConnectionException(
- "Socket timeout exceeded."
- )
-
- def _do_send(self, exchange, routing_key, msg_dict, msg_props,
- confirm=True, mandatory=True, persistent=False,
- stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH, retrier=None):
- """Send prepared message with configured retrying
-
- :param exchange: String, RabbitMQ exchange name for message sending
- :param routing_key: String, RabbitMQ routing key for message routing
- :param msg_dict: Dictionary, message payload
- :param msg_props: Properties, message properties
- :param confirm: Boolean, enable publisher confirmation if True
- :param mandatory: Boolean, RabbitMQ publish mandatory flag (raise
- exception if it is not possible to deliver message to any queue)
- :param persistent: Boolean, send persistent message if True, works only
- for routing into durable queues
- :param stopwatch: StopWatch, stopwatch object for calculating
- allowed timeouts
- :param retrier: tenacity.Retrying, configured retrier object for
- sending message, if None no retrying is performed
- """
- msg_props.delivery_mode = 2 if persistent else 1
-
- pool = (self._pika_engine.connection_with_confirmation_pool
- if confirm else
- self._pika_engine.connection_without_confirmation_pool)
-
- body = self._serializer.dump_as_bytes(msg_dict)
-
- LOG.debug(
- "Sending message:[body:%s; properties: %s] to target: "
- "[exchange:%s; routing_key:%s]", body, msg_props, exchange,
- routing_key
- )
-
- publish = (self._publish if retrier is None else
- retrier(self._publish))
-
- return publish(pool, exchange, routing_key, body, msg_props,
- mandatory, stopwatch)
-
- def send(self, exchange, routing_key='', confirm=True, mandatory=True,
- persistent=False, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH,
- retrier=None):
- """Send message with configured retrying
-
- :param exchange: String, RabbitMQ exchange name for message sending
- :param routing_key: String, RabbitMQ routing key for message routing
- :param confirm: Boolean, enable publisher confirmation if True
- :param mandatory: Boolean, RabbitMQ publish mandatory flag (raise
- exception if it is not possible to deliver message to any queue)
- :param persistent: Boolean, send persistent message if True, works only
- for routing into durable queues
- :param stopwatch: StopWatch, stopwatch object for calculating
- allowed timeouts
- :param retrier: tenacity.Retrying, configured retrier object for
- sending message, if None no retrying is performed
- """
- msg_dict, msg_props = self._prepare_message_to_send()
-
- return self._do_send(exchange, routing_key, msg_dict, msg_props,
- confirm, mandatory, persistent,
- stopwatch, retrier)
-
-
-class RpcPikaOutgoingMessage(PikaOutgoingMessage):
- """PikaOutgoingMessage implementation for RPC messages. It adds
- possibility to wait and receive RPC reply
- """
- def __init__(self, pika_engine, message, context, content_type=None):
- super(RpcPikaOutgoingMessage, self).__init__(
- pika_engine, message, context, content_type
- )
- self.msg_id = None
- self.reply_q = None
-
- def send(self, exchange, routing_key, reply_listener=None,
- stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH, retrier=None):
- """Send RPC message with configured retrying
-
- :param exchange: String, RabbitMQ exchange name for message sending
- :param routing_key: String, RabbitMQ routing key for message routing
- :param reply_listener: RpcReplyPikaListener, listener for waiting
- reply. If None - return immediately without reply waiting
- :param stopwatch: StopWatch, stopwatch object for calculating
- allowed timeouts
- :param retrier: tenacity.Retrying, configured retrier object for
- sending message, if None no retrying is performed
- """
- msg_dict, msg_props = self._prepare_message_to_send()
-
- if reply_listener:
- self.msg_id = uuid.uuid4().hex
- msg_props.correlation_id = self.msg_id
- LOG.debug('MSG_ID is %s', self.msg_id)
-
- self.reply_q = reply_listener.get_reply_qname()
- msg_props.reply_to = self.reply_q
-
- future = reply_listener.register_reply_waiter(msg_id=self.msg_id)
-
- self._do_send(
- exchange=exchange, routing_key=routing_key, msg_dict=msg_dict,
- msg_props=msg_props, confirm=True, mandatory=True,
- persistent=False, stopwatch=stopwatch, retrier=retrier
- )
-
- try:
- return future.result(stopwatch.leftover(return_none=True))
- except BaseException as e:
- reply_listener.unregister_reply_waiter(self.msg_id)
- if isinstance(e, futures.TimeoutError):
- e = exceptions.MessagingTimeout()
- raise e
- else:
- self._do_send(
- exchange=exchange, routing_key=routing_key, msg_dict=msg_dict,
- msg_props=msg_props, confirm=True, mandatory=True,
- persistent=False, stopwatch=stopwatch, retrier=retrier
- )
-
-
-class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage):
- """PikaOutgoingMessage implementation for RPC reply messages. It sets
- correlation_id AMQP property to link this reply with response
- """
- def __init__(self, pika_engine, msg_id, reply=None, failure_info=None,
- content_type=None):
- """Initialize with reply information for sending
-
- :param pika_engine: PikaEngine, shared object with configuration and
- shared driver functionality
- :param msg_id: String, msg_id of RPC request, which waits for reply
- :param reply: Dictionary, reply. In case of exception should be None
- :param failure_info: Tuple, should be a sys.exc_info() tuple.
- Should be None if RPC request was successfully processed.
- :param content_type: String, content-type header, defines serialization
- mechanism, if None default content-type from pika_engine is used
- """
- self.msg_id = msg_id
-
- if failure_info is not None:
- ex_class = failure_info[0]
- ex = failure_info[1]
- tb = traceback.format_exception(*failure_info)
- if issubclass(ex_class, RemoteExceptionMixin):
- failure_data = {
- 'c': ex.clazz,
- 'm': ex.module,
- 's': ex.message,
- 't': tb
- }
- else:
- failure_data = {
- 'c': six.text_type(ex_class.__name__),
- 'm': six.text_type(ex_class.__module__),
- 's': six.text_type(ex),
- 't': tb
- }
-
- msg = {'e': failure_data}
- else:
- msg = {'s': reply}
-
- super(RpcReplyPikaOutgoingMessage, self).__init__(
- pika_engine, msg, None, content_type
- )
-
- def send(self, reply_q, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH,
- retrier=None):
- """Send RPC message with configured retrying
-
- :param reply_q: String, queue name for sending reply
- :param stopwatch: StopWatch, stopwatch object for calculating
- allowed timeouts
- :param retrier: tenacity.Retrying, configured retrier object for
- sending message, if None no retrying is performed
- """
-
- msg_dict, msg_props = self._prepare_message_to_send()
- msg_props.correlation_id = self.msg_id
-
- self._do_send(
- exchange=self._pika_engine.rpc_reply_exchange, routing_key=reply_q,
- msg_dict=msg_dict, msg_props=msg_props, confirm=True,
- mandatory=True, persistent=False, stopwatch=stopwatch,
- retrier=retrier
- )
diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py
deleted file mode 100644
index 1c770a3..0000000
--- a/oslo_messaging/_drivers/pika_driver/pika_poller.py
+++ /dev/null
@@ -1,538 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import threading
-
-from oslo_log import log as logging
-from oslo_service import loopingcall
-
-from oslo_messaging._drivers import base
-from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
-from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
-from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
-
-LOG = logging.getLogger(__name__)
-
-
-class PikaPoller(base.Listener):
- """Provides user friendly functionality for RabbitMQ message consuming,
- handles low level connectivity problems and restore connection if some
- connectivity related problem detected
- """
-
- def __init__(self, pika_engine, batch_size, batch_timeout, prefetch_count,
- incoming_message_class):
- """Initialize required fields
-
- :param pika_engine: PikaEngine, shared object with configuration and
- shared driver functionality
- :param batch_size: desired number of messages passed to
- single on_incoming_callback call
- :param batch_timeout: defines how long should we wait for batch_size
- messages if we already have some messages waiting for processing
- :param prefetch_count: Integer, maximum count of unacknowledged
- messages which RabbitMQ broker sends to this consumer
- :param incoming_message_class: PikaIncomingMessage, wrapper for
- consumed RabbitMQ message
- """
- super(PikaPoller, self).__init__(batch_size, batch_timeout,
- prefetch_count)
- self._pika_engine = pika_engine
- self._incoming_message_class = incoming_message_class
-
- self._connection = None
- self._channel = None
- self._recover_loopingcall = None
- self._lock = threading.RLock()
-
- self._cur_batch_buffer = None
- self._cur_batch_timeout_id = None
-
- self._started = False
- self._closing_connection_by_poller = False
-
- self._queues_to_consume = None
-
- def _on_connection_close(self, connection, reply_code, reply_text):
- self._deliver_cur_batch()
- if self._closing_connection_by_poller:
- return
- with self._lock:
- self._connection = None
- self._start_recover_consuming_task()
-
- def _on_channel_close(self, channel, reply_code, reply_text):
- if self._cur_batch_buffer:
- self._cur_batch_buffer = [
- message for message in self._cur_batch_buffer
- if not message.need_ack()
- ]
- if self._closing_connection_by_poller:
- return
- with self._lock:
- self._channel = None
- self._start_recover_consuming_task()
-
- def _on_consumer_cancel(self, method_frame):
- with self._lock:
- if self._queues_to_consume:
- consumer_tag = method_frame.method.consumer_tag
- for queue_info in self._queues_to_consume:
- if queue_info["consumer_tag"] == consumer_tag:
- queue_info["consumer_tag"] = None
-
- self._start_recover_consuming_task()
-
- def _on_message_no_ack_callback(self, unused, method, properties, body):
- """Is called by Pika when message was received from queue listened with
- no_ack=True mode
- """
- incoming_message = self._incoming_message_class(
- self._pika_engine, None, method, properties, body
- )
- self._on_incoming_message(incoming_message)
-
- def _on_message_with_ack_callback(self, unused, method, properties, body):
- """Is called by Pika when message was received from queue listened with
- no_ack=False mode
- """
- incoming_message = self._incoming_message_class(
- self._pika_engine, self._channel, method, properties, body
- )
- self._on_incoming_message(incoming_message)
-
- def _deliver_cur_batch(self):
- if self._cur_batch_timeout_id is not None:
- self._connection.remove_timeout(self._cur_batch_timeout_id)
- self._cur_batch_timeout_id = None
- if self._cur_batch_buffer:
- buf_to_send = self._cur_batch_buffer
- self._cur_batch_buffer = None
- try:
- self.on_incoming_callback(buf_to_send)
- except Exception:
- LOG.exception("Unexpected exception during incoming delivery")
-
- def _on_incoming_message(self, incoming_message):
- if self._cur_batch_buffer is None:
- self._cur_batch_buffer = [incoming_message]
- else:
- self._cur_batch_buffer.append(incoming_message)
-
- if len(self._cur_batch_buffer) >= self.batch_size:
- self._deliver_cur_batch()
- return
-
- if self._cur_batch_timeout_id is None:
- self._cur_batch_timeout_id = self._connection.add_timeout(
- self.batch_timeout, self._deliver_cur_batch)
-
- def _start_recover_consuming_task(self):
- """Start async job for checking connection to the broker."""
- if self._recover_loopingcall is None and self._started:
- self._recover_loopingcall = (
- loopingcall.DynamicLoopingCall(
- self._try_recover_consuming
- )
- )
- LOG.info("Starting recover consuming job for listener: %s", self)
- self._recover_loopingcall.start()
-
- def _try_recover_consuming(self):
- with self._lock:
- try:
- if self._started:
- self._start_or_recover_consuming()
- except pika_drv_exc.EstablishConnectionException as e:
- LOG.warning(
- "Problem during establishing connection for pika "
- "poller %s", e, exc_info=True
- )
- return self._pika_engine.host_connection_reconnect_delay
- except pika_drv_exc.ConnectionException as e:
- LOG.warning(
- "Connectivity exception during starting/recovering pika "
- "poller %s", e, exc_info=True
- )
- except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e:
- LOG.warning(
- "Connectivity exception during starting/recovering pika "
- "poller %s", e, exc_info=True
- )
- except BaseException:
- # NOTE (dukhlov): I preffer to use here BaseException because
- # if this method raise such exception LoopingCall stops
- # execution Probably it should never happen and Exception
- # should be enough but in case of programmer mistake it could
- # be and it is potentially hard to catch problem if we will
- # stop background task. It is better when it continue to work
- # and write a lot of LOG with this error
- LOG.exception("Unexpected exception during "
- "starting/recovering pika poller")
- else:
- self._recover_loopingcall = None
- LOG.info("Recover consuming job was finished for listener: %s",
- self)
- raise loopingcall.LoopingCallDone(True)
- return 0
-
- def _start_or_recover_consuming(self):
- """Performs reconnection to the broker. It is unsafe method for
- internal use only
- """
- if self._connection is None or not self._connection.is_open:
- self._connection = self._pika_engine.create_connection(
- for_listening=True
- )
- self._connection.add_on_close_callback(self._on_connection_close)
- self._channel = None
-
- if self._channel is None or not self._channel.is_open:
- if self._queues_to_consume:
- for queue_info in self._queues_to_consume:
- queue_info["consumer_tag"] = None
-
- self._channel = self._connection.channel()
- self._channel.add_on_close_callback(self._on_channel_close)
- self._channel.add_on_cancel_callback(self._on_consumer_cancel)
- self._channel.basic_qos(prefetch_count=self.prefetch_size)
-
- if self._queues_to_consume is None:
- self._queues_to_consume = self._declare_queue_binding()
-
- self._start_consuming()
-
- def _declare_queue_binding(self):
- """Is called by recovering connection logic if target RabbitMQ
- exchange and (or) queue do not exist. Should be overridden in child
- classes
-
- :return Dictionary: declared_queue_name -> no_ack_mode
- """
- raise NotImplementedError(
- "It is base class. Please declare exchanges and queues here"
- )
-
- def _start_consuming(self):
- """Is called by recovering connection logic for starting consumption
- of configured RabbitMQ queues
- """
-
- assert self._queues_to_consume is not None
-
- try:
- for queue_info in self._queues_to_consume:
- if queue_info["consumer_tag"] is not None:
- continue
- no_ack = queue_info["no_ack"]
-
- on_message_callback = (
- self._on_message_no_ack_callback if no_ack
- else self._on_message_with_ack_callback
- )
-
- queue_info["consumer_tag"] = self._channel.basic_consume(
- on_message_callback, queue_info["queue_name"],
- no_ack=no_ack
- )
- except Exception:
- self._queues_to_consume = None
- raise
-
- def _stop_consuming(self):
- """Is called by poller's stop logic for stopping consumption
- of configured RabbitMQ queues
- """
-
- assert self._queues_to_consume is not None
-
- for queue_info in self._queues_to_consume:
- consumer_tag = queue_info["consumer_tag"]
- if consumer_tag is not None:
- self._channel.basic_cancel(consumer_tag)
- queue_info["consumer_tag"] = None
-
- def start(self, on_incoming_callback):
- """Starts poller. Should be called before polling to allow message
- consuming
-
- :param on_incoming_callback: callback function to be executed when
- listener received messages. Messages should be processed and
- acked/nacked by callback
- """
- super(PikaPoller, self).start(on_incoming_callback)
-
- with self._lock:
- if self._started:
- return
- connected = False
- try:
- self._start_or_recover_consuming()
- except pika_drv_exc.EstablishConnectionException as exc:
- LOG.warning(
- "Can not establish connection during pika poller's "
- "start(). %s", exc, exc_info=True
- )
- except pika_drv_exc.ConnectionException as exc:
- LOG.warning(
- "Connectivity problem during pika poller's start(). %s",
- exc, exc_info=True
- )
- except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc:
- LOG.warning(
- "Connectivity problem during pika poller's start(). %s",
- exc, exc_info=True
- )
- else:
- connected = True
-
- self._started = True
- if not connected:
- self._start_recover_consuming_task()
-
- def stop(self):
- """Stops poller. Should be called when polling is not needed anymore to
- stop new message consuming. After that it is necessary to poll already
- prefetched messages
- """
- super(PikaPoller, self).stop()
-
- with self._lock:
- if not self._started:
- return
-
- if self._recover_loopingcall is not None:
- self._recover_loopingcall.stop()
- self._recover_loopingcall = None
-
- if (self._queues_to_consume and self._channel and
- self._channel.is_open):
- try:
- self._stop_consuming()
- except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc:
- LOG.warning(
- "Connectivity problem detected during consumer "
- "cancellation. %s", exc, exc_info=True
- )
- self._deliver_cur_batch()
- self._started = False
-
- def cleanup(self):
- """Cleanup allocated resources (channel, connection, etc)."""
- with self._lock:
- if self._connection and self._connection.is_open:
- try:
- self._closing_connection_by_poller = True
- self._connection.close()
- self._closing_connection_by_poller = False
- except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS:
- # expected errors
- pass
- except Exception:
- LOG.exception("Unexpected error during closing connection")
- finally:
- self._channel = None
- self._connection = None
-
-
-class RpcServicePikaPoller(PikaPoller):
- """PikaPoller implementation for polling RPC messages. Overrides base
- functionality according to RPC specific
- """
- def __init__(self, pika_engine, target, batch_size, batch_timeout,
- prefetch_count):
- """Adds target parameter for declaring RPC specific exchanges and
- queues
-
- :param pika_engine: PikaEngine, shared object with configuration and
- shared driver functionality
- :param target: Target, oslo.messaging Target object which defines RPC
- endpoint
- :param batch_size: desired number of messages passed to
- single on_incoming_callback call
- :param batch_timeout: defines how long should we wait for batch_size
- messages if we already have some messages waiting for processing
- :param prefetch_count: Integer, maximum count of unacknowledged
- messages which RabbitMQ broker sends to this consumer
- """
- self._target = target
-
- super(RpcServicePikaPoller, self).__init__(
- pika_engine, batch_size, batch_timeout, prefetch_count,
- pika_drv_msg.RpcPikaIncomingMessage
- )
-
- def _declare_queue_binding(self):
- """Overrides base method and perform declaration of RabbitMQ exchanges
- and queues which correspond to oslo.messaging RPC target
-
- :return Dictionary: declared_queue_name -> no_ack_mode
- """
- queue_expiration = self._pika_engine.rpc_queue_expiration
-
- exchange = self._pika_engine.get_rpc_exchange_name(
- self._target.exchange
- )
-
- queues_to_consume = []
-
- for no_ack in [True, False]:
- queue = self._pika_engine.get_rpc_queue_name(
- self._target.topic, None, no_ack
- )
- self._pika_engine.declare_queue_binding_by_channel(
- channel=self._channel, exchange=exchange, queue=queue,
- routing_key=queue, exchange_type='direct', durable=False,
- queue_expiration=queue_expiration
- )
- queues_to_consume.append(
- {"queue_name": queue, "no_ack": no_ack, "consumer_tag": None}
- )
-
- if self._target.server:
- server_queue = self._pika_engine.get_rpc_queue_name(
- self._target.topic, self._target.server, no_ack
- )
- self._pika_engine.declare_queue_binding_by_channel(
- channel=self._channel, exchange=exchange, durable=False,
- queue=server_queue, routing_key=server_queue,
- exchange_type='direct', queue_expiration=queue_expiration
- )
- queues_to_consume.append(
- {"queue_name": server_queue, "no_ack": no_ack,
- "consumer_tag": None}
- )
-
- worker_queue = self._pika_engine.get_rpc_queue_name(
- self._target.topic, self._target.server, no_ack, True
- )
- all_workers_routing_key = self._pika_engine.get_rpc_queue_name(
- self._target.topic, "all_workers", no_ack
- )
- self._pika_engine.declare_queue_binding_by_channel(
- channel=self._channel, exchange=exchange, durable=False,
- queue=worker_queue, routing_key=all_workers_routing_key,
- exchange_type='direct', queue_expiration=queue_expiration
- )
- queues_to_consume.append(
- {"queue_name": worker_queue, "no_ack": no_ack,
- "consumer_tag": None}
- )
-
- return queues_to_consume
-
-
-class RpcReplyPikaPoller(PikaPoller):
- """PikaPoller implementation for polling RPC reply messages. Overrides
- base functionality according to RPC reply specific
- """
- def __init__(self, pika_engine, exchange, queue, batch_size, batch_timeout,
- prefetch_count):
- """Adds exchange and queue parameter for declaring exchange and queue
- used for RPC reply delivery
-
- :param pika_engine: PikaEngine, shared object with configuration and
- shared driver functionality
- :param exchange: String, exchange name used for RPC reply delivery
- :param queue: String, queue name used for RPC reply delivery
- :param batch_size: desired number of messages passed to
- single on_incoming_callback call
- :param batch_timeout: defines how long should we wait for batch_size
- messages if we already have some messages waiting for processing
- :param prefetch_count: Integer, maximum count of unacknowledged
- messages which RabbitMQ broker sends to this consumer
- """
- self._exchange = exchange
- self._queue = queue
-
- super(RpcReplyPikaPoller, self).__init__(
- pika_engine, batch_size, batch_timeout, prefetch_count,
- pika_drv_msg.RpcReplyPikaIncomingMessage
- )
-
- def _declare_queue_binding(self):
- """Overrides base method and perform declaration of RabbitMQ exchange
- and queue used for RPC reply delivery
-
- :return Dictionary: declared_queue_name -> no_ack_mode
- """
- self._pika_engine.declare_queue_binding_by_channel(
- channel=self._channel,
- exchange=self._exchange, queue=self._queue,
- routing_key=self._queue, exchange_type='direct',
- queue_expiration=self._pika_engine.rpc_queue_expiration,
- durable=False
- )
-
- return [{"queue_name": self._queue, "no_ack": False,
- "consumer_tag": None}]
-
-
-class NotificationPikaPoller(PikaPoller):
- """PikaPoller implementation for polling Notification messages. Overrides
- base functionality according to Notification specific
- """
- def __init__(self, pika_engine, targets_and_priorities,
- batch_size, batch_timeout, prefetch_count, queue_name=None):
- """Adds targets_and_priorities and queue_name parameter
- for declaring exchanges and queues used for notification delivery
-
- :param pika_engine: PikaEngine, shared object with configuration and
- shared driver functionality
- :param targets_and_priorities: list of (target, priority), defines
- default queue names for corresponding notification types
- :param batch_size: desired number of messages passed to
- single on_incoming_callback call
- :param batch_timeout: defines how long should we wait for batch_size
- messages if we already have some messages waiting for processing
- :param prefetch_count: Integer, maximum count of unacknowledged
- messages which RabbitMQ broker sends to this consumer
- :param queue: String, alternative queue name used for this poller
- instead of default queue name
- """
- self._targets_and_priorities = targets_and_priorities
- self._queue_name = queue_name
-
- super(NotificationPikaPoller, self).__init__(
- pika_engine, batch_size, batch_timeout, prefetch_count,
- pika_drv_msg.PikaIncomingMessage
- )
-
- def _declare_queue_binding(self):
- """Overrides base method and perform declaration of RabbitMQ exchanges
- and queues used for notification delivery
-
- :return Dictionary: declared_queue_name -> no_ack_mode
- """
- queues_to_consume = []
- for target, priority in self._targets_and_priorities:
- routing_key = '%s.%s' % (target.topic, priority)
- queue = self._queue_name or routing_key
- self._pika_engine.declare_queue_binding_by_channel(
- channel=self._channel,
- exchange=(
- target.exchange or
- self._pika_engine.default_notification_exchange
- ),
- queue=queue,
- routing_key=routing_key,
- exchange_type='direct',
- queue_expiration=None,
- durable=self._pika_engine.notification_persistence,
- )
- queues_to_consume.append(
- {"queue_name": queue, "no_ack": False, "consumer_tag": None}
- )
-
- return queues_to_consume
diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py
index b766bb3..3181ae4 100644
--- a/oslo_messaging/opts.py
+++ b/oslo_messaging/opts.py
@@ -23,11 +23,9 @@ import itertools
from oslo_messaging._drivers import amqp
from oslo_messaging._drivers.amqp1_driver import opts as amqp_opts
from oslo_messaging._drivers import base as drivers_base
-from oslo_messaging._drivers import impl_pika
from oslo_messaging._drivers import impl_rabbit
from oslo_messaging._drivers.impl_zmq import zmq_options
from oslo_messaging._drivers.kafka_driver import kafka_options
-from oslo_messaging._drivers.pika_driver import pika_connection_factory
from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_redis
from oslo_messaging.notify import notifier
from oslo_messaging.rpc import client
@@ -50,10 +48,7 @@ _opts = [
('oslo_messaging_amqp', amqp_opts.amqp1_opts),
('oslo_messaging_notifications', notifier._notifier_opts),
('oslo_messaging_rabbit', list(
- itertools.chain(amqp.amqp_opts, impl_rabbit.rabbit_opts,
- pika_connection_factory.pika_opts,
- impl_pika.pika_pool_opts, impl_pika.message_opts,
- impl_pika.notification_opts, impl_pika.rpc_opts))),
+ itertools.chain(amqp.amqp_opts, impl_rabbit.rabbit_opts))),
('oslo_messaging_kafka', kafka_options.KAFKA_OPTS),
]
diff --git a/oslo_messaging/tests/drivers/pika/__init__.py b/oslo_messaging/tests/drivers/pika/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/oslo_messaging/tests/drivers/pika/__init__.py
+++ /dev/null
diff --git a/oslo_messaging/tests/drivers/pika/test_message.py b/oslo_messaging/tests/drivers/pika/test_message.py
deleted file mode 100644
index 3722f87..0000000
--- a/oslo_messaging/tests/drivers/pika/test_message.py
+++ /dev/null
@@ -1,615 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import functools
-import unittest
-
-from concurrent import futures
-from oslo_serialization import jsonutils
-from oslo_utils import timeutils
-import pika
-from six.moves import mock
-
-import oslo_messaging
-from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
-from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
-
-
-class PikaIncomingMessageTestCase(unittest.TestCase):
- def setUp(self):
- self._pika_engine = mock.Mock()
- self._channel = mock.Mock()
-
- self._delivery_tag = 12345
-
- self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
- self._properties = pika.BasicProperties(
- content_type="application/json",
- headers={"version": "1.0"},
- )
- self._body = (
- b'{"_$_key_context":"context_value",'
- b'"payload_key": "payload_value"}'
- )
-
- def test_message_body_parsing(self):
- message = pika_drv_msg.PikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- self._body
- )
-
- self.assertEqual("context_value",
- message.ctxt.get("key_context", None))
- self.assertEqual("payload_value",
- message.message.get("payload_key", None))
-
- def test_message_acknowledge(self):
- message = pika_drv_msg.PikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- self._body
- )
-
- message.acknowledge()
-
- self.assertEqual(1, self._channel.basic_ack.call_count)
- self.assertEqual({"delivery_tag": self._delivery_tag},
- self._channel.basic_ack.call_args[1])
-
- def test_message_acknowledge_no_ack(self):
- message = pika_drv_msg.PikaIncomingMessage(
- self._pika_engine, None, self._method, self._properties,
- self._body
- )
-
- message.acknowledge()
-
- self.assertEqual(0, self._channel.basic_ack.call_count)
-
- def test_message_requeue(self):
- message = pika_drv_msg.PikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- self._body
- )
-
- message.requeue()
-
- self.assertEqual(1, self._channel.basic_nack.call_count)
- self.assertEqual({"delivery_tag": self._delivery_tag, 'requeue': True},
- self._channel.basic_nack.call_args[1])
-
- def test_message_requeue_no_ack(self):
- message = pika_drv_msg.PikaIncomingMessage(
- self._pika_engine, None, self._method, self._properties,
- self._body
- )
-
- message.requeue()
-
- self.assertEqual(0, self._channel.basic_nack.call_count)
-
-
-class RpcPikaIncomingMessageTestCase(unittest.TestCase):
- def setUp(self):
- self._pika_engine = mock.Mock()
- self._pika_engine.rpc_reply_retry_attempts = 3
- self._pika_engine.rpc_reply_retry_delay = 0.25
-
- self._channel = mock.Mock()
-
- self._delivery_tag = 12345
-
- self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
- self._body = (
- b'{"_$_key_context":"context_value",'
- b'"payload_key":"payload_value"}'
- )
- self._properties = pika.BasicProperties(
- content_type="application/json",
- headers={"version": "1.0"},
- )
-
- def test_call_message_body_parsing(self):
- self._properties.correlation_id = 123456789
- self._properties.reply_to = "reply_queue"
-
- message = pika_drv_msg.RpcPikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- self._body
- )
-
- self.assertEqual("context_value",
- message.ctxt.get("key_context", None))
- self.assertEqual(123456789, message.msg_id)
- self.assertEqual("reply_queue", message.reply_q)
-
- self.assertEqual("payload_value",
- message.message.get("payload_key", None))
-
- def test_cast_message_body_parsing(self):
- message = pika_drv_msg.RpcPikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- self._body
- )
-
- self.assertEqual("context_value",
- message.ctxt.get("key_context", None))
- self.assertIsNone(message.msg_id)
- self.assertIsNone(message.reply_q)
-
- self.assertEqual("payload_value",
- message.message.get("payload_key", None))
-
- @mock.patch(("oslo_messaging._drivers.pika_driver.pika_message."
- "PikaOutgoingMessage.send"))
- def test_reply_for_cast_message(self, send_reply_mock):
- message = pika_drv_msg.RpcPikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- self._body
- )
-
- self.assertEqual("context_value",
- message.ctxt.get("key_context", None))
- self.assertIsNone(message.msg_id)
- self.assertIsNone(message.reply_q)
-
- self.assertEqual("payload_value",
- message.message.get("payload_key", None))
-
- message.reply(reply=object())
-
- self.assertEqual(0, send_reply_mock.call_count)
-
- @mock.patch("oslo_messaging._drivers.pika_driver.pika_message."
- "RpcReplyPikaOutgoingMessage")
- @mock.patch("tenacity.retry")
- def test_positive_reply_for_call_message(self,
- retry_mock,
- outgoing_message_mock):
- self._properties.correlation_id = 123456789
- self._properties.reply_to = "reply_queue"
-
- message = pika_drv_msg.RpcPikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- self._body
- )
-
- self.assertEqual("context_value",
- message.ctxt.get("key_context", None))
- self.assertEqual(123456789, message.msg_id)
- self.assertEqual("reply_queue", message.reply_q)
-
- self.assertEqual("payload_value",
- message.message.get("payload_key", None))
- reply = "all_fine"
- message.reply(reply=reply)
-
- outgoing_message_mock.assert_called_once_with(
- self._pika_engine, 123456789, failure_info=None, reply='all_fine',
- content_type='application/json'
- )
- outgoing_message_mock().send.assert_called_once_with(
- reply_q='reply_queue', stopwatch=mock.ANY, retrier=mock.ANY
- )
- retry_mock.assert_called_once_with(
- stop=mock.ANY, retry=mock.ANY, wait=mock.ANY
- )
-
- @mock.patch("oslo_messaging._drivers.pika_driver.pika_message."
- "RpcReplyPikaOutgoingMessage")
- @mock.patch("tenacity.retry")
- def test_negative_reply_for_call_message(self,
- retry_mock,
- outgoing_message_mock):
- self._properties.correlation_id = 123456789
- self._properties.reply_to = "reply_queue"
-
- message = pika_drv_msg.RpcPikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- self._body
- )
-
- self.assertEqual("context_value",
- message.ctxt.get("key_context", None))
- self.assertEqual(123456789, message.msg_id)
- self.assertEqual("reply_queue", message.reply_q)
-
- self.assertEqual("payload_value",
- message.message.get("payload_key", None))
-
- failure_info = object()
- message.reply(failure=failure_info)
-
- outgoing_message_mock.assert_called_once_with(
- self._pika_engine, 123456789,
- failure_info=failure_info,
- reply=None,
- content_type='application/json'
- )
- outgoing_message_mock().send.assert_called_once_with(
- reply_q='reply_queue', stopwatch=mock.ANY, retrier=mock.ANY
- )
- retry_mock.assert_called_once_with(
- stop=mock.ANY, retry=mock.ANY, wait=mock.ANY
- )
-
-
-class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase):
- def setUp(self):
- self._pika_engine = mock.Mock()
- self._pika_engine.allowed_remote_exmods = [
- pika_drv_cmns.EXCEPTIONS_MODULE, "oslo_messaging.exceptions"
- ]
-
- self._channel = mock.Mock()
-
- self._delivery_tag = 12345
-
- self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
-
- self._properties = pika.BasicProperties(
- content_type="application/json",
- headers={"version": "1.0"},
- correlation_id=123456789
- )
-
- def test_positive_reply_message_body_parsing(self):
-
- body = b'{"s": "all fine"}'
-
- message = pika_drv_msg.RpcReplyPikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- body
- )
-
- self.assertEqual(123456789, message.msg_id)
- self.assertIsNone(message.failure)
- self.assertEqual("all fine", message.result)
-
- def test_negative_reply_message_body_parsing(self):
-
- body = (b'{'
- b' "e": {'
- b' "s": "Error message",'
- b' "t": ["TRACE HERE"],'
- b' "c": "MessagingException",'
- b' "m": "oslo_messaging.exceptions"'
- b' }'
- b'}')
-
- message = pika_drv_msg.RpcReplyPikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- body
- )
-
- self.assertEqual(123456789, message.msg_id)
- self.assertIsNone(message.result)
- self.assertEqual(
- 'Error message\n'
- 'TRACE HERE',
- str(message.failure)
- )
- self.assertIsInstance(message.failure,
- oslo_messaging.MessagingException)
-
-
-class PikaOutgoingMessageTestCase(unittest.TestCase):
- def setUp(self):
- self._pika_engine = mock.MagicMock()
- self._pika_engine.default_content_type = "application/json"
- self._exchange = "it is exchange"
- self._routing_key = "it is routing key"
- self._expiration = 1
- self._stopwatch = (
- timeutils.StopWatch(duration=self._expiration).start()
- )
- self._mandatory = object()
-
- self._message = {"msg_type": 1, "msg_str": "hello"}
- self._context = {"request_id": 555, "token": "it is a token"}
-
- @mock.patch("oslo_serialization.jsonutils.dump_as_bytes",
- new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
- def test_send_with_confirmation(self):
- message = pika_drv_msg.PikaOutgoingMessage(
- self._pika_engine, self._message, self._context
- )
-
- message.send(
- exchange=self._exchange,
- routing_key=self._routing_key,
- confirm=True,
- mandatory=self._mandatory,
- persistent=True,
- stopwatch=self._stopwatch,
- retrier=None
- )
-
- self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.assert_called_once_with(
- body=mock.ANY,
- exchange=self._exchange, mandatory=self._mandatory,
- properties=mock.ANY,
- routing_key=self._routing_key
- )
-
- body = self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["body"]
-
- self.assertEqual(
- b'{"_$_request_id": 555, "_$_token": "it is a token", '
- b'"msg_str": "hello", "msg_type": 1}',
- body
- )
-
- props = self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["properties"]
-
- self.assertEqual('application/json', props.content_type)
- self.assertEqual(2, props.delivery_mode)
- self.assertTrue(self._expiration * 1000 - float(props.expiration) <
- 100)
- self.assertEqual({'version': '1.0'}, props.headers)
- self.assertTrue(props.message_id)
-
- @mock.patch("oslo_serialization.jsonutils.dump_as_bytes",
- new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
- def test_send_without_confirmation(self):
- message = pika_drv_msg.PikaOutgoingMessage(
- self._pika_engine, self._message, self._context
- )
-
- message.send(
- exchange=self._exchange,
- routing_key=self._routing_key,
- confirm=False,
- mandatory=self._mandatory,
- persistent=False,
- stopwatch=self._stopwatch,
- retrier=None
- )
-
- self._pika_engine.connection_without_confirmation_pool.acquire(
- ).__enter__().channel.publish.assert_called_once_with(
- body=mock.ANY,
- exchange=self._exchange, mandatory=self._mandatory,
- properties=mock.ANY,
- routing_key=self._routing_key
- )
-
- body = self._pika_engine.connection_without_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["body"]
-
- self.assertEqual(
- b'{"_$_request_id": 555, "_$_token": "it is a token", '
- b'"msg_str": "hello", "msg_type": 1}',
- body
- )
-
- props = self._pika_engine.connection_without_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["properties"]
-
- self.assertEqual('application/json', props.content_type)
- self.assertEqual(1, props.delivery_mode)
- self.assertTrue(self._expiration * 1000 - float(props.expiration)
- < 100)
- self.assertEqual({'version': '1.0'}, props.headers)
- self.assertTrue(props.message_id)
-
-
-class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
- def setUp(self):
- self._exchange = "it is exchange"
- self._routing_key = "it is routing key"
-
- self._pika_engine = mock.MagicMock()
- self._pika_engine.get_rpc_exchange_name.return_value = self._exchange
- self._pika_engine.get_rpc_queue_name.return_value = self._routing_key
- self._pika_engine.default_content_type = "application/json"
-
- self._message = {"msg_type": 1, "msg_str": "hello"}
- self._context = {"request_id": 555, "token": "it is a token"}
-
- @mock.patch("oslo_serialization.jsonutils.dump_as_bytes",
- new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
- def test_send_cast_message(self):
- message = pika_drv_msg.RpcPikaOutgoingMessage(
- self._pika_engine, self._message, self._context
- )
-
- expiration = 1
- stopwatch = timeutils.StopWatch(duration=expiration).start()
-
- message.send(
- exchange=self._exchange,
- routing_key=self._routing_key,
- reply_listener=None,
- stopwatch=stopwatch,
- retrier=None
- )
-
- self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.assert_called_once_with(
- body=mock.ANY,
- exchange=self._exchange, mandatory=True,
- properties=mock.ANY,
- routing_key=self._routing_key
- )
-
- body = self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["body"]
-
- self.assertEqual(
- b'{"_$_request_id": 555, "_$_token": "it is a token", '
- b'"msg_str": "hello", "msg_type": 1}',
- body
- )
-
- props = self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["properties"]
-
- self.assertEqual('application/json', props.content_type)
- self.assertEqual(1, props.delivery_mode)
- self.assertTrue(expiration * 1000 - float(props.expiration) < 100)
- self.assertEqual({'version': '1.0'}, props.headers)
- self.assertIsNone(props.correlation_id)
- self.assertIsNone(props.reply_to)
- self.assertTrue(props.message_id)
-
- @mock.patch("oslo_serialization.jsonutils.dump_as_bytes",
- new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
- def test_send_call_message(self):
- message = pika_drv_msg.RpcPikaOutgoingMessage(
- self._pika_engine, self._message, self._context
- )
-
- expiration = 1
- stopwatch = timeutils.StopWatch(duration=expiration).start()
-
- result = "it is a result"
- reply_queue_name = "reply_queue_name"
-
- future = futures.Future()
- future.set_result(result)
- reply_listener = mock.Mock()
- reply_listener.register_reply_waiter.return_value = future
- reply_listener.get_reply_qname.return_value = reply_queue_name
-
- res = message.send(
- exchange=self._exchange,
- routing_key=self._routing_key,
- reply_listener=reply_listener,
- stopwatch=stopwatch,
- retrier=None
- )
-
- self.assertEqual(result, res)
-
- self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.assert_called_once_with(
- body=mock.ANY,
- exchange=self._exchange, mandatory=True,
- properties=mock.ANY,
- routing_key=self._routing_key
- )
-
- body = self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["body"]
-
- self.assertEqual(
- b'{"_$_request_id": 555, "_$_token": "it is a token", '
- b'"msg_str": "hello", "msg_type": 1}',
- body
- )
-
- props = self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["properties"]
-
- self.assertEqual('application/json', props.content_type)
- self.assertEqual(1, props.delivery_mode)
- self.assertTrue(expiration * 1000 - float(props.expiration) < 100)
- self.assertEqual({'version': '1.0'}, props.headers)
- self.assertEqual(message.msg_id, props.correlation_id)
- self.assertEqual(reply_queue_name, props.reply_to)
- self.assertTrue(props.message_id)
-
-
-class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase):
- def setUp(self):
- self._reply_q = "reply_queue_name"
-
- self._expiration = 1
- self._stopwatch = (
- timeutils.StopWatch(duration=self._expiration).start()
- )
-
- self._pika_engine = mock.MagicMock()
-
- self._rpc_reply_exchange = "rpc_reply_exchange"
- self._pika_engine.rpc_reply_exchange = self._rpc_reply_exchange
- self._pika_engine.default_content_type = "application/json"
-
- self._msg_id = 12345567
-
- @mock.patch("oslo_serialization.jsonutils.dump_as_bytes",
- new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
- def test_success_message_send(self):
- message = pika_drv_msg.RpcReplyPikaOutgoingMessage(
- self._pika_engine, self._msg_id, reply="all_fine"
- )
-
- message.send(self._reply_q, stopwatch=self._stopwatch, retrier=None)
-
- self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.assert_called_once_with(
- body=b'{"s": "all_fine"}',
- exchange=self._rpc_reply_exchange, mandatory=True,
- properties=mock.ANY,
- routing_key=self._reply_q
- )
-
- props = self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["properties"]
-
- self.assertEqual('application/json', props.content_type)
- self.assertEqual(1, props.delivery_mode)
- self.assertTrue(self._expiration * 1000 - float(props.expiration) <
- 100)
- self.assertEqual({'version': '1.0'}, props.headers)
- self.assertEqual(message.msg_id, props.correlation_id)
- self.assertIsNone(props.reply_to)
- self.assertTrue(props.message_id)
-
- @mock.patch("traceback.format_exception", new=lambda x, y, z: z)
- @mock.patch("oslo_serialization.jsonutils.dump_as_bytes",
- new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
- def test_failure_message_send(self):
- failure_info = (oslo_messaging.MessagingException,
- oslo_messaging.MessagingException("Error message"),
- ['It is a trace'])
-
- message = pika_drv_msg.RpcReplyPikaOutgoingMessage(
- self._pika_engine, self._msg_id, failure_info=failure_info
- )
-
- message.send(self._reply_q, stopwatch=self._stopwatch, retrier=None)
-
- self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.assert_called_once_with(
- body=mock.ANY,
- exchange=self._rpc_reply_exchange,
- mandatory=True,
- properties=mock.ANY,
- routing_key=self._reply_q
- )
-
- body = self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["body"]
- self.assertEqual(
- b'{"e": {"c": "MessagingException", '
- b'"m": "oslo_messaging.exceptions", "s": "Error message", '
- b'"t": ["It is a trace"]}}',
- body
- )
-
- props = self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["properties"]
-
- self.assertEqual('application/json', props.content_type)
- self.assertEqual(1, props.delivery_mode)
- self.assertTrue(self._expiration * 1000 - float(props.expiration) <
- 100)
- self.assertEqual({'version': '1.0'}, props.headers)
- self.assertEqual(message.msg_id, props.correlation_id)
- self.assertIsNone(props.reply_to)
- self.assertTrue(props.message_id)
diff --git a/oslo_messaging/tests/drivers/pika/test_poller.py b/oslo_messaging/tests/drivers/pika/test_poller.py
deleted file mode 100644
index 7957bec..0000000
--- a/oslo_messaging/tests/drivers/pika/test_poller.py
+++ /dev/null
@@ -1,482 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import threading
-import time
-import unittest
-
-from concurrent import futures
-from six.moves import mock
-
-from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
-from oslo_messaging._drivers.pika_driver import pika_poller
-
-
-class PikaPollerTestCase(unittest.TestCase):
- def setUp(self):
- self._pika_engine = mock.Mock()
- self._poller_connection_mock = mock.Mock()
- self._poller_channel_mock = mock.Mock()
- self._poller_connection_mock.channel.return_value = (
- self._poller_channel_mock
- )
- self._pika_engine.create_connection.return_value = (
- self._poller_connection_mock
- )
-
- self._executor = futures.ThreadPoolExecutor(1)
-
- def timer_task(timeout, callback):
- time.sleep(timeout)
- callback()
-
- self._poller_connection_mock.add_timeout.side_effect = (
- lambda *args: self._executor.submit(timer_task, *args)
- )
-
- self._prefetch_count = 123
-
- @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
- "_declare_queue_binding")
- def test_start(self, declare_queue_binding_mock):
- poller = pika_poller.PikaPoller(
- self._pika_engine, 1, None, self._prefetch_count, None
- )
-
- poller.start(None)
-
- self.assertTrue(self._pika_engine.create_connection.called)
- self.assertTrue(self._poller_connection_mock.channel.called)
- self.assertTrue(declare_queue_binding_mock.called)
-
- def test_start_when_connection_unavailable(self):
- poller = pika_poller.PikaPoller(
- self._pika_engine, 1, None, self._prefetch_count, None
- )
-
- self._pika_engine.create_connection.side_effect = (
- pika_drv_exc.EstablishConnectionException
- )
-
- # start() should not raise socket.timeout exception
- poller.start(None)
-
- # stop is needed to stop reconnection background job
- poller.stop()
-
- @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
- "_declare_queue_binding")
- def test_message_processing(self, declare_queue_binding_mock):
- res = []
-
- def on_incoming_callback(incoming):
- res.append(incoming)
-
- incoming_message_class_mock = mock.Mock()
- poller = pika_poller.PikaPoller(
- self._pika_engine, 1, None, self._prefetch_count,
- incoming_message_class=incoming_message_class_mock
- )
- unused = object()
- method = object()
- properties = object()
- body = object()
-
- poller.start(on_incoming_callback)
- poller._on_message_with_ack_callback(
- unused, method, properties, body
- )
-
- self.assertEqual(1, len(res))
-
- self.assertEqual([incoming_message_class_mock.return_value], res[0])
- incoming_message_class_mock.assert_called_once_with(
- self._pika_engine, self._poller_channel_mock, method, properties,
- body
- )
-
- self.assertTrue(self._pika_engine.create_connection.called)
- self.assertTrue(self._poller_connection_mock.channel.called)
-
- self.assertTrue(declare_queue_binding_mock.called)
-
- @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
- "_declare_queue_binding")
- def test_message_processing_batch(self, declare_queue_binding_mock):
- incoming_message_class_mock = mock.Mock()
-
- n = 10
- params = []
-
- res = []
-
- def on_incoming_callback(incoming):
- res.append(incoming)
-
- poller = pika_poller.PikaPoller(
- self._pika_engine, n, None, self._prefetch_count,
- incoming_message_class=incoming_message_class_mock
- )
-
- for i in range(n):
- params.append((object(), object(), object(), object()))
-
- poller.start(on_incoming_callback)
-
- for i in range(n):
- poller._on_message_with_ack_callback(
- *params[i]
- )
-
- self.assertEqual(1, len(res))
- self.assertEqual(10, len(res[0]))
- self.assertEqual(n, incoming_message_class_mock.call_count)
-
- for i in range(n):
- self.assertEqual(incoming_message_class_mock.return_value,
- res[0][i])
- self.assertEqual(
- (self._pika_engine, self._poller_channel_mock) + params[i][1:],
- incoming_message_class_mock.call_args_list[i][0]
- )
-
- self.assertTrue(self._pika_engine.create_connection.called)
- self.assertTrue(self._poller_connection_mock.channel.called)
-
- self.assertTrue(declare_queue_binding_mock.called)
-
- @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
- "_declare_queue_binding")
- def test_message_processing_batch_with_timeout(self,
- declare_queue_binding_mock):
- incoming_message_class_mock = mock.Mock()
-
- n = 10
- timeout = 1
-
- res = []
- evt = threading.Event()
-
- def on_incoming_callback(incoming):
- res.append(incoming)
- evt.set()
-
- poller = pika_poller.PikaPoller(
- self._pika_engine, n, timeout, self._prefetch_count,
- incoming_message_class=incoming_message_class_mock
- )
-
- params = []
-
- success_count = 5
-
- poller.start(on_incoming_callback)
-
- for i in range(n):
- params.append((object(), object(), object(), object()))
-
- for i in range(success_count):
- poller._on_message_with_ack_callback(
- *params[i]
- )
-
- self.assertTrue(evt.wait(timeout * 2))
-
- self.assertEqual(1, len(res))
- self.assertEqual(success_count, len(res[0]))
- self.assertEqual(success_count, incoming_message_class_mock.call_count)
-
- for i in range(success_count):
- self.assertEqual(incoming_message_class_mock.return_value,
- res[0][i])
- self.assertEqual(
- (self._pika_engine, self._poller_channel_mock) + params[i][1:],
- incoming_message_class_mock.call_args_list[i][0]
- )
-
- self.assertTrue(self._pika_engine.create_connection.called)
- self.assertTrue(self._poller_connection_mock.channel.called)
-
- self.assertTrue(declare_queue_binding_mock.called)
-
-
-class RpcServicePikaPollerTestCase(unittest.TestCase):
- def setUp(self):
- self._pika_engine = mock.Mock()
- self._poller_connection_mock = mock.Mock()
- self._poller_channel_mock = mock.Mock()
- self._poller_connection_mock.channel.return_value = (
- self._poller_channel_mock
- )
- self._pika_engine.create_connection.return_value = (
- self._poller_connection_mock
- )
-
- self._pika_engine.get_rpc_queue_name.side_effect = (
- lambda topic, server, no_ack, worker=False:
- "_".join([topic, str(server), str(no_ack), str(worker)])
- )
-
- self._pika_engine.get_rpc_exchange_name.side_effect = (
- lambda exchange: exchange
- )
-
- self._prefetch_count = 123
- self._target = mock.Mock(exchange="exchange", topic="topic",
- server="server")
- self._pika_engine.rpc_queue_expiration = 12345
-
- @mock.patch("oslo_messaging._drivers.pika_driver.pika_message."
- "RpcPikaIncomingMessage")
- def test_declare_rpc_queue_bindings(self, rpc_pika_incoming_message_mock):
- poller = pika_poller.RpcServicePikaPoller(
- self._pika_engine, self._target, 1, None,
- self._prefetch_count
- )
-
- poller.start(None)
-
- self.assertTrue(self._pika_engine.create_connection.called)
- self.assertTrue(self._poller_connection_mock.channel.called)
-
- declare_queue_binding_by_channel_mock = (
- self._pika_engine.declare_queue_binding_by_channel
- )
-
- self.assertEqual(
- 6, declare_queue_binding_by_channel_mock.call_count
- )
-
- declare_queue_binding_by_channel_mock.assert_has_calls((
- mock.call(
- channel=self._poller_channel_mock, durable=False,
- exchange="exchange",
- exchange_type='direct',
- queue="topic_None_True_False",
- queue_expiration=12345,
- routing_key="topic_None_True_False"
- ),
- mock.call(
- channel=self._poller_channel_mock, durable=False,
- exchange="exchange",
- exchange_type='direct',
- queue="topic_server_True_False",
- queue_expiration=12345,
- routing_key="topic_server_True_False"
- ),
- mock.call(
- channel=self._poller_channel_mock, durable=False,
- exchange="exchange",
- exchange_type='direct',
- queue="topic_server_True_True",
- queue_expiration=12345,
- routing_key="topic_all_workers_True_False"
- ),
- mock.call(
- channel=self._poller_channel_mock, durable=False,
- exchange="exchange",
- exchange_type='direct',
- queue="topic_None_False_False",
- queue_expiration=12345,
- routing_key="topic_None_False_False"
- ),
- mock.call(
- channel=self._poller_channel_mock, durable=False,
- exchange="exchange",
- exchange_type='direct',
- queue="topic_server_False_False",
- queue_expiration=12345,
- routing_key='topic_server_False_False'
- ),
- mock.call(
- channel=self._poller_channel_mock, durable=False,
- exchange="exchange",
- exchange_type='direct',
- queue="topic_server_False_True",
- queue_expiration=12345,
- routing_key='topic_all_workers_False_False'
- )
- ))
-
-
-class RpcReplyServicePikaPollerTestCase(unittest.TestCase):
- def setUp(self):
- self._pika_engine = mock.Mock()
- self._poller_connection_mock = mock.Mock()
- self._poller_channel_mock = mock.Mock()
- self._poller_connection_mock.channel.return_value = (
- self._poller_channel_mock
- )
- self._pika_engine.create_connection.return_value = (
- self._poller_connection_mock
- )
-
- self._prefetch_count = 123
- self._exchange = "rpc_reply_exchange"
- self._queue = "rpc_reply_queue"
-
- self._pika_engine.rpc_reply_retry_delay = 12132543456
-
- self._pika_engine.rpc_queue_expiration = 12345
- self._pika_engine.rpc_reply_retry_attempts = 3
-
- def test_declare_rpc_reply_queue_binding(self):
- poller = pika_poller.RpcReplyPikaPoller(
- self._pika_engine, self._exchange, self._queue, 1, None,
- self._prefetch_count,
- )
-
- poller.start(None)
- poller.stop()
-
- declare_queue_binding_by_channel_mock = (
- self._pika_engine.declare_queue_binding_by_channel
- )
-
- self.assertEqual(
- 1, declare_queue_binding_by_channel_mock.call_count
- )
-
- declare_queue_binding_by_channel_mock.assert_called_once_with(
- channel=self._poller_channel_mock, durable=False,
- exchange='rpc_reply_exchange', exchange_type='direct',
- queue='rpc_reply_queue', queue_expiration=12345,
- routing_key='rpc_reply_queue'
- )
-
-
-class NotificationPikaPollerTestCase(unittest.TestCase):
- def setUp(self):
- self._pika_engine = mock.Mock()
- self._poller_connection_mock = mock.Mock()
- self._poller_channel_mock = mock.Mock()
- self._poller_connection_mock.channel.return_value = (
- self._poller_channel_mock
- )
- self._pika_engine.create_connection.return_value = (
- self._poller_connection_mock
- )
-
- self._prefetch_count = 123
- self._target_and_priorities = (
- (
- mock.Mock(exchange="exchange1", topic="topic1",
- server="server1"), 1
- ),
- (
- mock.Mock(exchange="exchange1", topic="topic1"), 2
- ),
- (
- mock.Mock(exchange="exchange2", topic="topic2",), 1
- ),
- )
- self._pika_engine.notification_persistence = object()
-
- def test_declare_notification_queue_bindings_default_queue(self):
- poller = pika_poller.NotificationPikaPoller(
- self._pika_engine, self._target_and_priorities, 1, None,
- self._prefetch_count, None
- )
-
- poller.start(None)
-
- self.assertTrue(self._pika_engine.create_connection.called)
- self.assertTrue(self._poller_connection_mock.channel.called)
-
- declare_queue_binding_by_channel_mock = (
- self._pika_engine.declare_queue_binding_by_channel
- )
-
- self.assertEqual(
- 3, declare_queue_binding_by_channel_mock.call_count
- )
-
- declare_queue_binding_by_channel_mock.assert_has_calls((
- mock.call(
- channel=self._poller_channel_mock,
- durable=self._pika_engine.notification_persistence,
- exchange="exchange1",
- exchange_type='direct',
- queue="topic1.1",
- queue_expiration=None,
- routing_key="topic1.1"
- ),
- mock.call(
- channel=self._poller_channel_mock,
- durable=self._pika_engine.notification_persistence,
- exchange="exchange1",
- exchange_type='direct',
- queue="topic1.2",
- queue_expiration=None,
- routing_key="topic1.2"
- ),
- mock.call(
- channel=self._poller_channel_mock,
- durable=self._pika_engine.notification_persistence,
- exchange="exchange2",
- exchange_type='direct',
- queue="topic2.1",
- queue_expiration=None,
- routing_key="topic2.1"
- )
- ))
-
- def test_declare_notification_queue_bindings_custom_queue(self):
- poller = pika_poller.NotificationPikaPoller(
- self._pika_engine, self._target_and_priorities, 1, None,
- self._prefetch_count, "custom_queue_name"
- )
-
- poller.start(None)
-
- self.assertTrue(self._pika_engine.create_connection.called)
- self.assertTrue(self._poller_connection_mock.channel.called)
-
- declare_queue_binding_by_channel_mock = (
- self._pika_engine.declare_queue_binding_by_channel
- )
-
- self.assertEqual(
- 3, declare_queue_binding_by_channel_mock.call_count
- )
-
- declare_queue_binding_by_channel_mock.assert_has_calls((
- mock.call(
- channel=self._poller_channel_mock,
- durable=self._pika_engine.notification_persistence,
- exchange="exchange1",
- exchange_type='direct',
- queue="custom_queue_name",
- queue_expiration=None,
- routing_key="topic1.1"
- ),
- mock.call(
- channel=self._poller_channel_mock,
- durable=self._pika_engine.notification_persistence,
- exchange="exchange1",
- exchange_type='direct',
- queue="custom_queue_name",
- queue_expiration=None,
- routing_key="topic1.2"
- ),
- mock.call(
- channel=self._poller_channel_mock,
- durable=self._pika_engine.notification_persistence,
- exchange="exchange2",
- exchange_type='direct',
- queue="custom_queue_name",
- queue_expiration=None,
- routing_key="topic2.1"
- )
- ))
diff --git a/oslo_messaging/tests/functional/test_rabbitmq.py b/oslo_messaging/tests/functional/test_rabbitmq.py
index ffeaf0a..db06d01 100644
--- a/oslo_messaging/tests/functional/test_rabbitmq.py
+++ b/oslo_messaging/tests/functional/test_rabbitmq.py
@@ -63,10 +63,6 @@ class RabbitMQFailoverTests(test_utils.BaseTestCase):
self.n2 = self.pifpaf.env["PIFPAF_RABBITMQ_NODENAME2"]
self.n3 = self.pifpaf.env["PIFPAF_RABBITMQ_NODENAME3"]
- # NOTE(gdavoian): additional tweak for pika driver
- if self.driver == "pika":
- self.url = self.url.replace("rabbit", "pika")
-
# ensure connections will be establish to the first node
self.pifpaf.stop_node(self.n2)
self.pifpaf.stop_node(self.n3)
@@ -115,19 +111,6 @@ class RabbitMQFailoverTests(test_utils.BaseTestCase):
return "callback done"
def _check_ports(self, port):
- getattr(self, '_check_ports_%s_driver' % self.driver)(port)
-
- def _check_ports_pika_driver(self, port):
- rpc_server = self.servers.servers[0].server
- # FIXME(sileht): Check other connections
- connections = [
- rpc_server.listener._connection
- ]
- for conn in connections:
- self.assertEqual(
- port, conn._impl.socket.getpeername()[1])
-
- def _check_ports_rabbit_driver(self, port):
rpc_server = self.servers.servers[0].server
connection_contexts = [
# rpc server
diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py
index c381ebd..82bdbd9 100644
--- a/oslo_messaging/tests/functional/utils.py
+++ b/oslo_messaging/tests/functional/utils.py
@@ -304,8 +304,6 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
driver = os.environ.get("TRANSPORT_DRIVER")
if driver:
self.url = os.environ.get('PIFPAF_URL')
- if driver == "pika" and self.url:
- self.url = self.url.replace("rabbit://", "pika://")
else:
self.url = os.environ.get('TRANSPORT_URL')
diff --git a/playbooks/oslo.messaging-src-dsvm-full-pika-default/post.yaml b/playbooks/oslo.messaging-src-dsvm-full-pika-default/post.yaml
deleted file mode 100644
index e07f551..0000000
--- a/playbooks/oslo.messaging-src-dsvm-full-pika-default/post.yaml
+++ /dev/null
@@ -1,15 +0,0 @@
-- hosts: primary
- tasks:
-
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=/logs/**
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
diff --git a/playbooks/oslo.messaging-src-dsvm-full-pika-default/run.yaml b/playbooks/oslo.messaging-src-dsvm-full-pika-default/run.yaml
deleted file mode 100644
index 572f1d1..0000000
--- a/playbooks/oslo.messaging-src-dsvm-full-pika-default/run.yaml
+++ /dev/null
@@ -1,42 +0,0 @@
-- hosts: all
- name: Autoconverted job legacy-oslo.messaging-src-dsvm-full-pika-default from old
- job gate-oslo.messaging-src-dsvm-full-pika-default-ubuntu-xenial-nv
- tasks:
-
- - name: Ensure legacy workspace directory
- file:
- path: '{{ ansible_user_dir }}/workspace'
- state: directory
-
- - shell:
- cmd: |
- set -e
- set -x
- cat > clonemap.yaml << EOF
- clonemap:
- - name: openstack-infra/devstack-gate
- dest: devstack-gate
- EOF
- /usr/zuul-env/bin/zuul-cloner -m clonemap.yaml --cache-dir /opt/git \
- git://git.openstack.org \
- openstack-infra/devstack-gate
- executable: /bin/bash
- chdir: '{{ ansible_user_dir }}/workspace'
- environment: '{{ zuul | zuul_legacy_vars }}'
-
- - shell:
- cmd: |
- set -e
- set -x
- export PYTHONUNBUFFERED=true
- export DEVSTACK_GATE_TEMPEST=1
- export DEVSTACK_GATE_TEMPEST_FULL=1
- export PROJECTS="openstack/devstack-plugin-pika $PROJECTS"
- export DEVSTACK_LOCAL_CONFIG="enable_plugin devstack-plugin-pika git://git.openstack.org/openstack/devstack-plugin-pika"
- export DEVSTACK_PROJECT_FROM_GIT="oslo.messaging"
-
- cp devstack-gate/devstack-vm-gate-wrap.sh ./safe-devstack-vm-gate-wrap.sh
- ./safe-devstack-vm-gate-wrap.sh
- executable: /bin/bash
- chdir: '{{ ansible_user_dir }}/workspace'
- environment: '{{ zuul | zuul_legacy_vars }}'
diff --git a/playbooks/oslo.messaging-telemetry-dsvm-integration-pika/post.yaml b/playbooks/oslo.messaging-telemetry-dsvm-integration-pika/post.yaml
deleted file mode 100644
index dac8753..0000000
--- a/playbooks/oslo.messaging-telemetry-dsvm-integration-pika/post.yaml
+++ /dev/null
@@ -1,80 +0,0 @@
-- hosts: primary
- tasks:
-
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=**/*nose_results.html
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
-
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=**/*testr_results.html.gz
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
-
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=/.testrepository/tmp*
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
-
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=**/*testrepository.subunit.gz
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
-
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}/tox'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=/.tox/*/log/*
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
-
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=/logs/**
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
diff --git a/playbooks/oslo.messaging-telemetry-dsvm-integration-pika/run.yaml b/playbooks/oslo.messaging-telemetry-dsvm-integration-pika/run.yaml
deleted file mode 100644
index 8cad6ef..0000000
--- a/playbooks/oslo.messaging-telemetry-dsvm-integration-pika/run.yaml
+++ /dev/null
@@ -1,79 +0,0 @@
-- hosts: all
- name: Autoconverted job legacy-oslo.messaging-telemetry-dsvm-integration-pika from
- old job gate-oslo.messaging-telemetry-dsvm-integration-pika-ubuntu-xenial-nv
- tasks:
-
- - name: Ensure legacy workspace directory
- file:
- path: '{{ ansible_user_dir }}/workspace'
- state: directory
-
- - shell:
- cmd: |
- set -e
- set -x
- cat > clonemap.yaml << EOF
- clonemap:
- - name: openstack-infra/devstack-gate
- dest: devstack-gate
- EOF
- /usr/zuul-env/bin/zuul-cloner -m clonemap.yaml --cache-dir /opt/git \
- git://git.openstack.org \
- openstack-infra/devstack-gate
- executable: /bin/bash
- chdir: '{{ ansible_user_dir }}/workspace'
- environment: '{{ zuul | zuul_legacy_vars }}'
-
- - shell:
- cmd: |
- set -e
- set -x
- export PYTHONUNBUFFERED=true
-
- export DEVSTACK_GATE_HEAT=1
- export DEVSTACK_GATE_NEUTRON=1
- export DEVSTACK_GATE_TEMPEST=1
- export DEVSTACK_GATE_EXERCISES=0
- export DEVSTACK_GATE_INSTALL_TESTONLY=1
-
- export PROJECTS="openstack/ceilometer $PROJECTS"
- export PROJECTS="openstack/aodh $PROJECTS"
- export PROJECTS="openstack/devstack-plugin-pika $PROJECTS"
-
- case "$ZUUL_BRANCH" in
- "stable/ocata")
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin gnocchi git://git.openstack.org/openstack/gnocchi"
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin panko git://git.openstack.org/openstack/panko"
- export OVERRIDE_GNOCCHI_PROJECT_BRANCH="stable/3.1"
- export PROJECTS="openstack/panko $PROJECTS openstack/gnocchi"
- ;;
- *)
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin panko git://git.openstack.org/openstack/panko"
- export PROJECTS="openstack/panko $PROJECTS"
- ;;
- esac
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin ceilometer git://git.openstack.org/openstack/ceilometer"
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin aodh git://git.openstack.org/openstack/aodh"
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin heat git://git.openstack.org/openstack/heat"
-
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"CEILOMETER_BACKEND=gnocchi"
-
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"GNOCCHI_ARCHIVE_POLICY=high"
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"CEILOMETER_PIPELINE_INTERVAL=5"
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"GNOCCHI_STORAGE_BACKEND=file"
-
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin devstack-plugin-pika git://git.openstack.org/openstack/devstack-plugin-pika"
-
- export DEVSTACK_PROJECT_FROM_GIT="oslo.messaging"
-
- function post_test_hook {
- cd /opt/stack/new/ceilometer/ceilometer/tests/integration/hooks/
- ./post_test_hook.sh
- }
- export -f post_test_hook
-
- cp devstack-gate/devstack-vm-gate-wrap.sh ./safe-devstack-vm-gate-wrap.sh
- ./safe-devstack-vm-gate-wrap.sh
- executable: /bin/bash
- chdir: '{{ ansible_user_dir }}/workspace'
- environment: '{{ zuul | zuul_legacy_vars }}'
diff --git a/playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/post.yaml b/playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/post.yaml
deleted file mode 100644
index e07f551..0000000
--- a/playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/post.yaml
+++ /dev/null
@@ -1,15 +0,0 @@
-- hosts: primary
- tasks:
-
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=/logs/**
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
diff --git a/playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/run.yaml b/playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/run.yaml
deleted file mode 100644
index 5dd6533..0000000
--- a/playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/run.yaml
+++ /dev/null
@@ -1,45 +0,0 @@
-- hosts: all
- name: Autoconverted job legacy-tempest-neutron-dsvm-src-oslo.messaging-pika-default
- from old job gate-tempest-neutron-dsvm-src-oslo.messaging-pika-default-ubuntu-xenial-nv
- tasks:
-
- - name: Ensure legacy workspace directory
- file:
- path: '{{ ansible_user_dir }}/workspace'
- state: directory
-
- - shell:
- cmd: |
- set -e
- set -x
- cat > clonemap.yaml << EOF
- clonemap:
- - name: openstack-infra/devstack-gate
- dest: devstack-gate
- EOF
- /usr/zuul-env/bin/zuul-cloner -m clonemap.yaml --cache-dir /opt/git \
- git://git.openstack.org \
- openstack-infra/devstack-gate
- executable: /bin/bash
- chdir: '{{ ansible_user_dir }}/workspace'
- environment: '{{ zuul | zuul_legacy_vars }}'
-
- - shell:
- cmd: |
- set -e
- set -x
- export PYTHONUNBUFFERED=true
- export DEVSTACK_GATE_TEMPEST=1
- export DEVSTACK_GATE_TEMPEST_FULL=1
- export DEVSTACK_GATE_NEUTRON=1
-
- export PROJECTS="openstack/devstack-plugin-pika $PROJECTS"
- export DEVSTACK_LOCAL_CONFIG="enable_plugin devstack-plugin-pika git://git.openstack.org/openstack/devstack-plugin-pika"
-
- export DEVSTACK_PROJECT_FROM_GIT="oslo.messaging"
-
- cp devstack-gate/devstack-vm-gate-wrap.sh ./safe-devstack-vm-gate-wrap.sh
- ./safe-devstack-vm-gate-wrap.sh
- executable: /bin/bash
- chdir: '{{ ansible_user_dir }}/workspace'
- environment: '{{ zuul | zuul_legacy_vars }}'
diff --git a/releasenotes/notes/remove-pika-1bae204ced2521a3.yaml b/releasenotes/notes/remove-pika-1bae204ced2521a3.yaml
new file mode 100644
index 0000000..0d4e123
--- /dev/null
+++ b/releasenotes/notes/remove-pika-1bae204ced2521a3.yaml
@@ -0,0 +1,8 @@
+---
+prelude: >
+ The Pika-based driver for RabbitMQ has been removed.
+upgrade:
+ - |
+ Users of the Pika-based driver must change the prefix of all the
+ transport_url configuration options from "pika://..." to
+ "rabbit://..." to use the default kombu based RabbitMQ driver.
diff --git a/requirements.txt b/requirements.txt
index c6a0912..2055389 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -28,10 +28,8 @@ PyYAML>=3.12 # MIT
# we set the amqp version to ensure heartbeat works
amqp!=2.1.4,>=2.1.1 # BSD
kombu!=4.0.2,>=4.0.0 # BSD
-pika>=0.10.0 # BSD
-pika-pool>=0.1.3 # BSD
-# used by pika and zmq drivers
+# used by zmq driver
futures>=3.0.0;python_version=='2.7' or python_version=='2.6' # BSD
tenacity>=3.2.1 # Apache-2.0
diff --git a/setup.cfg b/setup.cfg
index f5bda49..795191d 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -41,7 +41,6 @@ oslo.messaging.drivers =
# This is just for internal testing
fake = oslo_messaging._drivers.impl_fake:FakeDriver
- pika = oslo_messaging._drivers.impl_pika:PikaDriver
oslo.messaging.executors =
blocking = futurist:SynchronousExecutor
@@ -56,21 +55,6 @@ oslo.messaging.notify.drivers =
noop = oslo_messaging.notify._impl_noop:NoOpDriver
routing = oslo_messaging.notify._impl_routing:RoutingDriver
-oslo.messaging.pika.connection_factory =
- # Creates new connection for each create_connection call. Old-style behaviour
- # Uses a much more connections then single and read_write factories but still available as
- # an option
- new = oslo_messaging._drivers.pika_driver.pika_connection_factory:PikaConnectionFactory
-
- # Creates only one connection for transport and return it for each create connection call
- # it is default, but you can not use it with synchronous executor
- single = oslo_messaging._drivers.pika_driver.pika_connection_factory:SinglePikaConnectionFactory
-
- # Create two connections - one for listening and another one for sending and return them
- # for each create connection call depending on connection purpose. Creates one more connection
- # but you can use it with synchronous executor
- read_write = oslo_messaging._drivers.pika_driver.pika_connection_factory:ReadWritePikaConnectionFactory
-
oslo.messaging.zmq.matchmaker =
# Matchmakers for ZeroMQ
dummy = oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_base:MatchmakerDummy
diff --git a/tox.ini b/tox.ini
index 70666a2..66882de 100644
--- a/tox.ini
+++ b/tox.ini
@@ -44,12 +44,6 @@ setenv =
basepython = python3.5
commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
-[testenv:py27-func-pika]
-setenv =
- {[testenv]setenv}
- TRANSPORT_DRIVER=pika
-commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
-
[testenv:py27-func-kafka]
setenv =
{[testenv]setenv}