summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Giusti <kgiusti@gmail.com>2018-01-23 15:23:15 -0500
committerKenneth Giusti <kgiusti@gmail.com>2018-03-21 10:58:23 -0400
commit222a939361579f73fb33a51580fd2ed4d28decb3 (patch)
tree86565cf1f33acb57a1ba1bddac3c642305c1b4c4
parentdec178257b49c1992eeb66306a62b65d603e06ef (diff)
downloadoslo-messaging-6.0.0.tar.gz
Remove the deprecated Pika driver6.0.0
It is recommended that all users of the Pika driver transition to using the Rabbit driver instead. Typically this is done by changing the prefix of the transport_url configuration option from "pika://..." to "rabbit://...". There are no changes required to the RabbitMQ server configuration. Change-Id: I52ea5ccb7e7c247abd95e2d8d50dac4c4ad11246 Closes-Bug: #1744741
-rw-r--r--.zuul.yaml57
-rw-r--r--bindep.txt8
-rw-r--r--doc/source/admin/index.rst1
-rw-r--r--doc/source/admin/pika_driver.rst160
-rw-r--r--oslo_messaging/_drivers/impl_pika.py366
-rw-r--r--oslo_messaging/_drivers/pika_driver/__init__.py0
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_commons.py40
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_connection.py542
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_connection_factory.py307
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_engine.py303
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_exceptions.py68
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_listener.py123
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_message.py618
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_poller.py538
-rw-r--r--oslo_messaging/opts.py7
-rw-r--r--oslo_messaging/tests/drivers/pika/__init__.py0
-rw-r--r--oslo_messaging/tests/drivers/pika/test_message.py615
-rw-r--r--oslo_messaging/tests/drivers/pika/test_poller.py482
-rw-r--r--oslo_messaging/tests/functional/test_rabbitmq.py17
-rw-r--r--oslo_messaging/tests/functional/utils.py2
-rw-r--r--playbooks/oslo.messaging-src-dsvm-full-pika-default/post.yaml15
-rw-r--r--playbooks/oslo.messaging-src-dsvm-full-pika-default/run.yaml42
-rw-r--r--playbooks/oslo.messaging-telemetry-dsvm-integration-pika/post.yaml80
-rw-r--r--playbooks/oslo.messaging-telemetry-dsvm-integration-pika/run.yaml79
-rw-r--r--playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/post.yaml15
-rw-r--r--playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/run.yaml45
-rw-r--r--releasenotes/notes/remove-pika-1bae204ced2521a3.yaml8
-rw-r--r--requirements.txt4
-rw-r--r--setup.cfg16
-rw-r--r--tox.ini6
30 files changed, 13 insertions, 4551 deletions
diff --git a/.zuul.yaml b/.zuul.yaml
index 110cc89..11fb98b 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -12,13 +12,6 @@
bindep_profile: kafka
- job:
- name: oslo.messaging-tox-py27-func-pika
- parent: openstack-tox-py27
- vars:
- tox_envlist: py27-func-pika
- bindep_profile: pika
-
-- job:
name: oslo.messaging-tox-py27-func-rabbit
parent: openstack-tox-py27
vars:
@@ -77,17 +70,6 @@
- openstack/oslo.messaging
- job:
- name: oslo.messaging-src-dsvm-full-pika-default
- parent: legacy-dsvm-base
- run: playbooks/oslo.messaging-src-dsvm-full-pika-default/run.yaml
- post-run: playbooks/oslo.messaging-src-dsvm-full-pika-default/post.yaml
- timeout: 10800
- required-projects:
- - openstack-infra/devstack-gate
- - openstack/devstack-plugin-pika
- - openstack/oslo.messaging
-
-- job:
name: oslo.messaging-src-dsvm-full-amqp1-dual-centos-7
parent: legacy-dsvm-base
run: playbooks/oslo.messaging-src-dsvm-full-amqp1-dual-centos-7/run.yaml
@@ -212,24 +194,6 @@
- openstack/diskimage-builder
- job:
- name: oslo.messaging-telemetry-dsvm-integration-pika
- parent: legacy-dsvm-base
- run: playbooks/oslo.messaging-telemetry-dsvm-integration-pika/run.yaml
- post-run: playbooks/oslo.messaging-telemetry-dsvm-integration-pika/post.yaml
- timeout: 4200
- required-projects:
- - openstack-infra/devstack-gate
- - openstack/aodh
- - openstack/ceilometer
- - openstack/devstack-plugin-pika
- - openstack/oslo.messaging
- - openstack/panko
- # following are required when DEVSTACK_GATE_HEAT, which this
- # job turns on
- - openstack/dib-utils
- - openstack/diskimage-builder
-
-- job:
name: oslo.messaging-telemetry-dsvm-integration-zmq
parent: legacy-dsvm-base
run: playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/run.yaml
@@ -305,19 +269,6 @@
- openstack/tempest
- job:
- name: oslo.messaging-tempest-neutron-dsvm-src-pika-default
- parent: legacy-dsvm-base
- run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/run.yaml
- post-run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/post.yaml
- timeout: 7800
- required-projects:
- - openstack-infra/devstack-gate
- - openstack/devstack-plugin-pika
- - openstack/neutron
- - openstack/oslo.messaging
- - openstack/tempest
-
-- job:
name: oslo.messaging-tempest-neutron-dsvm-src-zmq-default
parent: legacy-dsvm-base
run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-zmq-default/run.yaml
@@ -338,7 +289,6 @@
voting: false
- oslo.messaging-tox-py27-func-kafka:
voting: false
- - oslo.messaging-tox-py27-func-pika
- oslo.messaging-tox-py27-func-rabbit
- oslo.messaging-tox-py27-func-zmq-proxy:
voting: false
@@ -364,8 +314,6 @@
voting: false
- oslo.messaging-src-dsvm-full-kafka-default:
voting: false
- - oslo.messaging-src-dsvm-full-pika-default:
- voting: false
- oslo.messaging-src-dsvm-full-zmq-default:
voting: false
@@ -379,8 +327,6 @@
voting: false
- oslo.messaging-telemetry-dsvm-integration-kafka:
voting: false
- - oslo.messaging-telemetry-dsvm-integration-pika:
- voting: false
- oslo.messaging-telemetry-dsvm-integration-zmq:
voting: false
@@ -390,15 +336,12 @@
branches: ^(?!stable/ocata).*$
- oslo.messaging-tempest-neutron-dsvm-src-kafka-default:
voting: false
- - oslo.messaging-tempest-neutron-dsvm-src-pika-default:
- voting: false
- oslo.messaging-tempest-neutron-dsvm-src-zmq-default:
voting: false
gate:
jobs:
- oslo.messaging-tox-py27-func-rabbit
- - oslo.messaging-tox-py27-func-pika
- oslo.messaging-telemetry-dsvm-integration-rabbit
- oslo.messaging-src-dsvm-full-rabbit-default
- oslo.messaging-tempest-neutron-dsvm-src-rabbit-default
diff --git a/bindep.txt b/bindep.txt
index 96788ae..d9728bb 100644
--- a/bindep.txt
+++ b/bindep.txt
@@ -10,11 +10,9 @@ make [platform:rpm]
pkgconfig [platform:rpm]
libffi-devel [platform:rpm]
-# kombu/pika dpkg
-rabbitmq-server [platform:dpkg rabbit pika]
-
-# kombu/pika rpm
-rabbitmq-server [platform:rpm rabbit pika]
+# RabbitMQ message broker
+rabbitmq-server [platform:dpkg rabbit]
+rabbitmq-server [platform:rpm rabbit]
# zmq
redis [platform:rpm zmq]
diff --git a/doc/source/admin/index.rst b/doc/source/admin/index.rst
index d032aaf..63104bb 100644
--- a/doc/source/admin/index.rst
+++ b/doc/source/admin/index.rst
@@ -7,5 +7,4 @@ Deployment Guide
drivers
AMQP1.0
- pika_driver
zmq_driver
diff --git a/doc/source/admin/pika_driver.rst b/doc/source/admin/pika_driver.rst
deleted file mode 100644
index b89fabd..0000000
--- a/doc/source/admin/pika_driver.rst
+++ /dev/null
@@ -1,160 +0,0 @@
-------------------------------
-Pika Driver Deployment Guide
-------------------------------
-
-.. currentmodule:: oslo_messaging
-
-.. warning:: the Pika driver is no longer maintained and will be
- removed from Oslo.Messaging at a future date. It is recommended that
- all users of the Pika driver transition to using the Rabbit driver.
-
-============
-Introduction
-============
-
-Pika is a pure-Python implementation of the AMQP 0-9-1 protocol including
-RabbitMQ's extensions. It is very actively supported and recommended by
-RabbitMQ developers
-
-========
-Abstract
-========
-
-PikaDriver is one of oslo.messaging backend drivers. It supports RPC and Notify
-patterns. Currently it could be the only oslo.messaging driver across the
-OpenStack cluster. This document provides deployment information for this
-driver in oslo_messaging.
-
-This driver is able to work with single instance of RabbitMQ server or
-RabbitMQ cluster.
-
-
-=============
-Configuration
-=============
-
-Enabling (mandatory)
---------------------
-
-To enable the driver, in the section [DEFAULT] of the conf file,
-the 'transport_url' parameter should be set to
-`pika://user:pass@host1:port[,hostN:portN]`
-
- [DEFAULT]
- transport_url = pika://guest:guest@localhost:5672
-
-
-Connection options (optional)
------------------------------
-
-In section [oslo_messaging_pika]:
-#. channel_max - Maximum number of channels to allow,
-
-#. frame_max (default - pika default value): The maximum byte size for
- an AMQP frame,
-
-#. heartbeat_interval (default=1): How often to send heartbeats for
- consumer's connections in seconds. If 0 - disable heartbeats,
-
-#. ssl (default=False): Enable SSL if True,
-
-#. ssl_options (default=None): Arguments passed to ssl.wrap_socket,
-
-#. socket_timeout (default=0.25): Set timeout for opening new connection's
- socket,
-
-#. tcp_user_timeout (default=0.25): Set TCP_USER_TIMEOUT in seconds for
- connection's socket,
-
-#. host_connection_reconnect_delay (default=0.25): Set delay for reconnection
- to some host after connection error
-
-
-Connection pool options (optional)
-----------------------------------
-
-In section [oslo_messaging_pika]:
-
-#. pool_max_size (default=10): Maximum number of connections to keep queued,
-
-#. pool_max_overflow (default=0): Maximum number of connections to create above
- `pool_max_size`,
-
-#. pool_timeout (default=30): Default number of seconds to wait for a
- connections to available,
-
-#. pool_recycle (default=600): Lifetime of a connection (since creation) in
- seconds or None for no recycling. Expired connections are closed on acquire,
-
-#. pool_stale (default=60): Threshold at which inactive (since release)
- connections are considered stale in seconds or None for no staleness.
- Stale connections are closed on acquire.")
-
-RPC related options (optional)
-------------------------------
-
-In section [oslo_messaging_pika]:
-
-#. rpc_queue_expiration (default=60): Time to live for rpc queues without
- consumers in seconds,
-
-#. default_rpc_exchange (default="${control_exchange}_rpc"): Exchange name for
- sending RPC messages,
-
-#. rpc_reply_exchange', default=("${control_exchange}_rpc_reply"): Exchange
- name for receiving RPC replies,
-
-#. rpc_listener_prefetch_count (default=100): Max number of not acknowledged
- message which RabbitMQ can send to rpc listener,
-
-#. rpc_reply_listener_prefetch_count (default=100): Max number of not
- acknowledged message which RabbitMQ can send to rpc reply listener,
-
-#. rpc_reply_retry_attempts (default=-1): Reconnecting retry count in case of
- connectivity problem during sending reply. -1 means infinite retry during
- rpc_timeout,
-
-#. rpc_reply_retry_delay (default=0.25) Reconnecting retry delay in case of
- connectivity problem during sending reply,
-
-#. default_rpc_retry_attempts (default=-1): Reconnecting retry count in case of
- connectivity problem during sending RPC message, -1 means infinite retry. If
- actual retry attempts in not 0 the rpc request could be processed more than
- one time,
-
-#. rpc_retry_delay (default=0.25): Reconnecting retry delay in case of
- connectivity problem during sending RPC message
-
-$control_exchange in this code is value of [DEFAULT].control_exchange option,
-which is "openstack" by default
-
-Notification related options (optional)
----------------------------------------
-
-In section [oslo_messaging_pika]:
-
-#. notification_persistence (default=False): Persist notification messages,
-
-#. default_notification_exchange (default="${control_exchange}_notification"):
- Exchange name for sending notifications,
-
-#. notification_listener_prefetch_count (default=100): Max number of not
- acknowledged message which RabbitMQ can send to notification listener,
-
-#. default_notification_retry_attempts (default=-1): Reconnecting retry count
- in case of connectivity problem during sending notification, -1 means
- infinite retry,
-
-#. notification_retry_delay (default=0.25): Reconnecting retry delay in case of
- connectivity problem during sending notification message
-
-$control_exchange in this code is value of [DEFAULT].control_exchange option,
-which is "openstack" by default
-
-DevStack Support
-----------------
-
-Pika driver is supported by DevStack. To enable it you should edit
-local.conf [localrc] section and add next there:
-
- enable_plugin pika https://git.openstack.org/openstack/devstack-plugin-pika
diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py
deleted file mode 100644
index 65e7b48..0000000
--- a/oslo_messaging/_drivers/impl_pika.py
+++ /dev/null
@@ -1,366 +0,0 @@
-# Copyright 2011 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from debtcollector import deprecate
-from oslo_config import cfg
-from oslo_log import log as logging
-from oslo_utils import timeutils
-import pika_pool
-import tenacity
-
-from oslo_messaging._drivers import base
-from oslo_messaging._drivers import common
-from oslo_messaging._drivers.pika_driver import (pika_connection_factory as
- pika_drv_conn_factory)
-from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
-from oslo_messaging._drivers.pika_driver import pika_engine as pika_drv_engine
-from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
-from oslo_messaging._drivers.pika_driver import pika_listener as pika_drv_lstnr
-from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
-from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller
-from oslo_messaging import exceptions
-
-LOG = logging.getLogger(__name__)
-
-pika_pool_opts = [
- cfg.IntOpt('pool_max_size', default=30,
- help="Maximum number of connections to keep queued."),
- cfg.IntOpt('pool_max_overflow', default=0,
- help="Maximum number of connections to create above "
- "`pool_max_size`."),
- cfg.IntOpt('pool_timeout', default=30,
- help="Default number of seconds to wait for a connections to "
- "available"),
- cfg.IntOpt('pool_recycle', default=600,
- help="Lifetime of a connection (since creation) in seconds "
- "or None for no recycling. Expired connections are "
- "closed on acquire."),
- cfg.IntOpt('pool_stale', default=60,
- help="Threshold at which inactive (since release) connections "
- "are considered stale in seconds or None for no "
- "staleness. Stale connections are closed on acquire.")
-]
-
-message_opts = [
- cfg.StrOpt('default_serializer_type', default='json',
- choices=('json', 'msgpack'),
- help="Default serialization mechanism for "
- "serializing/deserializing outgoing/incoming messages")
-]
-
-notification_opts = [
- cfg.BoolOpt('notification_persistence', default=False,
- help="Persist notification messages."),
- cfg.StrOpt('default_notification_exchange',
- default="${control_exchange}_notification",
- help="Exchange name for sending notifications"),
- cfg.IntOpt(
- 'notification_listener_prefetch_count', default=100,
- help="Max number of not acknowledged message which RabbitMQ can send "
- "to notification listener."
- ),
- cfg.IntOpt(
- 'default_notification_retry_attempts', default=-1,
- help="Reconnecting retry count in case of connectivity problem during "
- "sending notification, -1 means infinite retry."
- ),
- cfg.FloatOpt(
- 'notification_retry_delay', default=0.25,
- help="Reconnecting retry delay in case of connectivity problem during "
- "sending notification message"
- )
-]
-
-rpc_opts = [
- cfg.IntOpt('rpc_queue_expiration', default=60,
- help="Time to live for rpc queues without consumers in "
- "seconds."),
- cfg.StrOpt('default_rpc_exchange', default="${control_exchange}_rpc",
- help="Exchange name for sending RPC messages"),
- cfg.StrOpt('rpc_reply_exchange', default="${control_exchange}_rpc_reply",
- help="Exchange name for receiving RPC replies"),
- cfg.IntOpt(
- 'rpc_listener_prefetch_count', default=100,
- help="Max number of not acknowledged message which RabbitMQ can send "
- "to rpc listener."
- ),
- cfg.IntOpt(
- 'rpc_reply_listener_prefetch_count', default=100,
- help="Max number of not acknowledged message which RabbitMQ can send "
- "to rpc reply listener."
- ),
- cfg.IntOpt(
- 'rpc_reply_retry_attempts', default=-1,
- help="Reconnecting retry count in case of connectivity problem during "
- "sending reply. -1 means infinite retry during rpc_timeout"
- ),
- cfg.FloatOpt(
- 'rpc_reply_retry_delay', default=0.25,
- help="Reconnecting retry delay in case of connectivity problem during "
- "sending reply."
- ),
- cfg.IntOpt(
- 'default_rpc_retry_attempts', default=-1,
- help="Reconnecting retry count in case of connectivity problem during "
- "sending RPC message, -1 means infinite retry. If actual "
- "retry attempts in not 0 the rpc request could be processed more "
- "than one time"
- ),
- cfg.FloatOpt(
- 'rpc_retry_delay', default=0.25,
- help="Reconnecting retry delay in case of connectivity problem during "
- "sending RPC message"
- )
-]
-
-
-class PikaDriver(base.BaseDriver):
- """Pika Driver
-
- **Warning**: The ``pika`` driver has been deprecated and will be removed in
- a future release. It is recommended that all users of the ``pika`` driver
- transition to using the ``rabbit`` driver.
- """
-
- def __init__(self, conf, url, default_exchange=None,
- allowed_remote_exmods=None):
-
- deprecate("The pika driver is no longer maintained. It has been"
- " deprecated",
- message="It is recommended that all users of the pika driver"
- " transition to using the rabbit driver.",
- version="pike", removal_version="rocky")
-
- opt_group = cfg.OptGroup(name='oslo_messaging_pika',
- title='Pika driver options')
- conf.register_group(opt_group)
- conf.register_opts(pika_drv_conn_factory.pika_opts, group=opt_group)
- conf.register_opts(pika_pool_opts, group=opt_group)
- conf.register_opts(message_opts, group=opt_group)
- conf.register_opts(rpc_opts, group=opt_group)
- conf.register_opts(notification_opts, group=opt_group)
- conf = common.ConfigOptsProxy(conf, url, opt_group.name)
-
- self._pika_engine = pika_drv_engine.PikaEngine(
- conf, url, default_exchange, allowed_remote_exmods
- )
- self._reply_listener = pika_drv_lstnr.RpcReplyPikaListener(
- self._pika_engine
- )
- super(PikaDriver, self).__init__(conf, url, default_exchange,
- allowed_remote_exmods)
-
- def require_features(self, requeue=False):
- pass
-
- def _declare_rpc_exchange(self, exchange, stopwatch):
- timeout = stopwatch.leftover(return_none=True)
- with (self._pika_engine.connection_without_confirmation_pool
- .acquire(timeout=timeout)) as conn:
- try:
- self._pika_engine.declare_exchange_by_channel(
- conn.channel,
- self._pika_engine.get_rpc_exchange_name(
- exchange
- ), "direct", False
- )
- except pika_pool.Timeout as e:
- raise exceptions.MessagingTimeout(
- "Timeout for current operation was expired. {}.".format(
- str(e)
- )
- )
-
- def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
- retry=None):
- with timeutils.StopWatch(duration=timeout) as stopwatch:
- if retry is None:
- retry = self._pika_engine.default_rpc_retry_attempts
-
- exchange = self._pika_engine.get_rpc_exchange_name(
- target.exchange
- )
-
- def on_exception(ex):
- if isinstance(ex, pika_drv_exc.ExchangeNotFoundException):
- # it is desired to create exchange because if we sent to
- # exchange which is not exists, we get ChannelClosed
- # exception and need to reconnect
- try:
- self._declare_rpc_exchange(exchange, stopwatch)
- except pika_drv_exc.ConnectionException as e:
- LOG.warning("Problem during declaring exchange. %s", e)
- return True
- elif isinstance(ex, (pika_drv_exc.ConnectionException,
- exceptions.MessageDeliveryFailure)):
- LOG.warning("Problem during message sending. %s", ex)
- return True
- else:
- return False
-
- if retry:
- retrier = tenacity.retry(
- stop=(tenacity.stop_never if retry == -1 else
- tenacity.stop_after_attempt(retry)),
- retry=tenacity.retry_if_exception(on_exception),
- wait=tenacity.wait_fixed(self._pika_engine.rpc_retry_delay)
- )
- else:
- retrier = None
-
- if target.fanout:
- return self.cast_all_workers(
- exchange, target.topic, ctxt, message, stopwatch, retrier
- )
-
- routing_key = self._pika_engine.get_rpc_queue_name(
- target.topic, target.server, retrier is None
- )
-
- msg = pika_drv_msg.RpcPikaOutgoingMessage(self._pika_engine,
- message, ctxt)
- try:
- reply = msg.send(
- exchange=exchange,
- routing_key=routing_key,
- reply_listener=(
- self._reply_listener if wait_for_reply else None
- ),
- stopwatch=stopwatch,
- retrier=retrier
- )
- except pika_drv_exc.ExchangeNotFoundException as ex:
- try:
- self._declare_rpc_exchange(exchange, stopwatch)
- except pika_drv_exc.ConnectionException as e:
- LOG.warning("Problem during declaring exchange. %s", e)
- raise ex
-
- if reply is not None:
- if reply.failure is not None:
- raise reply.failure
-
- return reply.result
-
- def cast_all_workers(self, exchange, topic, ctxt, message, stopwatch,
- retrier=None):
- msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message,
- ctxt)
- try:
- msg.send(
- exchange=exchange,
- routing_key=self._pika_engine.get_rpc_queue_name(
- topic, "all_workers", retrier is None
- ),
- mandatory=False,
- stopwatch=stopwatch,
- retrier=retrier
- )
- except pika_drv_exc.ExchangeNotFoundException:
- try:
- self._declare_rpc_exchange(exchange, stopwatch)
- except pika_drv_exc.ConnectionException as e:
- LOG.warning("Problem during declaring exchange. %s", e)
-
- def _declare_notification_queue_binding(
- self, target, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH):
- if stopwatch.expired():
- raise exceptions.MessagingTimeout(
- "Timeout for current operation was expired."
- )
- try:
- timeout = stopwatch.leftover(return_none=True)
- with (self._pika_engine.connection_without_confirmation_pool
- .acquire)(timeout=timeout) as conn:
- self._pika_engine.declare_queue_binding_by_channel(
- conn.channel,
- exchange=(
- target.exchange or
- self._pika_engine.default_notification_exchange
- ),
- queue=target.topic,
- routing_key=target.topic,
- exchange_type='direct',
- queue_expiration=None,
- durable=self._pika_engine.notification_persistence,
- )
- except pika_pool.Timeout as e:
- raise exceptions.MessagingTimeout(
- "Timeout for current operation was expired. {}.".format(str(e))
- )
-
- def send_notification(self, target, ctxt, message, version, retry=None):
- if retry is None:
- retry = self._pika_engine.default_notification_retry_attempts
-
- def on_exception(ex):
- if isinstance(ex, (pika_drv_exc.ExchangeNotFoundException,
- pika_drv_exc.RoutingException)):
- LOG.warning("Problem during sending notification. %s", ex)
- try:
- self._declare_notification_queue_binding(target)
- except pika_drv_exc.ConnectionException as e:
- LOG.warning("Problem during declaring notification queue "
- "binding. %s", e)
- return True
- elif isinstance(ex, (pika_drv_exc.ConnectionException,
- pika_drv_exc.MessageRejectedException)):
- LOG.warning("Problem during sending notification. %s", ex)
- return True
- else:
- return False
-
- if retry:
- retrier = tenacity.retry(
- stop=(tenacity.stop_never if retry == -1 else
- tenacity.stop_after_attempt(retry)),
- retry=tenacity.retry_if_exception(on_exception),
- wait=tenacity.wait_fixed(
- self._pika_engine.notification_retry_delay
- )
- )
- else:
- retrier = None
-
- msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message,
- ctxt)
- return msg.send(
- exchange=(
- target.exchange or
- self._pika_engine.default_notification_exchange
- ),
- routing_key=target.topic,
- confirm=True,
- mandatory=True,
- persistent=self._pika_engine.notification_persistence,
- retrier=retrier
- )
-
- def listen(self, target, batch_size, batch_timeout):
- return pika_drv_poller.RpcServicePikaPoller(
- self._pika_engine, target, batch_size, batch_timeout,
- self._pika_engine.rpc_listener_prefetch_count
- )
-
- def listen_for_notifications(self, targets_and_priorities, pool,
- batch_size, batch_timeout):
- return pika_drv_poller.NotificationPikaPoller(
- self._pika_engine, targets_and_priorities, batch_size,
- batch_timeout,
- self._pika_engine.notification_listener_prefetch_count, pool
- )
-
- def cleanup(self):
- self._reply_listener.cleanup()
- self._pika_engine.cleanup()
diff --git a/oslo_messaging/_drivers/pika_driver/__init__.py b/oslo_messaging/_drivers/pika_driver/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/oslo_messaging/_drivers/pika_driver/__init__.py
+++ /dev/null
diff --git a/oslo_messaging/_drivers/pika_driver/pika_commons.py b/oslo_messaging/_drivers/pika_driver/pika_commons.py
deleted file mode 100644
index f5e9086..0000000
--- a/oslo_messaging/_drivers/pika_driver/pika_commons.py
+++ /dev/null
@@ -1,40 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import select
-import socket
-
-from oslo_serialization.serializer import json_serializer
-from oslo_serialization.serializer import msgpack_serializer
-from oslo_utils import timeutils
-from pika import exceptions as pika_exceptions
-import six
-
-
-PIKA_CONNECTIVITY_ERRORS = (
- pika_exceptions.AMQPConnectionError,
- pika_exceptions.ConnectionClosed,
- pika_exceptions.ChannelClosed,
- socket.timeout,
- select.error
-)
-
-EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
-
-INFINITE_STOP_WATCH = timeutils.StopWatch(duration=None).start()
-
-MESSAGE_SERIALIZERS = {
- 'application/json': json_serializer.JSONSerializer(),
- 'application/msgpack': msgpack_serializer.MessagePackSerializer()
-}
diff --git a/oslo_messaging/_drivers/pika_driver/pika_connection.py b/oslo_messaging/_drivers/pika_driver/pika_connection.py
deleted file mode 100644
index f0dca5a..0000000
--- a/oslo_messaging/_drivers/pika_driver/pika_connection.py
+++ /dev/null
@@ -1,542 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import collections
-import logging
-import os
-import threading
-
-import futurist
-from pika.adapters import select_connection
-from pika import exceptions as pika_exceptions
-from pika import spec as pika_spec
-
-from oslo_utils import eventletutils
-
-current_thread = eventletutils.fetch_current_thread_functor()
-
-LOG = logging.getLogger(__name__)
-
-
-class ThreadSafePikaConnection(object):
- def __init__(self, parameters=None,
- _impl_class=select_connection.SelectConnection):
- self.params = parameters
- self._connection_lock = threading.Lock()
- self._evt_closed = threading.Event()
- self._task_queue = collections.deque()
- self._pending_connection_futures = set()
-
- create_connection_future = self._register_pending_future()
-
- def on_open_error(conn, err):
- create_connection_future.set_exception(
- pika_exceptions.AMQPConnectionError(err)
- )
-
- self._impl = _impl_class(
- parameters=parameters,
- on_open_callback=create_connection_future.set_result,
- on_open_error_callback=on_open_error,
- on_close_callback=self._on_connection_close,
- stop_ioloop_on_close=False,
- )
- self._interrupt_pipein, self._interrupt_pipeout = os.pipe()
- self._impl.ioloop.add_handler(self._interrupt_pipein,
- self._impl.ioloop.read_interrupt,
- select_connection.READ)
-
- self._thread = threading.Thread(target=self._process_io)
- self._thread.daemon = True
- self._thread_id = None
- self._thread.start()
-
- create_connection_future.result()
-
- def _check_called_not_from_event_loop(self):
- if current_thread() == self._thread_id:
- raise RuntimeError("This call is not allowed from ioloop thread")
-
- def _execute_task(self, func, *args, **kwargs):
- if current_thread() == self._thread_id:
- return func(*args, **kwargs)
-
- future = futurist.Future()
- self._task_queue.append((func, args, kwargs, future))
-
- if self._evt_closed.is_set():
- self._notify_all_futures_connection_close()
- elif self._interrupt_pipeout is not None:
- os.write(self._interrupt_pipeout, b'X')
-
- return future.result()
-
- def _register_pending_future(self):
- future = futurist.Future()
- self._pending_connection_futures.add(future)
-
- def on_done_callback(fut):
- try:
- self._pending_connection_futures.remove(fut)
- except KeyError:
- pass
-
- future.add_done_callback(on_done_callback)
-
- if self._evt_closed.is_set():
- self._notify_all_futures_connection_close()
- return future
-
- def _notify_all_futures_connection_close(self):
- while self._task_queue:
- try:
- method_res_future = self._task_queue.pop()[3]
- except KeyError:
- break
- else:
- method_res_future.set_exception(
- pika_exceptions.ConnectionClosed()
- )
-
- while self._pending_connection_futures:
- try:
- pending_connection_future = (
- self._pending_connection_futures.pop()
- )
- except KeyError:
- break
- else:
- pending_connection_future.set_exception(
- pika_exceptions.ConnectionClosed()
- )
-
- def _on_connection_close(self, conn, reply_code, reply_text):
- self._evt_closed.set()
- self._notify_all_futures_connection_close()
- if self._interrupt_pipeout:
- os.close(self._interrupt_pipeout)
- os.close(self._interrupt_pipein)
-
- def add_on_close_callback(self, callback):
- return self._execute_task(self._impl.add_on_close_callback, callback)
-
- def _do_process_io(self):
- while self._task_queue:
- func, args, kwargs, future = self._task_queue.pop()
- try:
- res = func(*args, **kwargs)
- except BaseException as e:
- LOG.exception(e)
- future.set_exception(e)
- else:
- future.set_result(res)
-
- self._impl.ioloop.poll()
- self._impl.ioloop.process_timeouts()
-
- def _process_io(self):
- self._thread_id = current_thread()
- while not self._evt_closed.is_set():
- try:
- self._do_process_io()
- except BaseException:
- LOG.exception("Error during processing connection's IO")
-
- def close(self, *args, **kwargs):
- self._check_called_not_from_event_loop()
-
- res = self._execute_task(self._impl.close, *args, **kwargs)
-
- self._evt_closed.wait()
- self._thread.join()
- return res
-
- def channel(self, channel_number=None):
- self._check_called_not_from_event_loop()
-
- channel_opened_future = self._register_pending_future()
-
- impl_channel = self._execute_task(
- self._impl.channel,
- on_open_callback=channel_opened_future.set_result,
- channel_number=channel_number
- )
-
- # Create our proxy channel
- channel = ThreadSafePikaChannel(impl_channel, self)
-
- # Link implementation channel with our proxy channel
- impl_channel._set_cookie(channel)
-
- channel_opened_future.result()
- return channel
-
- def add_timeout(self, timeout, callback):
- return self._execute_task(self._impl.add_timeout, timeout, callback)
-
- def remove_timeout(self, timeout_id):
- return self._execute_task(self._impl.remove_timeout, timeout_id)
-
- @property
- def is_closed(self):
- return self._impl.is_closed
-
- @property
- def is_closing(self):
- return self._impl.is_closing
-
- @property
- def is_open(self):
- return self._impl.is_open
-
-
-class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
-
- def __init__(self, channel_impl, connection):
- self._impl = channel_impl
- self._connection = connection
-
- self._delivery_confirmation = False
-
- self._message_returned = False
- self._current_future = None
-
- self._evt_closed = threading.Event()
-
- self.add_on_close_callback(self._on_channel_close)
-
- def _execute_task(self, func, *args, **kwargs):
- return self._connection._execute_task(func, *args, **kwargs)
-
- def _on_channel_close(self, channel, reply_code, reply_text):
- self._evt_closed.set()
-
- if self._current_future:
- self._current_future.set_exception(
- pika_exceptions.ChannelClosed(reply_code, reply_text))
-
- def _on_message_confirmation(self, frame):
- self._current_future.set_result(frame)
-
- def add_on_close_callback(self, callback):
- self._execute_task(self._impl.add_on_close_callback, callback)
-
- def add_on_cancel_callback(self, callback):
- self._execute_task(self._impl.add_on_cancel_callback, callback)
-
- def __int__(self):
- return self.channel_number
-
- @property
- def channel_number(self):
- return self._impl.channel_number
-
- @property
- def is_closed(self):
- return self._impl.is_closed
-
- @property
- def is_closing(self):
- return self._impl.is_closing
-
- @property
- def is_open(self):
- return self._impl.is_open
-
- def close(self, reply_code=0, reply_text="Normal Shutdown"):
- self._impl.close(reply_code=reply_code, reply_text=reply_text)
- self._evt_closed.wait()
-
- def _check_called_not_from_event_loop(self):
- self._connection._check_called_not_from_event_loop()
-
- def flow(self, active):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(
- self._impl.flow, callback=self._current_future.set_result,
- active=active
- )
-
- return self._current_future.result()
-
- def basic_consume(self, # pylint: disable=R0913
- consumer_callback,
- queue,
- no_ack=False,
- exclusive=False,
- consumer_tag=None,
- arguments=None):
-
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(
- self._impl.add_callback, self._current_future.set_result,
- replies=[pika_spec.Basic.ConsumeOk], one_shot=True
- )
-
- self._impl.add_callback(self._current_future.set_result,
- replies=[pika_spec.Basic.ConsumeOk],
- one_shot=True)
- tag = self._execute_task(
- self._impl.basic_consume,
- consumer_callback=consumer_callback,
- queue=queue,
- no_ack=no_ack,
- exclusive=exclusive,
- consumer_tag=consumer_tag,
- arguments=arguments
- )
-
- self._current_future.result()
- return tag
-
- def basic_cancel(self, consumer_tag):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(
- self._impl.basic_cancel,
- callback=self._current_future.set_result,
- consumer_tag=consumer_tag,
- nowait=False)
- self._current_future.result()
-
- def basic_ack(self, delivery_tag=0, multiple=False):
- return self._execute_task(
- self._impl.basic_ack, delivery_tag=delivery_tag, multiple=multiple)
-
- def basic_nack(self, delivery_tag=None, multiple=False, requeue=True):
- return self._execute_task(
- self._impl.basic_nack, delivery_tag=delivery_tag,
- multiple=multiple, requeue=requeue
- )
-
- def publish(self, exchange, routing_key, body, # pylint: disable=R0913
- properties=None, mandatory=False, immediate=False):
-
- if self._delivery_confirmation:
- self._check_called_not_from_event_loop()
-
- # In publisher-acknowledgments mode
- self._message_returned = False
- self._current_future = futurist.Future()
-
- self._execute_task(self._impl.basic_publish,
- exchange=exchange,
- routing_key=routing_key,
- body=body,
- properties=properties,
- mandatory=mandatory,
- immediate=immediate)
-
- conf_method = self._current_future.result().method
-
- if isinstance(conf_method, pika_spec.Basic.Nack):
- raise pika_exceptions.NackError((None,))
- else:
- assert isinstance(conf_method, pika_spec.Basic.Ack), (
- conf_method)
-
- if self._message_returned:
- raise pika_exceptions.UnroutableError((None,))
- else:
- # In non-publisher-acknowledgments mode
- self._execute_task(self._impl.basic_publish,
- exchange=exchange,
- routing_key=routing_key,
- body=body,
- properties=properties,
- mandatory=mandatory,
- immediate=immediate)
-
- def basic_qos(self, prefetch_size=0, prefetch_count=0, all_channels=False):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.basic_qos,
- callback=self._current_future.set_result,
- prefetch_size=prefetch_size,
- prefetch_count=prefetch_count,
- all_channels=all_channels)
- self._current_future.result()
-
- def basic_recover(self, requeue=False):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(
- self._impl.basic_recover,
- callback=lambda: self._current_future.set_result(None),
- requeue=requeue
- )
- self._current_future.result()
-
- def basic_reject(self, delivery_tag=None, requeue=True):
- self._execute_task(self._impl.basic_reject,
- delivery_tag=delivery_tag,
- requeue=requeue)
-
- def _on_message_returned(self, *args, **kwargs):
- self._message_returned = True
-
- def confirm_delivery(self):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.add_callback,
- callback=self._current_future.set_result,
- replies=[pika_spec.Confirm.SelectOk],
- one_shot=True)
- self._execute_task(self._impl.confirm_delivery,
- callback=self._on_message_confirmation,
- nowait=False)
- self._current_future.result()
-
- self._delivery_confirmation = True
- self._execute_task(self._impl.add_on_return_callback,
- self._on_message_returned)
-
- def exchange_declare(self, exchange=None, # pylint: disable=R0913
- exchange_type='direct', passive=False, durable=False,
- auto_delete=False, internal=False,
- arguments=None, **kwargs):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.exchange_declare,
- callback=self._current_future.set_result,
- exchange=exchange,
- exchange_type=exchange_type,
- passive=passive,
- durable=durable,
- auto_delete=auto_delete,
- internal=internal,
- nowait=False,
- arguments=arguments,
- type=kwargs["type"] if kwargs else None)
-
- return self._current_future.result()
-
- def exchange_delete(self, exchange=None, if_unused=False):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.exchange_delete,
- callback=self._current_future.set_result,
- exchange=exchange,
- if_unused=if_unused,
- nowait=False)
-
- return self._current_future.result()
-
- def exchange_bind(self, destination=None, source=None, routing_key='',
- arguments=None):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.exchange_bind,
- callback=self._current_future.set_result,
- destination=destination,
- source=source,
- routing_key=routing_key,
- nowait=False,
- arguments=arguments)
-
- return self._current_future.result()
-
- def exchange_unbind(self, destination=None, source=None, routing_key='',
- arguments=None):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.exchange_unbind,
- callback=self._current_future.set_result,
- destination=destination,
- source=source,
- routing_key=routing_key,
- nowait=False,
- arguments=arguments)
-
- return self._current_future.result()
-
- def queue_declare(self, queue='', passive=False, durable=False,
- exclusive=False, auto_delete=False,
- arguments=None):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.queue_declare,
- callback=self._current_future.set_result,
- queue=queue,
- passive=passive,
- durable=durable,
- exclusive=exclusive,
- auto_delete=auto_delete,
- nowait=False,
- arguments=arguments)
-
- return self._current_future.result()
-
- def queue_delete(self, queue='', if_unused=False, if_empty=False):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.queue_delete,
- callback=self._current_future.set_result,
- queue=queue,
- if_unused=if_unused,
- if_empty=if_empty,
- nowait=False)
-
- return self._current_future.result()
-
- def queue_purge(self, queue=''):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.queue_purge,
- callback=self._current_future.set_result,
- queue=queue,
- nowait=False)
- return self._current_future.result()
-
- def queue_bind(self, queue, exchange, routing_key=None,
- arguments=None):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.queue_bind,
- callback=self._current_future.set_result,
- queue=queue,
- exchange=exchange,
- routing_key=routing_key,
- nowait=False,
- arguments=arguments)
- return self._current_future.result()
-
- def queue_unbind(self, queue='', exchange=None, routing_key=None,
- arguments=None):
- self._check_called_not_from_event_loop()
-
- self._current_future = futurist.Future()
- self._execute_task(self._impl.queue_unbind,
- callback=self._current_future.set_result,
- queue=queue,
- exchange=exchange,
- routing_key=routing_key,
- arguments=arguments)
- return self._current_future.result()
diff --git a/oslo_messaging/_drivers/pika_driver/pika_connection_factory.py b/oslo_messaging/_drivers/pika_driver/pika_connection_factory.py
deleted file mode 100644
index a78c55e..0000000
--- a/oslo_messaging/_drivers/pika_driver/pika_connection_factory.py
+++ /dev/null
@@ -1,307 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import logging
-import random
-import socket
-import threading
-import time
-
-from oslo_config import cfg
-import pika
-from pika import credentials as pika_credentials
-
-from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
-from oslo_messaging._drivers.pika_driver import pika_connection
-from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
-
-LOG = logging.getLogger(__name__)
-
-# constant for setting tcp_user_timeout socket option
-# (it should be defined in 'select' module of standard library in future)
-TCP_USER_TIMEOUT = 18
-
-# constants for creating connection statistics
-HOST_CONNECTION_LAST_TRY_TIME = "last_try_time"
-HOST_CONNECTION_LAST_SUCCESS_TRY_TIME = "last_success_try_time"
-
-pika_opts = [
- cfg.IntOpt('channel_max',
- help='Maximum number of channels to allow'),
- cfg.IntOpt('frame_max',
- help='The maximum byte size for an AMQP frame'),
- cfg.IntOpt('heartbeat_interval', default=3,
- help="How often to send heartbeats for consumer's connections"),
- cfg.BoolOpt('ssl',
- help='Enable SSL'),
- cfg.DictOpt('ssl_options',
- help='Arguments passed to ssl.wrap_socket'),
- cfg.FloatOpt('socket_timeout', default=0.25,
- help="Set socket timeout in seconds for connection's socket"),
- cfg.FloatOpt('tcp_user_timeout', default=0.25,
- help="Set TCP_USER_TIMEOUT in seconds for connection's "
- "socket"),
- cfg.FloatOpt('host_connection_reconnect_delay', default=0.25,
- help="Set delay for reconnection to some host which has "
- "connection error"),
- cfg.StrOpt('connection_factory', default="single",
- choices=["new", "single", "read_write"],
- help='Connection factory implementation')
-]
-
-
-class PikaConnectionFactory(object):
-
- def __init__(self, url, conf):
- self._url = url
- self._conf = conf
-
- self._connection_lock = threading.RLock()
-
- if not url.hosts:
- raise ValueError("You should provide at least one RabbitMQ host")
-
- # initializing connection parameters for configured RabbitMQ hosts
- self._common_pika_params = {
- 'virtual_host': url.virtual_host,
- 'channel_max': conf.oslo_messaging_pika.channel_max,
- 'frame_max': conf.oslo_messaging_pika.frame_max,
- 'ssl': conf.oslo_messaging_pika.ssl,
- 'ssl_options': conf.oslo_messaging_pika.ssl_options,
- 'socket_timeout': conf.oslo_messaging_pika.socket_timeout
- }
-
- self._host_list = url.hosts
- self._heartbeat_interval = conf.oslo_messaging_pika.heartbeat_interval
- self._host_connection_reconnect_delay = (
- conf.oslo_messaging_pika.host_connection_reconnect_delay
- )
- self._tcp_user_timeout = conf.oslo_messaging_pika.tcp_user_timeout
-
- self._connection_host_status = {}
-
- self._cur_connection_host_num = random.randint(
- 0, len(url.hosts) - 1
- )
-
- def cleanup(self):
- pass
-
- def create_connection(self, for_listening=False):
- """Create and return connection to any available host.
-
- :return: created connection
- :raise: ConnectionException if all hosts are not reachable
- """
-
- with self._connection_lock:
-
- host_count = len(self._host_list)
- connection_attempts = host_count
-
- while connection_attempts > 0:
- self._cur_connection_host_num += 1
- self._cur_connection_host_num %= host_count
- try:
- return self._create_host_connection(
- self._cur_connection_host_num, for_listening
- )
- except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e:
- LOG.warning("Can't establish connection to host. %s", e)
- except pika_drv_exc.HostConnectionNotAllowedException as e:
- LOG.warning("Connection to host is not allowed. %s", e)
-
- connection_attempts -= 1
-
- raise pika_drv_exc.EstablishConnectionException(
- "Can not establish connection to any configured RabbitMQ "
- "host: " + str(self._host_list)
- )
-
- def _set_tcp_user_timeout(self, s):
- if not self._tcp_user_timeout:
- return
- try:
- s.setsockopt(
- socket.IPPROTO_TCP, TCP_USER_TIMEOUT,
- int(self._tcp_user_timeout * 1000)
- )
- except socket.error:
- LOG.warning(
- "Whoops, this kernel doesn't seem to support TCP_USER_TIMEOUT."
- )
-
- def _create_host_connection(self, host_index, for_listening):
- """Create new connection to host #host_index
-
- :param host_index: Integer, number of host for connection establishing
- :param for_listening: Boolean, creates connection for listening
- if True
- :return: New connection
- """
- host = self._host_list[host_index]
-
- cur_time = time.time()
-
- host_connection_status = self._connection_host_status.get(host)
-
- if host_connection_status is None:
- host_connection_status = {
- HOST_CONNECTION_LAST_SUCCESS_TRY_TIME: 0,
- HOST_CONNECTION_LAST_TRY_TIME: 0
- }
- self._connection_host_status[host] = host_connection_status
-
- last_success_time = host_connection_status[
- HOST_CONNECTION_LAST_SUCCESS_TRY_TIME
- ]
- last_time = host_connection_status[
- HOST_CONNECTION_LAST_TRY_TIME
- ]
-
- # raise HostConnectionNotAllowedException if we tried to establish
- # connection in last 'host_connection_reconnect_delay' and got
- # failure
- if (last_time != last_success_time and
- cur_time - last_time <
- self._host_connection_reconnect_delay):
- raise pika_drv_exc.HostConnectionNotAllowedException(
- "Connection to host #{} is not allowed now because of "
- "previous failure".format(host_index)
- )
-
- try:
- connection = self._do_create_host_connection(
- host, for_listening
- )
- self._connection_host_status[host][
- HOST_CONNECTION_LAST_SUCCESS_TRY_TIME
- ] = cur_time
-
- return connection
- finally:
- self._connection_host_status[host][
- HOST_CONNECTION_LAST_TRY_TIME
- ] = cur_time
-
- def _do_create_host_connection(self, host, for_listening):
- connection_params = pika.ConnectionParameters(
- host=host.hostname,
- port=host.port,
- credentials=pika_credentials.PlainCredentials(
- host.username, host.password
- ),
- heartbeat_interval=(
- self._heartbeat_interval if for_listening else None
- ),
- **self._common_pika_params
- )
- if for_listening:
- connection = pika_connection.ThreadSafePikaConnection(
- parameters=connection_params
- )
- else:
- connection = pika.BlockingConnection(
- parameters=connection_params
- )
- connection.params = connection_params
-
- self._set_tcp_user_timeout(connection._impl.socket)
- return connection
-
-
-class NotClosableConnection(object):
- def __init__(self, connection):
- self._connection = connection
-
- def __getattr__(self, item):
- return getattr(self._connection, item)
-
- def close(self):
- pass
-
-
-class SinglePikaConnectionFactory(PikaConnectionFactory):
- def __init__(self, url, conf):
- super(SinglePikaConnectionFactory, self).__init__(url, conf)
- self._connection = None
-
- def create_connection(self, for_listening=False):
- with self._connection_lock:
- if self._connection is None or not self._connection.is_open:
- self._connection = (
- super(SinglePikaConnectionFactory, self).create_connection(
- True
- )
- )
- return NotClosableConnection(self._connection)
-
- def cleanup(self):
- with self._connection_lock:
- if self._connection is not None and self._connection.is_open:
- try:
- self._connection.close()
- except Exception:
- LOG.warning(
- "Unexpected exception during connection closing",
- exc_info=True
- )
- self._connection = None
-
-
-class ReadWritePikaConnectionFactory(PikaConnectionFactory):
- def __init__(self, url, conf):
- super(ReadWritePikaConnectionFactory, self).__init__(url, conf)
- self._read_connection = None
- self._write_connection = None
-
- def create_connection(self, for_listening=False):
- with self._connection_lock:
- if for_listening:
- if (self._read_connection is None or
- not self._read_connection.is_open):
- self._read_connection = super(
- ReadWritePikaConnectionFactory, self
- ).create_connection(True)
- return NotClosableConnection(self._read_connection)
- else:
- if (self._write_connection is None or
- not self._write_connection.is_open):
- self._write_connection = super(
- ReadWritePikaConnectionFactory, self
- ).create_connection(True)
- return NotClosableConnection(self._write_connection)
-
- def cleanup(self):
- with self._connection_lock:
- if (self._read_connection is not None and
- self._read_connection.is_open):
- try:
- self._read_connection.close()
- except Exception:
- LOG.warning(
- "Unexpected exception during connection closing",
- exc_info=True
- )
- self._read_connection = None
-
- if (self._write_connection is not None and
- self._write_connection.is_open):
- try:
- self._write_connection.close()
- except Exception:
- LOG.warning(
- "Unexpected exception during connection closing",
- exc_info=True
- )
- self._write_connection = None
diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py
deleted file mode 100644
index 157c499..0000000
--- a/oslo_messaging/_drivers/pika_driver/pika_engine.py
+++ /dev/null
@@ -1,303 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import logging
-import os
-import threading
-import uuid
-
-from oslo_utils import eventletutils
-import pika_pool
-from stevedore import driver
-
-from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
-from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
-
-LOG = logging.getLogger(__name__)
-
-
-class _PooledConnectionWithConfirmations(pika_pool.Connection):
- """Derived from 'pika_pool.Connection' and extends its logic - adds
- 'confirm_delivery' call after channel creation to enable delivery
- confirmation for channel
- """
- @property
- def channel(self):
- if self.fairy.channel is None:
- self.fairy.channel = self.fairy.cxn.channel()
- self.fairy.channel.confirm_delivery()
- return self.fairy.channel
-
-
-class PikaEngine(object):
- """Used for shared functionality between other pika driver modules, like
- connection factory, connection pools, processing and holding configuration,
- etc.
- """
-
- def __init__(self, conf, url, default_exchange=None,
- allowed_remote_exmods=None):
- self.conf = conf
- self.url = url
-
- self._connection_factory_type = (
- self.conf.oslo_messaging_pika.connection_factory
- )
-
- self._connection_factory = None
- self._connection_without_confirmation_pool = None
- self._connection_with_confirmation_pool = None
- self._pid = None
- self._init_lock = threading.Lock()
-
- self.host_connection_reconnect_delay = (
- conf.oslo_messaging_pika.host_connection_reconnect_delay
- )
-
- # processing rpc options
- self.default_rpc_exchange = (
- conf.oslo_messaging_pika.default_rpc_exchange
- )
- self.rpc_reply_exchange = (
- conf.oslo_messaging_pika.rpc_reply_exchange
- )
-
- self.allowed_remote_exmods = [pika_drv_cmns.EXCEPTIONS_MODULE]
- if allowed_remote_exmods:
- self.allowed_remote_exmods.extend(allowed_remote_exmods)
-
- self.rpc_listener_prefetch_count = (
- conf.oslo_messaging_pika.rpc_listener_prefetch_count
- )
-
- self.default_rpc_retry_attempts = (
- conf.oslo_messaging_pika.default_rpc_retry_attempts
- )
-
- self.rpc_retry_delay = (
- conf.oslo_messaging_pika.rpc_retry_delay
- )
- if self.rpc_retry_delay < 0:
- raise ValueError("rpc_retry_delay should be non-negative integer")
-
- self.rpc_reply_listener_prefetch_count = (
- conf.oslo_messaging_pika.rpc_listener_prefetch_count
- )
-
- self.rpc_reply_retry_attempts = (
- conf.oslo_messaging_pika.rpc_reply_retry_attempts
- )
- self.rpc_reply_retry_delay = (
- conf.oslo_messaging_pika.rpc_reply_retry_delay
- )
- if self.rpc_reply_retry_delay < 0:
- raise ValueError("rpc_reply_retry_delay should be non-negative "
- "integer")
-
- self.rpc_queue_expiration = (
- self.conf.oslo_messaging_pika.rpc_queue_expiration
- )
-
- # processing notification options
- self.default_notification_exchange = (
- conf.oslo_messaging_pika.default_notification_exchange
- )
-
- self.notification_persistence = (
- conf.oslo_messaging_pika.notification_persistence
- )
-
- self.notification_listener_prefetch_count = (
- conf.oslo_messaging_pika.notification_listener_prefetch_count
- )
-
- self.default_notification_retry_attempts = (
- conf.oslo_messaging_pika.default_notification_retry_attempts
- )
- if self.default_notification_retry_attempts is None:
- raise ValueError("default_notification_retry_attempts should be "
- "an integer")
- self.notification_retry_delay = (
- conf.oslo_messaging_pika.notification_retry_delay
- )
- if (self.notification_retry_delay is None or
- self.notification_retry_delay < 0):
- raise ValueError("notification_retry_delay should be non-negative "
- "integer")
-
- self.default_content_type = (
- 'application/' + conf.oslo_messaging_pika.default_serializer_type
- )
-
- def _init_if_needed(self):
- cur_pid = os.getpid()
-
- if self._pid == cur_pid:
- return
-
- with self._init_lock:
- if self._pid == cur_pid:
- return
-
- if self._pid:
- LOG.warning("New pid is detected. Old: %s, new: %s. "
- "Cleaning up...", self._pid, cur_pid)
-
- # Note(dukhlov): we need to force select poller usage in case
- # when 'thread' module is monkey patched becase current
- # eventlet implementation does not support patching of
- # poll/epoll/kqueue
- if eventletutils.is_monkey_patched("thread"):
- from pika.adapters import select_connection
- select_connection.SELECT_TYPE = "select"
-
- mgr = driver.DriverManager(
- 'oslo.messaging.pika.connection_factory',
- self._connection_factory_type
- )
-
- self._connection_factory = mgr.driver(self.url, self.conf)
-
- # initializing 2 connection pools: 1st for connections without
- # confirmations, 2nd - with confirmations
- self._connection_without_confirmation_pool = pika_pool.QueuedPool(
- create=self.create_connection,
- max_size=self.conf.oslo_messaging_pika.pool_max_size,
- max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow,
- timeout=self.conf.oslo_messaging_pika.pool_timeout,
- recycle=self.conf.oslo_messaging_pika.pool_recycle,
- stale=self.conf.oslo_messaging_pika.pool_stale,
- )
-
- self._connection_with_confirmation_pool = pika_pool.QueuedPool(
- create=self.create_connection,
- max_size=self.conf.oslo_messaging_pika.pool_max_size,
- max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow,
- timeout=self.conf.oslo_messaging_pika.pool_timeout,
- recycle=self.conf.oslo_messaging_pika.pool_recycle,
- stale=self.conf.oslo_messaging_pika.pool_stale,
- )
-
- self._connection_with_confirmation_pool.Connection = (
- _PooledConnectionWithConfirmations
- )
-
- self._pid = cur_pid
-
- def create_connection(self, for_listening=False):
- self._init_if_needed()
- return self._connection_factory.create_connection(for_listening)
-
- @property
- def connection_without_confirmation_pool(self):
- self._init_if_needed()
- return self._connection_without_confirmation_pool
-
- @property
- def connection_with_confirmation_pool(self):
- self._init_if_needed()
- return self._connection_with_confirmation_pool
-
- def cleanup(self):
- if self._connection_factory:
- self._connection_factory.cleanup()
-
- def declare_exchange_by_channel(self, channel, exchange, exchange_type,
- durable):
- """Declare exchange using already created channel, if they don't exist
-
- :param channel: Channel for communication with RabbitMQ
- :param exchange: String, RabbitMQ exchange name
- :param exchange_type: String ('direct', 'topic' or 'fanout')
- exchange type for exchange to be declared
- :param durable: Boolean, creates durable exchange if true
- """
- try:
- channel.exchange_declare(
- exchange, exchange_type, auto_delete=True, durable=durable
- )
- except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e:
- raise pika_drv_exc.ConnectionException(
- "Connectivity problem detected during declaring exchange: "
- "exchange:{}, exchange_type: {}, durable: {}. {}".format(
- exchange, exchange_type, durable, str(e)
- )
- )
-
- def declare_queue_binding_by_channel(self, channel, exchange, queue,
- routing_key, exchange_type,
- queue_expiration, durable):
- """Declare exchange, queue and bind them using already created
- channel, if they don't exist
-
- :param channel: Channel for communication with RabbitMQ
- :param exchange: String, RabbitMQ exchange name
- :param queue: Sting, RabbitMQ queue name
- :param routing_key: Sting, RabbitMQ routing key for queue binding
- :param exchange_type: String ('direct', 'topic' or 'fanout')
- exchange type for exchange to be declared
- :param queue_expiration: Integer, time in seconds which queue will
- remain existing in RabbitMQ when there no consumers connected
- :param durable: Boolean, creates durable exchange and queue if true
- """
- try:
- channel.exchange_declare(
- exchange, exchange_type, auto_delete=True, durable=durable
- )
- arguments = {}
-
- if queue_expiration > 0:
- arguments['x-expires'] = queue_expiration * 1000
-
- channel.queue_declare(queue, durable=durable, arguments=arguments)
-
- channel.queue_bind(queue, exchange, routing_key)
- except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e:
- raise pika_drv_exc.ConnectionException(
- "Connectivity problem detected during declaring queue "
- "binding: exchange:{}, queue: {}, routing_key: {}, "
- "exchange_type: {}, queue_expiration: {}, "
- "durable: {}. {}".format(
- exchange, queue, routing_key, exchange_type,
- queue_expiration, durable, str(e)
- )
- )
-
- def get_rpc_exchange_name(self, exchange):
- """Returns RabbitMQ exchange name for given rpc request
-
- :param exchange: String, oslo.messaging target's exchange
-
- :return: String, RabbitMQ exchange name
- """
- return exchange or self.default_rpc_exchange
-
- @staticmethod
- def get_rpc_queue_name(topic, server, no_ack, worker=False):
- """Returns RabbitMQ queue name for given rpc request
-
- :param topic: String, oslo.messaging target's topic
- :param server: String, oslo.messaging target's server
- :param no_ack: Boolean, use message delivery with acknowledges or not
- :param worker: Boolean, use queue by single worker only or not
-
- :return: String, RabbitMQ queue name
- """
- queue_parts = ["no_ack" if no_ack else "with_ack", topic]
- if server is not None:
- queue_parts.append(server)
- if worker:
- queue_parts.append("worker")
- queue_parts.append(uuid.uuid4().hex)
- queue = '.'.join(queue_parts)
- return queue
diff --git a/oslo_messaging/_drivers/pika_driver/pika_exceptions.py b/oslo_messaging/_drivers/pika_driver/pika_exceptions.py
deleted file mode 100644
index c32d7e4..0000000
--- a/oslo_messaging/_drivers/pika_driver/pika_exceptions.py
+++ /dev/null
@@ -1,68 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging import exceptions
-
-
-class ExchangeNotFoundException(exceptions.MessageDeliveryFailure):
- """Is raised if specified exchange is not found in RabbitMQ."""
- pass
-
-
-class MessageRejectedException(exceptions.MessageDeliveryFailure):
- """Is raised if message which you are trying to send was nacked by RabbitMQ
- it may happen if RabbitMQ is not able to process message
- """
- pass
-
-
-class RoutingException(exceptions.MessageDeliveryFailure):
- """Is raised if message can not be delivered to any queue. Usually it means
- that any queue is not binded to given exchange with given routing key.
- Raised if 'mandatory' flag specified only
- """
- pass
-
-
-class ConnectionException(exceptions.MessagingException):
- """Is raised if some operation can not be performed due to connectivity
- problem
- """
- pass
-
-
-class TimeoutConnectionException(ConnectionException):
- """Is raised if socket timeout was expired during network interaction"""
- pass
-
-
-class EstablishConnectionException(ConnectionException):
- """Is raised if we have some problem during establishing connection
- procedure
- """
- pass
-
-
-class HostConnectionNotAllowedException(EstablishConnectionException):
- """Is raised in case of try to establish connection to temporary
- not allowed host (because of reconnection policy for example)
- """
- pass
-
-
-class UnsupportedDriverVersion(exceptions.MessagingException):
- """Is raised when message is received but was sent by different,
- not supported driver version
- """
- pass
diff --git a/oslo_messaging/_drivers/pika_driver/pika_listener.py b/oslo_messaging/_drivers/pika_driver/pika_listener.py
deleted file mode 100644
index 1942fcc..0000000
--- a/oslo_messaging/_drivers/pika_driver/pika_listener.py
+++ /dev/null
@@ -1,123 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import threading
-import uuid
-
-from concurrent import futures
-from oslo_log import log as logging
-
-from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller
-
-LOG = logging.getLogger(__name__)
-
-
-class RpcReplyPikaListener(object):
- """Provide functionality for listening RPC replies. Create and handle
- reply poller and coroutine for performing polling job
- """
-
- def __init__(self, pika_engine):
- super(RpcReplyPikaListener, self).__init__()
- self._pika_engine = pika_engine
-
- # preparing poller for listening replies
- self._reply_queue = None
-
- self._reply_poller = None
- self._reply_waiting_futures = {}
-
- self._reply_consumer_initialized = False
- self._reply_consumer_initialization_lock = threading.Lock()
- self._shutdown = False
-
- def get_reply_qname(self):
- """As result return reply queue name, shared for whole process,
- but before this check is RPC listener initialized or not and perform
- initialization if needed
-
- :return: String, queue name which hould be used for reply sending
- """
- if self._reply_consumer_initialized:
- return self._reply_queue
-
- with self._reply_consumer_initialization_lock:
- if self._reply_consumer_initialized:
- return self._reply_queue
-
- # generate reply queue name if needed
- if self._reply_queue is None:
- self._reply_queue = "reply.{}.{}.{}".format(
- self._pika_engine.conf.project,
- self._pika_engine.conf.prog, uuid.uuid4().hex
- )
-
- # initialize reply poller if needed
- if self._reply_poller is None:
- self._reply_poller = pika_drv_poller.RpcReplyPikaPoller(
- self._pika_engine, self._pika_engine.rpc_reply_exchange,
- self._reply_queue, 1, None,
- self._pika_engine.rpc_reply_listener_prefetch_count
- )
-
- self._reply_poller.start(self._on_incoming)
- self._reply_consumer_initialized = True
-
- return self._reply_queue
-
- def _on_incoming(self, incoming):
- """Reply polling job. Poll replies in infinite loop and notify
- registered features
- """
- for message in incoming:
- try:
- message.acknowledge()
- future = self._reply_waiting_futures.pop(
- message.msg_id, None
- )
- if future is not None:
- future.set_result(message)
- except Exception:
- LOG.exception("Unexpected exception during processing"
- "reply message")
-
- def register_reply_waiter(self, msg_id):
- """Register reply waiter. Should be called before message sending to
- the server
- :param msg_id: String, message_id of expected reply
- :return future: Future, container for expected reply to be returned
- over
- """
- future = futures.Future()
- self._reply_waiting_futures[msg_id] = future
- return future
-
- def unregister_reply_waiter(self, msg_id):
- """Unregister reply waiter. Should be called if client has not got
- reply and doesn't want to continue waiting (if timeout_expired for
- example)
- :param msg_id:
- """
- self._reply_waiting_futures.pop(msg_id, None)
-
- def cleanup(self):
- """Stop replies consuming and cleanup resources"""
- self._shutdown = True
-
- if self._reply_poller:
- self._reply_poller.stop()
- self._reply_poller.cleanup()
- self._reply_poller = None
-
- self._reply_queue = None
diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py
deleted file mode 100644
index 2f79f05..0000000
--- a/oslo_messaging/_drivers/pika_driver/pika_message.py
+++ /dev/null
@@ -1,618 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-
-import socket
-import time
-import traceback
-import uuid
-
-from concurrent import futures
-from oslo_log import log as logging
-from oslo_utils import importutils
-from oslo_utils import timeutils
-from pika import exceptions as pika_exceptions
-from pika import spec as pika_spec
-import pika_pool
-import six
-import tenacity
-
-
-import oslo_messaging
-from oslo_messaging._drivers import base
-from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
-from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
-from oslo_messaging import _utils as utils
-from oslo_messaging import exceptions
-
-
-LOG = logging.getLogger(__name__)
-
-_VERSION_HEADER = "version"
-_VERSION = "1.0"
-
-
-class RemoteExceptionMixin(object):
- """Used for constructing dynamic exception type during deserialization of
- remote exception. It defines unified '__init__' method signature and
- exception message format
- """
- def __init__(self, module, clazz, message, trace):
- """Store serialized data
- :param module: String, module name for importing original exception
- class of serialized remote exception
- :param clazz: String, original class name of serialized remote
- exception
- :param message: String, original message of serialized remote
- exception
- :param trace: String, original trace of serialized remote exception
- """
- self.module = module
- self.clazz = clazz
- self.message = message
- self.trace = trace
-
- self._str_msgs = message + "\n" + "\n".join(trace)
-
- def __str__(self):
- return self._str_msgs
-
-
-class PikaIncomingMessage(base.IncomingMessage):
- """Driver friendly adapter for received message. Extract message
- information from RabbitMQ message and provide access to it
- """
-
- def __init__(self, pika_engine, channel, method, properties, body):
- """Parse RabbitMQ message
-
- :param pika_engine: PikaEngine, shared object with configuration and
- shared driver functionality
- :param channel: Channel, RabbitMQ channel which was used for
- this message delivery, used for sending ack back.
- If None - ack is not required
- :param method: Method, RabbitMQ message method
- :param properties: Properties, RabbitMQ message properties
- :param body: Bytes, RabbitMQ message body
- """
- headers = getattr(properties, "headers", {})
- version = headers.get(_VERSION_HEADER, None)
- if not utils.version_is_compatible(version, _VERSION):
- raise pika_drv_exc.UnsupportedDriverVersion(
- "Message's version: {} is not compatible with driver version: "
- "{}".format(version, _VERSION))
-
- self._pika_engine = pika_engine
- self._channel = channel
- self._delivery_tag = method.delivery_tag
-
- self._version = version
-
- self._content_type = properties.content_type
- self.unique_id = properties.message_id
-
- self.expiration_time = (
- None if properties.expiration is None else
- time.time() + float(properties.expiration) / 1000
- )
-
- try:
- serializer = pika_drv_cmns.MESSAGE_SERIALIZERS[self._content_type]
- except KeyError:
- raise NotImplementedError(
- "Content-type['{}'] is not supported.".format(
- self._content_type
- )
- )
-
- message_dict = serializer.load_from_bytes(body)
-
- context_dict = {}
-
- for key in list(message_dict.keys()):
- key = six.text_type(key)
- if key.startswith('_$_'):
- value = message_dict.pop(key)
- context_dict[key[3:]] = value
-
- super(PikaIncomingMessage, self).__init__(context_dict, message_dict)
-
- def need_ack(self):
- return self._channel is not None
-
- def acknowledge(self):
- """Ack the message. Should be called by message processing logic when
- it considered as consumed (means that we don't need redelivery of this
- message anymore)
- """
- if self.need_ack():
- self._channel.basic_ack(delivery_tag=self._delivery_tag)
-
- def requeue(self):
- """Rollback the message. Should be called by message processing logic
- when it can not process the message right now and should be redelivered
- later if it is possible
- """
- if self.need_ack():
- return self._channel.basic_nack(delivery_tag=self._delivery_tag,
- requeue=True)
-
-
-class RpcPikaIncomingMessage(PikaIncomingMessage, base.RpcIncomingMessage):
- """PikaIncomingMessage implementation for RPC messages. It expects
- extra RPC related fields in message body (msg_id and reply_q). Also 'reply'
- method added to allow consumer to send RPC reply back to the RPC client
- """
-
- def __init__(self, pika_engine, channel, method, properties, body):
- """Defines default values of msg_id and reply_q fields and just call
- super.__init__ method
-
- :param pika_engine: PikaEngine, shared object with configuration and
- shared driver functionality
- :param channel: Channel, RabbitMQ channel which was used for
- this message delivery, used for sending ack back.
- If None - ack is not required
- :param method: Method, RabbitMQ message method
- :param properties: Properties, RabbitMQ message properties
- :param body: Bytes, RabbitMQ message body
- """
- super(RpcPikaIncomingMessage, self).__init__(
- pika_engine, channel, method, properties, body
- )
- self.reply_q = properties.reply_to
- self.msg_id = properties.correlation_id
-
- def reply(self, reply=None, failure=None):
- """Send back reply to the RPC client
- :param reply: Dictionary, reply. In case of exception should be None
- :param failure: Tuple, should be a sys.exc_info() tuple.
- Should be None if RPC request was successfully processed.
-
- :return RpcReplyPikaIncomingMessage: message with reply
- """
-
- if self.reply_q is None:
- return
-
- reply_outgoing_message = RpcReplyPikaOutgoingMessage(
- self._pika_engine, self.msg_id, reply=reply, failure_info=failure,
- content_type=self._content_type,
- )
-
- def on_exception(ex):
- if isinstance(ex, pika_drv_exc.ConnectionException):
- LOG.warning(
- "Connectivity related problem during reply sending. %s",
- ex
- )
- return True
- else:
- return False
-
- if self._pika_engine.rpc_reply_retry_attempts:
- retrier = tenacity.retry(
- stop=(
- tenacity.stop_never
- if self._pika_engine.rpc_reply_retry_attempts == -1 else
- tenacity.stop_after_attempt(
- self._pika_engine.rpc_reply_retry_attempts
- )
- ),
- retry=tenacity.retry_if_exception(on_exception),
- wait=tenacity.wait_fixed(
- self._pika_engine.rpc_reply_retry_delay
- )
- )
- else:
- retrier = None
-
- try:
- timeout = (None if self.expiration_time is None else
- max(self.expiration_time - time.time(), 0))
- with timeutils.StopWatch(duration=timeout) as stopwatch:
- reply_outgoing_message.send(
- reply_q=self.reply_q,
- stopwatch=stopwatch,
- retrier=retrier
- )
- LOG.debug(
- "Message [id:'%s'] replied to '%s'.", self.msg_id, self.reply_q
- )
- except Exception:
- LOG.exception(
- "Message [id:'%s'] wasn't replied to : %s", self.msg_id,
- self.reply_q
- )
-
-
-class RpcReplyPikaIncomingMessage(PikaIncomingMessage):
- """PikaIncomingMessage implementation for RPC reply messages. It expects
- extra RPC reply related fields in message body (result and failure).
- """
- def __init__(self, pika_engine, channel, method, properties, body):
- """Defines default values of result and failure fields, call
- super.__init__ method and then construct Exception object if failure is
- not None
-
- :param pika_engine: PikaEngine, shared object with configuration and
- shared driver functionality
- :param channel: Channel, RabbitMQ channel which was used for
- this message delivery, used for sending ack back.
- If None - ack is not required
- :param method: Method, RabbitMQ message method
- :param properties: Properties, RabbitMQ message properties
- :param body: Bytes, RabbitMQ message body
- """
- super(RpcReplyPikaIncomingMessage, self).__init__(
- pika_engine, channel, method, properties, body
- )
-
- self.msg_id = properties.correlation_id
-
- self.result = self.message.get("s", None)
- self.failure = self.message.get("e", None)
-
- if self.failure is not None:
- trace = self.failure.get('t', [])
- message = self.failure.get('s', "")
- class_name = self.failure.get('c')
- module_name = self.failure.get('m')
-
- res_exc = None
-
- if module_name in pika_engine.allowed_remote_exmods:
- try:
- module = importutils.import_module(module_name)
- klass = getattr(module, class_name)
-
- ex_type = type(
- klass.__name__,
- (RemoteExceptionMixin, klass),
- {}
- )
-
- res_exc = ex_type(module_name, class_name, message, trace)
- except ImportError as e:
- LOG.warning(
- "Can not deserialize remote exception [module:%s, "
- "class:%s]. %s", module_name, class_name, e
- )
-
- # if we have not processed failure yet, use RemoteError class
- if res_exc is None:
- res_exc = oslo_messaging.RemoteError(
- class_name, message, trace
- )
- self.failure = res_exc
-
-
-class PikaOutgoingMessage(object):
- """Driver friendly adapter for sending message. Construct RabbitMQ message
- and send it
- """
-
- def __init__(self, pika_engine, message, context, content_type=None):
- """Parse RabbitMQ message
-
- :param pika_engine: PikaEngine, shared object with configuration and
- shared driver functionality
- :param message: Dictionary, user's message fields
- :param context: Dictionary, request context's fields
- :param content_type: String, content-type header, defines serialization
- mechanism, if None default content-type from pika_engine is used
- """
-
- self._pika_engine = pika_engine
-
- self._content_type = (
- content_type if content_type is not None else
- self._pika_engine.default_content_type
- )
-
- try:
- self._serializer = pika_drv_cmns.MESSAGE_SERIALIZERS[
- self._content_type
- ]
- except KeyError:
- raise NotImplementedError(
- "Content-type['{}'] is not supported.".format(
- self._content_type
- )
- )
-
- self.message = message
- self.context = context
-
- self.unique_id = uuid.uuid4().hex
-
- def _prepare_message_to_send(self):
- """Combine user's message fields an system fields (_unique_id,
- context's data etc)
- """
- msg = self.message.copy()
-
- if self.context:
- for key, value in self.context.items():
- key = six.text_type(key)
- msg['_$_' + key] = value
-
- props = pika_spec.BasicProperties(
- content_type=self._content_type,
- headers={_VERSION_HEADER: _VERSION},
- message_id=self.unique_id,
- )
- return msg, props
-
- @staticmethod
- def _publish(pool, exchange, routing_key, body, properties, mandatory,
- stopwatch):
- """Execute pika publish method using connection from connection pool
- Also this message catches all pika related exceptions and raise
- oslo.messaging specific exceptions
-
- :param pool: Pool, pika connection pool for connection choosing
- :param exchange: String, RabbitMQ exchange name for message sending
- :param routing_key: String, RabbitMQ routing key for message routing
- :param body: Bytes, RabbitMQ message payload
- :param properties: Properties, RabbitMQ message properties
- :param mandatory: Boolean, RabbitMQ publish mandatory flag (raise
- exception if it is not possible to deliver message to any queue)
- :param stopwatch: StopWatch, stopwatch object for calculating
- allowed timeouts
- """
- if stopwatch.expired():
- raise exceptions.MessagingTimeout(
- "Timeout for current operation was expired."
- )
- try:
- timeout = stopwatch.leftover(return_none=True)
- with pool.acquire(timeout=timeout) as conn:
- if timeout is not None:
- properties.expiration = str(int(timeout * 1000))
- conn.channel.publish(
- exchange=exchange,
- routing_key=routing_key,
- body=body,
- properties=properties,
- mandatory=mandatory
- )
- except pika_exceptions.NackError as e:
- raise pika_drv_exc.MessageRejectedException(
- "Can not send message: [body: {}], properties: {}] to "
- "target [exchange: {}, routing_key: {}]. {}".format(
- body, properties, exchange, routing_key, str(e)
- )
- )
- except pika_exceptions.UnroutableError as e:
- raise pika_drv_exc.RoutingException(
- "Can not deliver message:[body:{}, properties: {}] to any "
- "queue using target: [exchange:{}, "
- "routing_key:{}]. {}".format(
- body, properties, exchange, routing_key, str(e)
- )
- )
- except pika_pool.Timeout as e:
- raise exceptions.MessagingTimeout(
- "Timeout for current operation was expired. {}".format(str(e))
- )
- except pika_pool.Connection.connectivity_errors as e:
- if (isinstance(e, pika_exceptions.ChannelClosed)
- and e.args and e.args[0] == 404):
- raise pika_drv_exc.ExchangeNotFoundException(
- "Attempt to send message to not existing exchange "
- "detected, message: [body:{}, properties: {}], target: "
- "[exchange:{}, routing_key:{}]. {}".format(
- body, properties, exchange, routing_key, str(e)
- )
- )
-
- raise pika_drv_exc.ConnectionException(
- "Connectivity problem detected during sending the message: "
- "[body:{}, properties: {}] to target: [exchange:{}, "
- "routing_key:{}]. {}".format(
- body, properties, exchange, routing_key, str(e)
- )
- )
- except socket.timeout:
- raise pika_drv_exc.TimeoutConnectionException(
- "Socket timeout exceeded."
- )
-
- def _do_send(self, exchange, routing_key, msg_dict, msg_props,
- confirm=True, mandatory=True, persistent=False,
- stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH, retrier=None):
- """Send prepared message with configured retrying
-
- :param exchange: String, RabbitMQ exchange name for message sending
- :param routing_key: String, RabbitMQ routing key for message routing
- :param msg_dict: Dictionary, message payload
- :param msg_props: Properties, message properties
- :param confirm: Boolean, enable publisher confirmation if True
- :param mandatory: Boolean, RabbitMQ publish mandatory flag (raise
- exception if it is not possible to deliver message to any queue)
- :param persistent: Boolean, send persistent message if True, works only
- for routing into durable queues
- :param stopwatch: StopWatch, stopwatch object for calculating
- allowed timeouts
- :param retrier: tenacity.Retrying, configured retrier object for
- sending message, if None no retrying is performed
- """
- msg_props.delivery_mode = 2 if persistent else 1
-
- pool = (self._pika_engine.connection_with_confirmation_pool
- if confirm else
- self._pika_engine.connection_without_confirmation_pool)
-
- body = self._serializer.dump_as_bytes(msg_dict)
-
- LOG.debug(
- "Sending message:[body:%s; properties: %s] to target: "
- "[exchange:%s; routing_key:%s]", body, msg_props, exchange,
- routing_key
- )
-
- publish = (self._publish if retrier is None else
- retrier(self._publish))
-
- return publish(pool, exchange, routing_key, body, msg_props,
- mandatory, stopwatch)
-
- def send(self, exchange, routing_key='', confirm=True, mandatory=True,
- persistent=False, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH,
- retrier=None):
- """Send message with configured retrying
-
- :param exchange: String, RabbitMQ exchange name for message sending
- :param routing_key: String, RabbitMQ routing key for message routing
- :param confirm: Boolean, enable publisher confirmation if True
- :param mandatory: Boolean, RabbitMQ publish mandatory flag (raise
- exception if it is not possible to deliver message to any queue)
- :param persistent: Boolean, send persistent message if True, works only
- for routing into durable queues
- :param stopwatch: StopWatch, stopwatch object for calculating
- allowed timeouts
- :param retrier: tenacity.Retrying, configured retrier object for
- sending message, if None no retrying is performed
- """
- msg_dict, msg_props = self._prepare_message_to_send()
-
- return self._do_send(exchange, routing_key, msg_dict, msg_props,
- confirm, mandatory, persistent,
- stopwatch, retrier)
-
-
-class RpcPikaOutgoingMessage(PikaOutgoingMessage):
- """PikaOutgoingMessage implementation for RPC messages. It adds
- possibility to wait and receive RPC reply
- """
- def __init__(self, pika_engine, message, context, content_type=None):
- super(RpcPikaOutgoingMessage, self).__init__(
- pika_engine, message, context, content_type
- )
- self.msg_id = None
- self.reply_q = None
-
- def send(self, exchange, routing_key, reply_listener=None,
- stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH, retrier=None):
- """Send RPC message with configured retrying
-
- :param exchange: String, RabbitMQ exchange name for message sending
- :param routing_key: String, RabbitMQ routing key for message routing
- :param reply_listener: RpcReplyPikaListener, listener for waiting
- reply. If None - return immediately without reply waiting
- :param stopwatch: StopWatch, stopwatch object for calculating
- allowed timeouts
- :param retrier: tenacity.Retrying, configured retrier object for
- sending message, if None no retrying is performed
- """
- msg_dict, msg_props = self._prepare_message_to_send()
-
- if reply_listener:
- self.msg_id = uuid.uuid4().hex
- msg_props.correlation_id = self.msg_id
- LOG.debug('MSG_ID is %s', self.msg_id)
-
- self.reply_q = reply_listener.get_reply_qname()
- msg_props.reply_to = self.reply_q
-
- future = reply_listener.register_reply_waiter(msg_id=self.msg_id)
-
- self._do_send(
- exchange=exchange, routing_key=routing_key, msg_dict=msg_dict,
- msg_props=msg_props, confirm=True, mandatory=True,
- persistent=False, stopwatch=stopwatch, retrier=retrier
- )
-
- try:
- return future.result(stopwatch.leftover(return_none=True))
- except BaseException as e:
- reply_listener.unregister_reply_waiter(self.msg_id)
- if isinstance(e, futures.TimeoutError):
- e = exceptions.MessagingTimeout()
- raise e
- else:
- self._do_send(
- exchange=exchange, routing_key=routing_key, msg_dict=msg_dict,
- msg_props=msg_props, confirm=True, mandatory=True,
- persistent=False, stopwatch=stopwatch, retrier=retrier
- )
-
-
-class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage):
- """PikaOutgoingMessage implementation for RPC reply messages. It sets
- correlation_id AMQP property to link this reply with response
- """
- def __init__(self, pika_engine, msg_id, reply=None, failure_info=None,
- content_type=None):
- """Initialize with reply information for sending
-
- :param pika_engine: PikaEngine, shared object with configuration and
- shared driver functionality
- :param msg_id: String, msg_id of RPC request, which waits for reply
- :param reply: Dictionary, reply. In case of exception should be None
- :param failure_info: Tuple, should be a sys.exc_info() tuple.
- Should be None if RPC request was successfully processed.
- :param content_type: String, content-type header, defines serialization
- mechanism, if None default content-type from pika_engine is used
- """
- self.msg_id = msg_id
-
- if failure_info is not None:
- ex_class = failure_info[0]
- ex = failure_info[1]
- tb = traceback.format_exception(*failure_info)
- if issubclass(ex_class, RemoteExceptionMixin):
- failure_data = {
- 'c': ex.clazz,
- 'm': ex.module,
- 's': ex.message,
- 't': tb
- }
- else:
- failure_data = {
- 'c': six.text_type(ex_class.__name__),
- 'm': six.text_type(ex_class.__module__),
- 's': six.text_type(ex),
- 't': tb
- }
-
- msg = {'e': failure_data}
- else:
- msg = {'s': reply}
-
- super(RpcReplyPikaOutgoingMessage, self).__init__(
- pika_engine, msg, None, content_type
- )
-
- def send(self, reply_q, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH,
- retrier=None):
- """Send RPC message with configured retrying
-
- :param reply_q: String, queue name for sending reply
- :param stopwatch: StopWatch, stopwatch object for calculating
- allowed timeouts
- :param retrier: tenacity.Retrying, configured retrier object for
- sending message, if None no retrying is performed
- """
-
- msg_dict, msg_props = self._prepare_message_to_send()
- msg_props.correlation_id = self.msg_id
-
- self._do_send(
- exchange=self._pika_engine.rpc_reply_exchange, routing_key=reply_q,
- msg_dict=msg_dict, msg_props=msg_props, confirm=True,
- mandatory=True, persistent=False, stopwatch=stopwatch,
- retrier=retrier
- )
diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py
deleted file mode 100644
index 1c770a3..0000000
--- a/oslo_messaging/_drivers/pika_driver/pika_poller.py
+++ /dev/null
@@ -1,538 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import threading
-
-from oslo_log import log as logging
-from oslo_service import loopingcall
-
-from oslo_messaging._drivers import base
-from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
-from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
-from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
-
-LOG = logging.getLogger(__name__)
-
-
-class PikaPoller(base.Listener):
- """Provides user friendly functionality for RabbitMQ message consuming,
- handles low level connectivity problems and restore connection if some
- connectivity related problem detected
- """
-
- def __init__(self, pika_engine, batch_size, batch_timeout, prefetch_count,
- incoming_message_class):
- """Initialize required fields
-
- :param pika_engine: PikaEngine, shared object with configuration and
- shared driver functionality
- :param batch_size: desired number of messages passed to
- single on_incoming_callback call
- :param batch_timeout: defines how long should we wait for batch_size
- messages if we already have some messages waiting for processing
- :param prefetch_count: Integer, maximum count of unacknowledged
- messages which RabbitMQ broker sends to this consumer
- :param incoming_message_class: PikaIncomingMessage, wrapper for
- consumed RabbitMQ message
- """
- super(PikaPoller, self).__init__(batch_size, batch_timeout,
- prefetch_count)
- self._pika_engine = pika_engine
- self._incoming_message_class = incoming_message_class
-
- self._connection = None
- self._channel = None
- self._recover_loopingcall = None
- self._lock = threading.RLock()
-
- self._cur_batch_buffer = None
- self._cur_batch_timeout_id = None
-
- self._started = False
- self._closing_connection_by_poller = False
-
- self._queues_to_consume = None
-
- def _on_connection_close(self, connection, reply_code, reply_text):
- self._deliver_cur_batch()
- if self._closing_connection_by_poller:
- return
- with self._lock:
- self._connection = None
- self._start_recover_consuming_task()
-
- def _on_channel_close(self, channel, reply_code, reply_text):
- if self._cur_batch_buffer:
- self._cur_batch_buffer = [
- message for message in self._cur_batch_buffer
- if not message.need_ack()
- ]
- if self._closing_connection_by_poller:
- return
- with self._lock:
- self._channel = None
- self._start_recover_consuming_task()
-
- def _on_consumer_cancel(self, method_frame):
- with self._lock:
- if self._queues_to_consume:
- consumer_tag = method_frame.method.consumer_tag
- for queue_info in self._queues_to_consume:
- if queue_info["consumer_tag"] == consumer_tag:
- queue_info["consumer_tag"] = None
-
- self._start_recover_consuming_task()
-
- def _on_message_no_ack_callback(self, unused, method, properties, body):
- """Is called by Pika when message was received from queue listened with
- no_ack=True mode
- """
- incoming_message = self._incoming_message_class(
- self._pika_engine, None, method, properties, body
- )
- self._on_incoming_message(incoming_message)
-
- def _on_message_with_ack_callback(self, unused, method, properties, body):
- """Is called by Pika when message was received from queue listened with
- no_ack=False mode
- """
- incoming_message = self._incoming_message_class(
- self._pika_engine, self._channel, method, properties, body
- )
- self._on_incoming_message(incoming_message)
-
- def _deliver_cur_batch(self):
- if self._cur_batch_timeout_id is not None:
- self._connection.remove_timeout(self._cur_batch_timeout_id)
- self._cur_batch_timeout_id = None
- if self._cur_batch_buffer:
- buf_to_send = self._cur_batch_buffer
- self._cur_batch_buffer = None
- try:
- self.on_incoming_callback(buf_to_send)
- except Exception:
- LOG.exception("Unexpected exception during incoming delivery")
-
- def _on_incoming_message(self, incoming_message):
- if self._cur_batch_buffer is None:
- self._cur_batch_buffer = [incoming_message]
- else:
- self._cur_batch_buffer.append(incoming_message)
-
- if len(self._cur_batch_buffer) >= self.batch_size:
- self._deliver_cur_batch()
- return
-
- if self._cur_batch_timeout_id is None:
- self._cur_batch_timeout_id = self._connection.add_timeout(
- self.batch_timeout, self._deliver_cur_batch)
-
- def _start_recover_consuming_task(self):
- """Start async job for checking connection to the broker."""
- if self._recover_loopingcall is None and self._started:
- self._recover_loopingcall = (
- loopingcall.DynamicLoopingCall(
- self._try_recover_consuming
- )
- )
- LOG.info("Starting recover consuming job for listener: %s", self)
- self._recover_loopingcall.start()
-
- def _try_recover_consuming(self):
- with self._lock:
- try:
- if self._started:
- self._start_or_recover_consuming()
- except pika_drv_exc.EstablishConnectionException as e:
- LOG.warning(
- "Problem during establishing connection for pika "
- "poller %s", e, exc_info=True
- )
- return self._pika_engine.host_connection_reconnect_delay
- except pika_drv_exc.ConnectionException as e:
- LOG.warning(
- "Connectivity exception during starting/recovering pika "
- "poller %s", e, exc_info=True
- )
- except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e:
- LOG.warning(
- "Connectivity exception during starting/recovering pika "
- "poller %s", e, exc_info=True
- )
- except BaseException:
- # NOTE (dukhlov): I preffer to use here BaseException because
- # if this method raise such exception LoopingCall stops
- # execution Probably it should never happen and Exception
- # should be enough but in case of programmer mistake it could
- # be and it is potentially hard to catch problem if we will
- # stop background task. It is better when it continue to work
- # and write a lot of LOG with this error
- LOG.exception("Unexpected exception during "
- "starting/recovering pika poller")
- else:
- self._recover_loopingcall = None
- LOG.info("Recover consuming job was finished for listener: %s",
- self)
- raise loopingcall.LoopingCallDone(True)
- return 0
-
- def _start_or_recover_consuming(self):
- """Performs reconnection to the broker. It is unsafe method for
- internal use only
- """
- if self._connection is None or not self._connection.is_open:
- self._connection = self._pika_engine.create_connection(
- for_listening=True
- )
- self._connection.add_on_close_callback(self._on_connection_close)
- self._channel = None
-
- if self._channel is None or not self._channel.is_open:
- if self._queues_to_consume:
- for queue_info in self._queues_to_consume:
- queue_info["consumer_tag"] = None
-
- self._channel = self._connection.channel()
- self._channel.add_on_close_callback(self._on_channel_close)
- self._channel.add_on_cancel_callback(self._on_consumer_cancel)
- self._channel.basic_qos(prefetch_count=self.prefetch_size)
-
- if self._queues_to_consume is None:
- self._queues_to_consume = self._declare_queue_binding()
-
- self._start_consuming()
-
- def _declare_queue_binding(self):
- """Is called by recovering connection logic if target RabbitMQ
- exchange and (or) queue do not exist. Should be overridden in child
- classes
-
- :return Dictionary: declared_queue_name -> no_ack_mode
- """
- raise NotImplementedError(
- "It is base class. Please declare exchanges and queues here"
- )
-
- def _start_consuming(self):
- """Is called by recovering connection logic for starting consumption
- of configured RabbitMQ queues
- """
-
- assert self._queues_to_consume is not None
-
- try:
- for queue_info in self._queues_to_consume:
- if queue_info["consumer_tag"] is not None:
- continue
- no_ack = queue_info["no_ack"]
-
- on_message_callback = (
- self._on_message_no_ack_callback if no_ack
- else self._on_message_with_ack_callback
- )
-
- queue_info["consumer_tag"] = self._channel.basic_consume(
- on_message_callback, queue_info["queue_name"],
- no_ack=no_ack
- )
- except Exception:
- self._queues_to_consume = None
- raise
-
- def _stop_consuming(self):
- """Is called by poller's stop logic for stopping consumption
- of configured RabbitMQ queues
- """
-
- assert self._queues_to_consume is not None
-
- for queue_info in self._queues_to_consume:
- consumer_tag = queue_info["consumer_tag"]
- if consumer_tag is not None:
- self._channel.basic_cancel(consumer_tag)
- queue_info["consumer_tag"] = None
-
- def start(self, on_incoming_callback):
- """Starts poller. Should be called before polling to allow message
- consuming
-
- :param on_incoming_callback: callback function to be executed when
- listener received messages. Messages should be processed and
- acked/nacked by callback
- """
- super(PikaPoller, self).start(on_incoming_callback)
-
- with self._lock:
- if self._started:
- return
- connected = False
- try:
- self._start_or_recover_consuming()
- except pika_drv_exc.EstablishConnectionException as exc:
- LOG.warning(
- "Can not establish connection during pika poller's "
- "start(). %s", exc, exc_info=True
- )
- except pika_drv_exc.ConnectionException as exc:
- LOG.warning(
- "Connectivity problem during pika poller's start(). %s",
- exc, exc_info=True
- )
- except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc:
- LOG.warning(
- "Connectivity problem during pika poller's start(). %s",
- exc, exc_info=True
- )
- else:
- connected = True
-
- self._started = True
- if not connected:
- self._start_recover_consuming_task()
-
- def stop(self):
- """Stops poller. Should be called when polling is not needed anymore to
- stop new message consuming. After that it is necessary to poll already
- prefetched messages
- """
- super(PikaPoller, self).stop()
-
- with self._lock:
- if not self._started:
- return
-
- if self._recover_loopingcall is not None:
- self._recover_loopingcall.stop()
- self._recover_loopingcall = None
-
- if (self._queues_to_consume and self._channel and
- self._channel.is_open):
- try:
- self._stop_consuming()
- except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc:
- LOG.warning(
- "Connectivity problem detected during consumer "
- "cancellation. %s", exc, exc_info=True
- )
- self._deliver_cur_batch()
- self._started = False
-
- def cleanup(self):
- """Cleanup allocated resources (channel, connection, etc)."""
- with self._lock:
- if self._connection and self._connection.is_open:
- try:
- self._closing_connection_by_poller = True
- self._connection.close()
- self._closing_connection_by_poller = False
- except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS:
- # expected errors
- pass
- except Exception:
- LOG.exception("Unexpected error during closing connection")
- finally:
- self._channel = None
- self._connection = None
-
-
-class RpcServicePikaPoller(PikaPoller):
- """PikaPoller implementation for polling RPC messages. Overrides base
- functionality according to RPC specific
- """
- def __init__(self, pika_engine, target, batch_size, batch_timeout,
- prefetch_count):
- """Adds target parameter for declaring RPC specific exchanges and
- queues
-
- :param pika_engine: PikaEngine, shared object with configuration and
- shared driver functionality
- :param target: Target, oslo.messaging Target object which defines RPC
- endpoint
- :param batch_size: desired number of messages passed to
- single on_incoming_callback call
- :param batch_timeout: defines how long should we wait for batch_size
- messages if we already have some messages waiting for processing
- :param prefetch_count: Integer, maximum count of unacknowledged
- messages which RabbitMQ broker sends to this consumer
- """
- self._target = target
-
- super(RpcServicePikaPoller, self).__init__(
- pika_engine, batch_size, batch_timeout, prefetch_count,
- pika_drv_msg.RpcPikaIncomingMessage
- )
-
- def _declare_queue_binding(self):
- """Overrides base method and perform declaration of RabbitMQ exchanges
- and queues which correspond to oslo.messaging RPC target
-
- :return Dictionary: declared_queue_name -> no_ack_mode
- """
- queue_expiration = self._pika_engine.rpc_queue_expiration
-
- exchange = self._pika_engine.get_rpc_exchange_name(
- self._target.exchange
- )
-
- queues_to_consume = []
-
- for no_ack in [True, False]:
- queue = self._pika_engine.get_rpc_queue_name(
- self._target.topic, None, no_ack
- )
- self._pika_engine.declare_queue_binding_by_channel(
- channel=self._channel, exchange=exchange, queue=queue,
- routing_key=queue, exchange_type='direct', durable=False,
- queue_expiration=queue_expiration
- )
- queues_to_consume.append(
- {"queue_name": queue, "no_ack": no_ack, "consumer_tag": None}
- )
-
- if self._target.server:
- server_queue = self._pika_engine.get_rpc_queue_name(
- self._target.topic, self._target.server, no_ack
- )
- self._pika_engine.declare_queue_binding_by_channel(
- channel=self._channel, exchange=exchange, durable=False,
- queue=server_queue, routing_key=server_queue,
- exchange_type='direct', queue_expiration=queue_expiration
- )
- queues_to_consume.append(
- {"queue_name": server_queue, "no_ack": no_ack,
- "consumer_tag": None}
- )
-
- worker_queue = self._pika_engine.get_rpc_queue_name(
- self._target.topic, self._target.server, no_ack, True
- )
- all_workers_routing_key = self._pika_engine.get_rpc_queue_name(
- self._target.topic, "all_workers", no_ack
- )
- self._pika_engine.declare_queue_binding_by_channel(
- channel=self._channel, exchange=exchange, durable=False,
- queue=worker_queue, routing_key=all_workers_routing_key,
- exchange_type='direct', queue_expiration=queue_expiration
- )
- queues_to_consume.append(
- {"queue_name": worker_queue, "no_ack": no_ack,
- "consumer_tag": None}
- )
-
- return queues_to_consume
-
-
-class RpcReplyPikaPoller(PikaPoller):
- """PikaPoller implementation for polling RPC reply messages. Overrides
- base functionality according to RPC reply specific
- """
- def __init__(self, pika_engine, exchange, queue, batch_size, batch_timeout,
- prefetch_count):
- """Adds exchange and queue parameter for declaring exchange and queue
- used for RPC reply delivery
-
- :param pika_engine: PikaEngine, shared object with configuration and
- shared driver functionality
- :param exchange: String, exchange name used for RPC reply delivery
- :param queue: String, queue name used for RPC reply delivery
- :param batch_size: desired number of messages passed to
- single on_incoming_callback call
- :param batch_timeout: defines how long should we wait for batch_size
- messages if we already have some messages waiting for processing
- :param prefetch_count: Integer, maximum count of unacknowledged
- messages which RabbitMQ broker sends to this consumer
- """
- self._exchange = exchange
- self._queue = queue
-
- super(RpcReplyPikaPoller, self).__init__(
- pika_engine, batch_size, batch_timeout, prefetch_count,
- pika_drv_msg.RpcReplyPikaIncomingMessage
- )
-
- def _declare_queue_binding(self):
- """Overrides base method and perform declaration of RabbitMQ exchange
- and queue used for RPC reply delivery
-
- :return Dictionary: declared_queue_name -> no_ack_mode
- """
- self._pika_engine.declare_queue_binding_by_channel(
- channel=self._channel,
- exchange=self._exchange, queue=self._queue,
- routing_key=self._queue, exchange_type='direct',
- queue_expiration=self._pika_engine.rpc_queue_expiration,
- durable=False
- )
-
- return [{"queue_name": self._queue, "no_ack": False,
- "consumer_tag": None}]
-
-
-class NotificationPikaPoller(PikaPoller):
- """PikaPoller implementation for polling Notification messages. Overrides
- base functionality according to Notification specific
- """
- def __init__(self, pika_engine, targets_and_priorities,
- batch_size, batch_timeout, prefetch_count, queue_name=None):
- """Adds targets_and_priorities and queue_name parameter
- for declaring exchanges and queues used for notification delivery
-
- :param pika_engine: PikaEngine, shared object with configuration and
- shared driver functionality
- :param targets_and_priorities: list of (target, priority), defines
- default queue names for corresponding notification types
- :param batch_size: desired number of messages passed to
- single on_incoming_callback call
- :param batch_timeout: defines how long should we wait for batch_size
- messages if we already have some messages waiting for processing
- :param prefetch_count: Integer, maximum count of unacknowledged
- messages which RabbitMQ broker sends to this consumer
- :param queue: String, alternative queue name used for this poller
- instead of default queue name
- """
- self._targets_and_priorities = targets_and_priorities
- self._queue_name = queue_name
-
- super(NotificationPikaPoller, self).__init__(
- pika_engine, batch_size, batch_timeout, prefetch_count,
- pika_drv_msg.PikaIncomingMessage
- )
-
- def _declare_queue_binding(self):
- """Overrides base method and perform declaration of RabbitMQ exchanges
- and queues used for notification delivery
-
- :return Dictionary: declared_queue_name -> no_ack_mode
- """
- queues_to_consume = []
- for target, priority in self._targets_and_priorities:
- routing_key = '%s.%s' % (target.topic, priority)
- queue = self._queue_name or routing_key
- self._pika_engine.declare_queue_binding_by_channel(
- channel=self._channel,
- exchange=(
- target.exchange or
- self._pika_engine.default_notification_exchange
- ),
- queue=queue,
- routing_key=routing_key,
- exchange_type='direct',
- queue_expiration=None,
- durable=self._pika_engine.notification_persistence,
- )
- queues_to_consume.append(
- {"queue_name": queue, "no_ack": False, "consumer_tag": None}
- )
-
- return queues_to_consume
diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py
index b766bb3..3181ae4 100644
--- a/oslo_messaging/opts.py
+++ b/oslo_messaging/opts.py
@@ -23,11 +23,9 @@ import itertools
from oslo_messaging._drivers import amqp
from oslo_messaging._drivers.amqp1_driver import opts as amqp_opts
from oslo_messaging._drivers import base as drivers_base
-from oslo_messaging._drivers import impl_pika
from oslo_messaging._drivers import impl_rabbit
from oslo_messaging._drivers.impl_zmq import zmq_options
from oslo_messaging._drivers.kafka_driver import kafka_options
-from oslo_messaging._drivers.pika_driver import pika_connection_factory
from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_redis
from oslo_messaging.notify import notifier
from oslo_messaging.rpc import client
@@ -50,10 +48,7 @@ _opts = [
('oslo_messaging_amqp', amqp_opts.amqp1_opts),
('oslo_messaging_notifications', notifier._notifier_opts),
('oslo_messaging_rabbit', list(
- itertools.chain(amqp.amqp_opts, impl_rabbit.rabbit_opts,
- pika_connection_factory.pika_opts,
- impl_pika.pika_pool_opts, impl_pika.message_opts,
- impl_pika.notification_opts, impl_pika.rpc_opts))),
+ itertools.chain(amqp.amqp_opts, impl_rabbit.rabbit_opts))),
('oslo_messaging_kafka', kafka_options.KAFKA_OPTS),
]
diff --git a/oslo_messaging/tests/drivers/pika/__init__.py b/oslo_messaging/tests/drivers/pika/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/oslo_messaging/tests/drivers/pika/__init__.py
+++ /dev/null
diff --git a/oslo_messaging/tests/drivers/pika/test_message.py b/oslo_messaging/tests/drivers/pika/test_message.py
deleted file mode 100644
index 3722f87..0000000
--- a/oslo_messaging/tests/drivers/pika/test_message.py
+++ /dev/null
@@ -1,615 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import functools
-import unittest
-
-from concurrent import futures
-from oslo_serialization import jsonutils
-from oslo_utils import timeutils
-import pika
-from six.moves import mock
-
-import oslo_messaging
-from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
-from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
-
-
-class PikaIncomingMessageTestCase(unittest.TestCase):
- def setUp(self):
- self._pika_engine = mock.Mock()
- self._channel = mock.Mock()
-
- self._delivery_tag = 12345
-
- self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
- self._properties = pika.BasicProperties(
- content_type="application/json",
- headers={"version": "1.0"},
- )
- self._body = (
- b'{"_$_key_context":"context_value",'
- b'"payload_key": "payload_value"}'
- )
-
- def test_message_body_parsing(self):
- message = pika_drv_msg.PikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- self._body
- )
-
- self.assertEqual("context_value",
- message.ctxt.get("key_context", None))
- self.assertEqual("payload_value",
- message.message.get("payload_key", None))
-
- def test_message_acknowledge(self):
- message = pika_drv_msg.PikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- self._body
- )
-
- message.acknowledge()
-
- self.assertEqual(1, self._channel.basic_ack.call_count)
- self.assertEqual({"delivery_tag": self._delivery_tag},
- self._channel.basic_ack.call_args[1])
-
- def test_message_acknowledge_no_ack(self):
- message = pika_drv_msg.PikaIncomingMessage(
- self._pika_engine, None, self._method, self._properties,
- self._body
- )
-
- message.acknowledge()
-
- self.assertEqual(0, self._channel.basic_ack.call_count)
-
- def test_message_requeue(self):
- message = pika_drv_msg.PikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- self._body
- )
-
- message.requeue()
-
- self.assertEqual(1, self._channel.basic_nack.call_count)
- self.assertEqual({"delivery_tag": self._delivery_tag, 'requeue': True},
- self._channel.basic_nack.call_args[1])
-
- def test_message_requeue_no_ack(self):
- message = pika_drv_msg.PikaIncomingMessage(
- self._pika_engine, None, self._method, self._properties,
- self._body
- )
-
- message.requeue()
-
- self.assertEqual(0, self._channel.basic_nack.call_count)
-
-
-class RpcPikaIncomingMessageTestCase(unittest.TestCase):
- def setUp(self):
- self._pika_engine = mock.Mock()
- self._pika_engine.rpc_reply_retry_attempts = 3
- self._pika_engine.rpc_reply_retry_delay = 0.25
-
- self._channel = mock.Mock()
-
- self._delivery_tag = 12345
-
- self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
- self._body = (
- b'{"_$_key_context":"context_value",'
- b'"payload_key":"payload_value"}'
- )
- self._properties = pika.BasicProperties(
- content_type="application/json",
- headers={"version": "1.0"},
- )
-
- def test_call_message_body_parsing(self):
- self._properties.correlation_id = 123456789
- self._properties.reply_to = "reply_queue"
-
- message = pika_drv_msg.RpcPikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- self._body
- )
-
- self.assertEqual("context_value",
- message.ctxt.get("key_context", None))
- self.assertEqual(123456789, message.msg_id)
- self.assertEqual("reply_queue", message.reply_q)
-
- self.assertEqual("payload_value",
- message.message.get("payload_key", None))
-
- def test_cast_message_body_parsing(self):
- message = pika_drv_msg.RpcPikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- self._body
- )
-
- self.assertEqual("context_value",
- message.ctxt.get("key_context", None))
- self.assertIsNone(message.msg_id)
- self.assertIsNone(message.reply_q)
-
- self.assertEqual("payload_value",
- message.message.get("payload_key", None))
-
- @mock.patch(("oslo_messaging._drivers.pika_driver.pika_message."
- "PikaOutgoingMessage.send"))
- def test_reply_for_cast_message(self, send_reply_mock):
- message = pika_drv_msg.RpcPikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- self._body
- )
-
- self.assertEqual("context_value",
- message.ctxt.get("key_context", None))
- self.assertIsNone(message.msg_id)
- self.assertIsNone(message.reply_q)
-
- self.assertEqual("payload_value",
- message.message.get("payload_key", None))
-
- message.reply(reply=object())
-
- self.assertEqual(0, send_reply_mock.call_count)
-
- @mock.patch("oslo_messaging._drivers.pika_driver.pika_message."
- "RpcReplyPikaOutgoingMessage")
- @mock.patch("tenacity.retry")
- def test_positive_reply_for_call_message(self,
- retry_mock,
- outgoing_message_mock):
- self._properties.correlation_id = 123456789
- self._properties.reply_to = "reply_queue"
-
- message = pika_drv_msg.RpcPikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- self._body
- )
-
- self.assertEqual("context_value",
- message.ctxt.get("key_context", None))
- self.assertEqual(123456789, message.msg_id)
- self.assertEqual("reply_queue", message.reply_q)
-
- self.assertEqual("payload_value",
- message.message.get("payload_key", None))
- reply = "all_fine"
- message.reply(reply=reply)
-
- outgoing_message_mock.assert_called_once_with(
- self._pika_engine, 123456789, failure_info=None, reply='all_fine',
- content_type='application/json'
- )
- outgoing_message_mock().send.assert_called_once_with(
- reply_q='reply_queue', stopwatch=mock.ANY, retrier=mock.ANY
- )
- retry_mock.assert_called_once_with(
- stop=mock.ANY, retry=mock.ANY, wait=mock.ANY
- )
-
- @mock.patch("oslo_messaging._drivers.pika_driver.pika_message."
- "RpcReplyPikaOutgoingMessage")
- @mock.patch("tenacity.retry")
- def test_negative_reply_for_call_message(self,
- retry_mock,
- outgoing_message_mock):
- self._properties.correlation_id = 123456789
- self._properties.reply_to = "reply_queue"
-
- message = pika_drv_msg.RpcPikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- self._body
- )
-
- self.assertEqual("context_value",
- message.ctxt.get("key_context", None))
- self.assertEqual(123456789, message.msg_id)
- self.assertEqual("reply_queue", message.reply_q)
-
- self.assertEqual("payload_value",
- message.message.get("payload_key", None))
-
- failure_info = object()
- message.reply(failure=failure_info)
-
- outgoing_message_mock.assert_called_once_with(
- self._pika_engine, 123456789,
- failure_info=failure_info,
- reply=None,
- content_type='application/json'
- )
- outgoing_message_mock().send.assert_called_once_with(
- reply_q='reply_queue', stopwatch=mock.ANY, retrier=mock.ANY
- )
- retry_mock.assert_called_once_with(
- stop=mock.ANY, retry=mock.ANY, wait=mock.ANY
- )
-
-
-class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase):
- def setUp(self):
- self._pika_engine = mock.Mock()
- self._pika_engine.allowed_remote_exmods = [
- pika_drv_cmns.EXCEPTIONS_MODULE, "oslo_messaging.exceptions"
- ]
-
- self._channel = mock.Mock()
-
- self._delivery_tag = 12345
-
- self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
-
- self._properties = pika.BasicProperties(
- content_type="application/json",
- headers={"version": "1.0"},
- correlation_id=123456789
- )
-
- def test_positive_reply_message_body_parsing(self):
-
- body = b'{"s": "all fine"}'
-
- message = pika_drv_msg.RpcReplyPikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- body
- )
-
- self.assertEqual(123456789, message.msg_id)
- self.assertIsNone(message.failure)
- self.assertEqual("all fine", message.result)
-
- def test_negative_reply_message_body_parsing(self):
-
- body = (b'{'
- b' "e": {'
- b' "s": "Error message",'
- b' "t": ["TRACE HERE"],'
- b' "c": "MessagingException",'
- b' "m": "oslo_messaging.exceptions"'
- b' }'
- b'}')
-
- message = pika_drv_msg.RpcReplyPikaIncomingMessage(
- self._pika_engine, self._channel, self._method, self._properties,
- body
- )
-
- self.assertEqual(123456789, message.msg_id)
- self.assertIsNone(message.result)
- self.assertEqual(
- 'Error message\n'
- 'TRACE HERE',
- str(message.failure)
- )
- self.assertIsInstance(message.failure,
- oslo_messaging.MessagingException)
-
-
-class PikaOutgoingMessageTestCase(unittest.TestCase):
- def setUp(self):
- self._pika_engine = mock.MagicMock()
- self._pika_engine.default_content_type = "application/json"
- self._exchange = "it is exchange"
- self._routing_key = "it is routing key"
- self._expiration = 1
- self._stopwatch = (
- timeutils.StopWatch(duration=self._expiration).start()
- )
- self._mandatory = object()
-
- self._message = {"msg_type": 1, "msg_str": "hello"}
- self._context = {"request_id": 555, "token": "it is a token"}
-
- @mock.patch("oslo_serialization.jsonutils.dump_as_bytes",
- new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
- def test_send_with_confirmation(self):
- message = pika_drv_msg.PikaOutgoingMessage(
- self._pika_engine, self._message, self._context
- )
-
- message.send(
- exchange=self._exchange,
- routing_key=self._routing_key,
- confirm=True,
- mandatory=self._mandatory,
- persistent=True,
- stopwatch=self._stopwatch,
- retrier=None
- )
-
- self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.assert_called_once_with(
- body=mock.ANY,
- exchange=self._exchange, mandatory=self._mandatory,
- properties=mock.ANY,
- routing_key=self._routing_key
- )
-
- body = self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["body"]
-
- self.assertEqual(
- b'{"_$_request_id": 555, "_$_token": "it is a token", '
- b'"msg_str": "hello", "msg_type": 1}',
- body
- )
-
- props = self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["properties"]
-
- self.assertEqual('application/json', props.content_type)
- self.assertEqual(2, props.delivery_mode)
- self.assertTrue(self._expiration * 1000 - float(props.expiration) <
- 100)
- self.assertEqual({'version': '1.0'}, props.headers)
- self.assertTrue(props.message_id)
-
- @mock.patch("oslo_serialization.jsonutils.dump_as_bytes",
- new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
- def test_send_without_confirmation(self):
- message = pika_drv_msg.PikaOutgoingMessage(
- self._pika_engine, self._message, self._context
- )
-
- message.send(
- exchange=self._exchange,
- routing_key=self._routing_key,
- confirm=False,
- mandatory=self._mandatory,
- persistent=False,
- stopwatch=self._stopwatch,
- retrier=None
- )
-
- self._pika_engine.connection_without_confirmation_pool.acquire(
- ).__enter__().channel.publish.assert_called_once_with(
- body=mock.ANY,
- exchange=self._exchange, mandatory=self._mandatory,
- properties=mock.ANY,
- routing_key=self._routing_key
- )
-
- body = self._pika_engine.connection_without_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["body"]
-
- self.assertEqual(
- b'{"_$_request_id": 555, "_$_token": "it is a token", '
- b'"msg_str": "hello", "msg_type": 1}',
- body
- )
-
- props = self._pika_engine.connection_without_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["properties"]
-
- self.assertEqual('application/json', props.content_type)
- self.assertEqual(1, props.delivery_mode)
- self.assertTrue(self._expiration * 1000 - float(props.expiration)
- < 100)
- self.assertEqual({'version': '1.0'}, props.headers)
- self.assertTrue(props.message_id)
-
-
-class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
- def setUp(self):
- self._exchange = "it is exchange"
- self._routing_key = "it is routing key"
-
- self._pika_engine = mock.MagicMock()
- self._pika_engine.get_rpc_exchange_name.return_value = self._exchange
- self._pika_engine.get_rpc_queue_name.return_value = self._routing_key
- self._pika_engine.default_content_type = "application/json"
-
- self._message = {"msg_type": 1, "msg_str": "hello"}
- self._context = {"request_id": 555, "token": "it is a token"}
-
- @mock.patch("oslo_serialization.jsonutils.dump_as_bytes",
- new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
- def test_send_cast_message(self):
- message = pika_drv_msg.RpcPikaOutgoingMessage(
- self._pika_engine, self._message, self._context
- )
-
- expiration = 1
- stopwatch = timeutils.StopWatch(duration=expiration).start()
-
- message.send(
- exchange=self._exchange,
- routing_key=self._routing_key,
- reply_listener=None,
- stopwatch=stopwatch,
- retrier=None
- )
-
- self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.assert_called_once_with(
- body=mock.ANY,
- exchange=self._exchange, mandatory=True,
- properties=mock.ANY,
- routing_key=self._routing_key
- )
-
- body = self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["body"]
-
- self.assertEqual(
- b'{"_$_request_id": 555, "_$_token": "it is a token", '
- b'"msg_str": "hello", "msg_type": 1}',
- body
- )
-
- props = self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["properties"]
-
- self.assertEqual('application/json', props.content_type)
- self.assertEqual(1, props.delivery_mode)
- self.assertTrue(expiration * 1000 - float(props.expiration) < 100)
- self.assertEqual({'version': '1.0'}, props.headers)
- self.assertIsNone(props.correlation_id)
- self.assertIsNone(props.reply_to)
- self.assertTrue(props.message_id)
-
- @mock.patch("oslo_serialization.jsonutils.dump_as_bytes",
- new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
- def test_send_call_message(self):
- message = pika_drv_msg.RpcPikaOutgoingMessage(
- self._pika_engine, self._message, self._context
- )
-
- expiration = 1
- stopwatch = timeutils.StopWatch(duration=expiration).start()
-
- result = "it is a result"
- reply_queue_name = "reply_queue_name"
-
- future = futures.Future()
- future.set_result(result)
- reply_listener = mock.Mock()
- reply_listener.register_reply_waiter.return_value = future
- reply_listener.get_reply_qname.return_value = reply_queue_name
-
- res = message.send(
- exchange=self._exchange,
- routing_key=self._routing_key,
- reply_listener=reply_listener,
- stopwatch=stopwatch,
- retrier=None
- )
-
- self.assertEqual(result, res)
-
- self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.assert_called_once_with(
- body=mock.ANY,
- exchange=self._exchange, mandatory=True,
- properties=mock.ANY,
- routing_key=self._routing_key
- )
-
- body = self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["body"]
-
- self.assertEqual(
- b'{"_$_request_id": 555, "_$_token": "it is a token", '
- b'"msg_str": "hello", "msg_type": 1}',
- body
- )
-
- props = self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["properties"]
-
- self.assertEqual('application/json', props.content_type)
- self.assertEqual(1, props.delivery_mode)
- self.assertTrue(expiration * 1000 - float(props.expiration) < 100)
- self.assertEqual({'version': '1.0'}, props.headers)
- self.assertEqual(message.msg_id, props.correlation_id)
- self.assertEqual(reply_queue_name, props.reply_to)
- self.assertTrue(props.message_id)
-
-
-class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase):
- def setUp(self):
- self._reply_q = "reply_queue_name"
-
- self._expiration = 1
- self._stopwatch = (
- timeutils.StopWatch(duration=self._expiration).start()
- )
-
- self._pika_engine = mock.MagicMock()
-
- self._rpc_reply_exchange = "rpc_reply_exchange"
- self._pika_engine.rpc_reply_exchange = self._rpc_reply_exchange
- self._pika_engine.default_content_type = "application/json"
-
- self._msg_id = 12345567
-
- @mock.patch("oslo_serialization.jsonutils.dump_as_bytes",
- new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
- def test_success_message_send(self):
- message = pika_drv_msg.RpcReplyPikaOutgoingMessage(
- self._pika_engine, self._msg_id, reply="all_fine"
- )
-
- message.send(self._reply_q, stopwatch=self._stopwatch, retrier=None)
-
- self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.assert_called_once_with(
- body=b'{"s": "all_fine"}',
- exchange=self._rpc_reply_exchange, mandatory=True,
- properties=mock.ANY,
- routing_key=self._reply_q
- )
-
- props = self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["properties"]
-
- self.assertEqual('application/json', props.content_type)
- self.assertEqual(1, props.delivery_mode)
- self.assertTrue(self._expiration * 1000 - float(props.expiration) <
- 100)
- self.assertEqual({'version': '1.0'}, props.headers)
- self.assertEqual(message.msg_id, props.correlation_id)
- self.assertIsNone(props.reply_to)
- self.assertTrue(props.message_id)
-
- @mock.patch("traceback.format_exception", new=lambda x, y, z: z)
- @mock.patch("oslo_serialization.jsonutils.dump_as_bytes",
- new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
- def test_failure_message_send(self):
- failure_info = (oslo_messaging.MessagingException,
- oslo_messaging.MessagingException("Error message"),
- ['It is a trace'])
-
- message = pika_drv_msg.RpcReplyPikaOutgoingMessage(
- self._pika_engine, self._msg_id, failure_info=failure_info
- )
-
- message.send(self._reply_q, stopwatch=self._stopwatch, retrier=None)
-
- self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.assert_called_once_with(
- body=mock.ANY,
- exchange=self._rpc_reply_exchange,
- mandatory=True,
- properties=mock.ANY,
- routing_key=self._reply_q
- )
-
- body = self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["body"]
- self.assertEqual(
- b'{"e": {"c": "MessagingException", '
- b'"m": "oslo_messaging.exceptions", "s": "Error message", '
- b'"t": ["It is a trace"]}}',
- body
- )
-
- props = self._pika_engine.connection_with_confirmation_pool.acquire(
- ).__enter__().channel.publish.call_args[1]["properties"]
-
- self.assertEqual('application/json', props.content_type)
- self.assertEqual(1, props.delivery_mode)
- self.assertTrue(self._expiration * 1000 - float(props.expiration) <
- 100)
- self.assertEqual({'version': '1.0'}, props.headers)
- self.assertEqual(message.msg_id, props.correlation_id)
- self.assertIsNone(props.reply_to)
- self.assertTrue(props.message_id)
diff --git a/oslo_messaging/tests/drivers/pika/test_poller.py b/oslo_messaging/tests/drivers/pika/test_poller.py
deleted file mode 100644
index 7957bec..0000000
--- a/oslo_messaging/tests/drivers/pika/test_poller.py
+++ /dev/null
@@ -1,482 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import threading
-import time
-import unittest
-
-from concurrent import futures
-from six.moves import mock
-
-from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
-from oslo_messaging._drivers.pika_driver import pika_poller
-
-
-class PikaPollerTestCase(unittest.TestCase):
- def setUp(self):
- self._pika_engine = mock.Mock()
- self._poller_connection_mock = mock.Mock()
- self._poller_channel_mock = mock.Mock()
- self._poller_connection_mock.channel.return_value = (
- self._poller_channel_mock
- )
- self._pika_engine.create_connection.return_value = (
- self._poller_connection_mock
- )
-
- self._executor = futures.ThreadPoolExecutor(1)
-
- def timer_task(timeout, callback):
- time.sleep(timeout)
- callback()
-
- self._poller_connection_mock.add_timeout.side_effect = (
- lambda *args: self._executor.submit(timer_task, *args)
- )
-
- self._prefetch_count = 123
-
- @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
- "_declare_queue_binding")
- def test_start(self, declare_queue_binding_mock):
- poller = pika_poller.PikaPoller(
- self._pika_engine, 1, None, self._prefetch_count, None
- )
-
- poller.start(None)
-
- self.assertTrue(self._pika_engine.create_connection.called)
- self.assertTrue(self._poller_connection_mock.channel.called)
- self.assertTrue(declare_queue_binding_mock.called)
-
- def test_start_when_connection_unavailable(self):
- poller = pika_poller.PikaPoller(
- self._pika_engine, 1, None, self._prefetch_count, None
- )
-
- self._pika_engine.create_connection.side_effect = (
- pika_drv_exc.EstablishConnectionException
- )
-
- # start() should not raise socket.timeout exception
- poller.start(None)
-
- # stop is needed to stop reconnection background job
- poller.stop()
-
- @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
- "_declare_queue_binding")
- def test_message_processing(self, declare_queue_binding_mock):
- res = []
-
- def on_incoming_callback(incoming):
- res.append(incoming)
-
- incoming_message_class_mock = mock.Mock()
- poller = pika_poller.PikaPoller(
- self._pika_engine, 1, None, self._prefetch_count,
- incoming_message_class=incoming_message_class_mock
- )
- unused = object()
- method = object()
- properties = object()
- body = object()
-
- poller.start(on_incoming_callback)
- poller._on_message_with_ack_callback(
- unused, method, properties, body
- )
-
- self.assertEqual(1, len(res))
-
- self.assertEqual([incoming_message_class_mock.return_value], res[0])
- incoming_message_class_mock.assert_called_once_with(
- self._pika_engine, self._poller_channel_mock, method, properties,
- body
- )
-
- self.assertTrue(self._pika_engine.create_connection.called)
- self.assertTrue(self._poller_connection_mock.channel.called)
-
- self.assertTrue(declare_queue_binding_mock.called)
-
- @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
- "_declare_queue_binding")
- def test_message_processing_batch(self, declare_queue_binding_mock):
- incoming_message_class_mock = mock.Mock()
-
- n = 10
- params = []
-
- res = []
-
- def on_incoming_callback(incoming):
- res.append(incoming)
-
- poller = pika_poller.PikaPoller(
- self._pika_engine, n, None, self._prefetch_count,
- incoming_message_class=incoming_message_class_mock
- )
-
- for i in range(n):
- params.append((object(), object(), object(), object()))
-
- poller.start(on_incoming_callback)
-
- for i in range(n):
- poller._on_message_with_ack_callback(
- *params[i]
- )
-
- self.assertEqual(1, len(res))
- self.assertEqual(10, len(res[0]))
- self.assertEqual(n, incoming_message_class_mock.call_count)
-
- for i in range(n):
- self.assertEqual(incoming_message_class_mock.return_value,
- res[0][i])
- self.assertEqual(
- (self._pika_engine, self._poller_channel_mock) + params[i][1:],
- incoming_message_class_mock.call_args_list[i][0]
- )
-
- self.assertTrue(self._pika_engine.create_connection.called)
- self.assertTrue(self._poller_connection_mock.channel.called)
-
- self.assertTrue(declare_queue_binding_mock.called)
-
- @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
- "_declare_queue_binding")
- def test_message_processing_batch_with_timeout(self,
- declare_queue_binding_mock):
- incoming_message_class_mock = mock.Mock()
-
- n = 10
- timeout = 1
-
- res = []
- evt = threading.Event()
-
- def on_incoming_callback(incoming):
- res.append(incoming)
- evt.set()
-
- poller = pika_poller.PikaPoller(
- self._pika_engine, n, timeout, self._prefetch_count,
- incoming_message_class=incoming_message_class_mock
- )
-
- params = []
-
- success_count = 5
-
- poller.start(on_incoming_callback)
-
- for i in range(n):
- params.append((object(), object(), object(), object()))
-
- for i in range(success_count):
- poller._on_message_with_ack_callback(
- *params[i]
- )
-
- self.assertTrue(evt.wait(timeout * 2))
-
- self.assertEqual(1, len(res))
- self.assertEqual(success_count, len(res[0]))
- self.assertEqual(success_count, incoming_message_class_mock.call_count)
-
- for i in range(success_count):
- self.assertEqual(incoming_message_class_mock.return_value,
- res[0][i])
- self.assertEqual(
- (self._pika_engine, self._poller_channel_mock) + params[i][1:],
- incoming_message_class_mock.call_args_list[i][0]
- )
-
- self.assertTrue(self._pika_engine.create_connection.called)
- self.assertTrue(self._poller_connection_mock.channel.called)
-
- self.assertTrue(declare_queue_binding_mock.called)
-
-
-class RpcServicePikaPollerTestCase(unittest.TestCase):
- def setUp(self):
- self._pika_engine = mock.Mock()
- self._poller_connection_mock = mock.Mock()
- self._poller_channel_mock = mock.Mock()
- self._poller_connection_mock.channel.return_value = (
- self._poller_channel_mock
- )
- self._pika_engine.create_connection.return_value = (
- self._poller_connection_mock
- )
-
- self._pika_engine.get_rpc_queue_name.side_effect = (
- lambda topic, server, no_ack, worker=False:
- "_".join([topic, str(server), str(no_ack), str(worker)])
- )
-
- self._pika_engine.get_rpc_exchange_name.side_effect = (
- lambda exchange: exchange
- )
-
- self._prefetch_count = 123
- self._target = mock.Mock(exchange="exchange", topic="topic",
- server="server")
- self._pika_engine.rpc_queue_expiration = 12345
-
- @mock.patch("oslo_messaging._drivers.pika_driver.pika_message."
- "RpcPikaIncomingMessage")
- def test_declare_rpc_queue_bindings(self, rpc_pika_incoming_message_mock):
- poller = pika_poller.RpcServicePikaPoller(
- self._pika_engine, self._target, 1, None,
- self._prefetch_count
- )
-
- poller.start(None)
-
- self.assertTrue(self._pika_engine.create_connection.called)
- self.assertTrue(self._poller_connection_mock.channel.called)
-
- declare_queue_binding_by_channel_mock = (
- self._pika_engine.declare_queue_binding_by_channel
- )
-
- self.assertEqual(
- 6, declare_queue_binding_by_channel_mock.call_count
- )
-
- declare_queue_binding_by_channel_mock.assert_has_calls((
- mock.call(
- channel=self._poller_channel_mock, durable=False,
- exchange="exchange",
- exchange_type='direct',
- queue="topic_None_True_False",
- queue_expiration=12345,
- routing_key="topic_None_True_False"
- ),
- mock.call(
- channel=self._poller_channel_mock, durable=False,
- exchange="exchange",
- exchange_type='direct',
- queue="topic_server_True_False",
- queue_expiration=12345,
- routing_key="topic_server_True_False"
- ),
- mock.call(
- channel=self._poller_channel_mock, durable=False,
- exchange="exchange",
- exchange_type='direct',
- queue="topic_server_True_True",
- queue_expiration=12345,
- routing_key="topic_all_workers_True_False"
- ),
- mock.call(
- channel=self._poller_channel_mock, durable=False,
- exchange="exchange",
- exchange_type='direct',
- queue="topic_None_False_False",
- queue_expiration=12345,
- routing_key="topic_None_False_False"
- ),
- mock.call(
- channel=self._poller_channel_mock, durable=False,
- exchange="exchange",
- exchange_type='direct',
- queue="topic_server_False_False",
- queue_expiration=12345,
- routing_key='topic_server_False_False'
- ),
- mock.call(
- channel=self._poller_channel_mock, durable=False,
- exchange="exchange",
- exchange_type='direct',
- queue="topic_server_False_True",
- queue_expiration=12345,
- routing_key='topic_all_workers_False_False'
- )
- ))
-
-
-class RpcReplyServicePikaPollerTestCase(unittest.TestCase):
- def setUp(self):
- self._pika_engine = mock.Mock()
- self._poller_connection_mock = mock.Mock()
- self._poller_channel_mock = mock.Mock()
- self._poller_connection_mock.channel.return_value = (
- self._poller_channel_mock
- )
- self._pika_engine.create_connection.return_value = (
- self._poller_connection_mock
- )
-
- self._prefetch_count = 123
- self._exchange = "rpc_reply_exchange"
- self._queue = "rpc_reply_queue"
-
- self._pika_engine.rpc_reply_retry_delay = 12132543456
-
- self._pika_engine.rpc_queue_expiration = 12345
- self._pika_engine.rpc_reply_retry_attempts = 3
-
- def test_declare_rpc_reply_queue_binding(self):
- poller = pika_poller.RpcReplyPikaPoller(
- self._pika_engine, self._exchange, self._queue, 1, None,
- self._prefetch_count,
- )
-
- poller.start(None)
- poller.stop()
-
- declare_queue_binding_by_channel_mock = (
- self._pika_engine.declare_queue_binding_by_channel
- )
-
- self.assertEqual(
- 1, declare_queue_binding_by_channel_mock.call_count
- )
-
- declare_queue_binding_by_channel_mock.assert_called_once_with(
- channel=self._poller_channel_mock, durable=False,
- exchange='rpc_reply_exchange', exchange_type='direct',
- queue='rpc_reply_queue', queue_expiration=12345,
- routing_key='rpc_reply_queue'
- )
-
-
-class NotificationPikaPollerTestCase(unittest.TestCase):
- def setUp(self):
- self._pika_engine = mock.Mock()
- self._poller_connection_mock = mock.Mock()
- self._poller_channel_mock = mock.Mock()
- self._poller_connection_mock.channel.return_value = (
- self._poller_channel_mock
- )
- self._pika_engine.create_connection.return_value = (
- self._poller_connection_mock
- )
-
- self._prefetch_count = 123
- self._target_and_priorities = (
- (
- mock.Mock(exchange="exchange1", topic="topic1",
- server="server1"), 1
- ),
- (
- mock.Mock(exchange="exchange1", topic="topic1"), 2
- ),
- (
- mock.Mock(exchange="exchange2", topic="topic2",), 1
- ),
- )
- self._pika_engine.notification_persistence = object()
-
- def test_declare_notification_queue_bindings_default_queue(self):
- poller = pika_poller.NotificationPikaPoller(
- self._pika_engine, self._target_and_priorities, 1, None,
- self._prefetch_count, None
- )
-
- poller.start(None)
-
- self.assertTrue(self._pika_engine.create_connection.called)
- self.assertTrue(self._poller_connection_mock.channel.called)
-
- declare_queue_binding_by_channel_mock = (
- self._pika_engine.declare_queue_binding_by_channel
- )
-
- self.assertEqual(
- 3, declare_queue_binding_by_channel_mock.call_count
- )
-
- declare_queue_binding_by_channel_mock.assert_has_calls((
- mock.call(
- channel=self._poller_channel_mock,
- durable=self._pika_engine.notification_persistence,
- exchange="exchange1",
- exchange_type='direct',
- queue="topic1.1",
- queue_expiration=None,
- routing_key="topic1.1"
- ),
- mock.call(
- channel=self._poller_channel_mock,
- durable=self._pika_engine.notification_persistence,
- exchange="exchange1",
- exchange_type='direct',
- queue="topic1.2",
- queue_expiration=None,
- routing_key="topic1.2"
- ),
- mock.call(
- channel=self._poller_channel_mock,
- durable=self._pika_engine.notification_persistence,
- exchange="exchange2",
- exchange_type='direct',
- queue="topic2.1",
- queue_expiration=None,
- routing_key="topic2.1"
- )
- ))
-
- def test_declare_notification_queue_bindings_custom_queue(self):
- poller = pika_poller.NotificationPikaPoller(
- self._pika_engine, self._target_and_priorities, 1, None,
- self._prefetch_count, "custom_queue_name"
- )
-
- poller.start(None)
-
- self.assertTrue(self._pika_engine.create_connection.called)
- self.assertTrue(self._poller_connection_mock.channel.called)
-
- declare_queue_binding_by_channel_mock = (
- self._pika_engine.declare_queue_binding_by_channel
- )
-
- self.assertEqual(
- 3, declare_queue_binding_by_channel_mock.call_count
- )
-
- declare_queue_binding_by_channel_mock.assert_has_calls((
- mock.call(
- channel=self._poller_channel_mock,
- durable=self._pika_engine.notification_persistence,
- exchange="exchange1",
- exchange_type='direct',
- queue="custom_queue_name",
- queue_expiration=None,
- routing_key="topic1.1"
- ),
- mock.call(
- channel=self._poller_channel_mock,
- durable=self._pika_engine.notification_persistence,
- exchange="exchange1",
- exchange_type='direct',
- queue="custom_queue_name",
- queue_expiration=None,
- routing_key="topic1.2"
- ),
- mock.call(
- channel=self._poller_channel_mock,
- durable=self._pika_engine.notification_persistence,
- exchange="exchange2",
- exchange_type='direct',
- queue="custom_queue_name",
- queue_expiration=None,
- routing_key="topic2.1"
- )
- ))
diff --git a/oslo_messaging/tests/functional/test_rabbitmq.py b/oslo_messaging/tests/functional/test_rabbitmq.py
index ffeaf0a..db06d01 100644
--- a/oslo_messaging/tests/functional/test_rabbitmq.py
+++ b/oslo_messaging/tests/functional/test_rabbitmq.py
@@ -63,10 +63,6 @@ class RabbitMQFailoverTests(test_utils.BaseTestCase):
self.n2 = self.pifpaf.env["PIFPAF_RABBITMQ_NODENAME2"]
self.n3 = self.pifpaf.env["PIFPAF_RABBITMQ_NODENAME3"]
- # NOTE(gdavoian): additional tweak for pika driver
- if self.driver == "pika":
- self.url = self.url.replace("rabbit", "pika")
-
# ensure connections will be establish to the first node
self.pifpaf.stop_node(self.n2)
self.pifpaf.stop_node(self.n3)
@@ -115,19 +111,6 @@ class RabbitMQFailoverTests(test_utils.BaseTestCase):
return "callback done"
def _check_ports(self, port):
- getattr(self, '_check_ports_%s_driver' % self.driver)(port)
-
- def _check_ports_pika_driver(self, port):
- rpc_server = self.servers.servers[0].server
- # FIXME(sileht): Check other connections
- connections = [
- rpc_server.listener._connection
- ]
- for conn in connections:
- self.assertEqual(
- port, conn._impl.socket.getpeername()[1])
-
- def _check_ports_rabbit_driver(self, port):
rpc_server = self.servers.servers[0].server
connection_contexts = [
# rpc server
diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py
index c381ebd..82bdbd9 100644
--- a/oslo_messaging/tests/functional/utils.py
+++ b/oslo_messaging/tests/functional/utils.py
@@ -304,8 +304,6 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
driver = os.environ.get("TRANSPORT_DRIVER")
if driver:
self.url = os.environ.get('PIFPAF_URL')
- if driver == "pika" and self.url:
- self.url = self.url.replace("rabbit://", "pika://")
else:
self.url = os.environ.get('TRANSPORT_URL')
diff --git a/playbooks/oslo.messaging-src-dsvm-full-pika-default/post.yaml b/playbooks/oslo.messaging-src-dsvm-full-pika-default/post.yaml
deleted file mode 100644
index e07f551..0000000
--- a/playbooks/oslo.messaging-src-dsvm-full-pika-default/post.yaml
+++ /dev/null
@@ -1,15 +0,0 @@
-- hosts: primary
- tasks:
-
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=/logs/**
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
diff --git a/playbooks/oslo.messaging-src-dsvm-full-pika-default/run.yaml b/playbooks/oslo.messaging-src-dsvm-full-pika-default/run.yaml
deleted file mode 100644
index 572f1d1..0000000
--- a/playbooks/oslo.messaging-src-dsvm-full-pika-default/run.yaml
+++ /dev/null
@@ -1,42 +0,0 @@
-- hosts: all
- name: Autoconverted job legacy-oslo.messaging-src-dsvm-full-pika-default from old
- job gate-oslo.messaging-src-dsvm-full-pika-default-ubuntu-xenial-nv
- tasks:
-
- - name: Ensure legacy workspace directory
- file:
- path: '{{ ansible_user_dir }}/workspace'
- state: directory
-
- - shell:
- cmd: |
- set -e
- set -x
- cat > clonemap.yaml << EOF
- clonemap:
- - name: openstack-infra/devstack-gate
- dest: devstack-gate
- EOF
- /usr/zuul-env/bin/zuul-cloner -m clonemap.yaml --cache-dir /opt/git \
- git://git.openstack.org \
- openstack-infra/devstack-gate
- executable: /bin/bash
- chdir: '{{ ansible_user_dir }}/workspace'
- environment: '{{ zuul | zuul_legacy_vars }}'
-
- - shell:
- cmd: |
- set -e
- set -x
- export PYTHONUNBUFFERED=true
- export DEVSTACK_GATE_TEMPEST=1
- export DEVSTACK_GATE_TEMPEST_FULL=1
- export PROJECTS="openstack/devstack-plugin-pika $PROJECTS"
- export DEVSTACK_LOCAL_CONFIG="enable_plugin devstack-plugin-pika git://git.openstack.org/openstack/devstack-plugin-pika"
- export DEVSTACK_PROJECT_FROM_GIT="oslo.messaging"
-
- cp devstack-gate/devstack-vm-gate-wrap.sh ./safe-devstack-vm-gate-wrap.sh
- ./safe-devstack-vm-gate-wrap.sh
- executable: /bin/bash
- chdir: '{{ ansible_user_dir }}/workspace'
- environment: '{{ zuul | zuul_legacy_vars }}'
diff --git a/playbooks/oslo.messaging-telemetry-dsvm-integration-pika/post.yaml b/playbooks/oslo.messaging-telemetry-dsvm-integration-pika/post.yaml
deleted file mode 100644
index dac8753..0000000
--- a/playbooks/oslo.messaging-telemetry-dsvm-integration-pika/post.yaml
+++ /dev/null
@@ -1,80 +0,0 @@
-- hosts: primary
- tasks:
-
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=**/*nose_results.html
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
-
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=**/*testr_results.html.gz
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
-
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=/.testrepository/tmp*
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
-
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=**/*testrepository.subunit.gz
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
-
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}/tox'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=/.tox/*/log/*
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
-
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=/logs/**
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
diff --git a/playbooks/oslo.messaging-telemetry-dsvm-integration-pika/run.yaml b/playbooks/oslo.messaging-telemetry-dsvm-integration-pika/run.yaml
deleted file mode 100644
index 8cad6ef..0000000
--- a/playbooks/oslo.messaging-telemetry-dsvm-integration-pika/run.yaml
+++ /dev/null
@@ -1,79 +0,0 @@
-- hosts: all
- name: Autoconverted job legacy-oslo.messaging-telemetry-dsvm-integration-pika from
- old job gate-oslo.messaging-telemetry-dsvm-integration-pika-ubuntu-xenial-nv
- tasks:
-
- - name: Ensure legacy workspace directory
- file:
- path: '{{ ansible_user_dir }}/workspace'
- state: directory
-
- - shell:
- cmd: |
- set -e
- set -x
- cat > clonemap.yaml << EOF
- clonemap:
- - name: openstack-infra/devstack-gate
- dest: devstack-gate
- EOF
- /usr/zuul-env/bin/zuul-cloner -m clonemap.yaml --cache-dir /opt/git \
- git://git.openstack.org \
- openstack-infra/devstack-gate
- executable: /bin/bash
- chdir: '{{ ansible_user_dir }}/workspace'
- environment: '{{ zuul | zuul_legacy_vars }}'
-
- - shell:
- cmd: |
- set -e
- set -x
- export PYTHONUNBUFFERED=true
-
- export DEVSTACK_GATE_HEAT=1
- export DEVSTACK_GATE_NEUTRON=1
- export DEVSTACK_GATE_TEMPEST=1
- export DEVSTACK_GATE_EXERCISES=0
- export DEVSTACK_GATE_INSTALL_TESTONLY=1
-
- export PROJECTS="openstack/ceilometer $PROJECTS"
- export PROJECTS="openstack/aodh $PROJECTS"
- export PROJECTS="openstack/devstack-plugin-pika $PROJECTS"
-
- case "$ZUUL_BRANCH" in
- "stable/ocata")
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin gnocchi git://git.openstack.org/openstack/gnocchi"
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin panko git://git.openstack.org/openstack/panko"
- export OVERRIDE_GNOCCHI_PROJECT_BRANCH="stable/3.1"
- export PROJECTS="openstack/panko $PROJECTS openstack/gnocchi"
- ;;
- *)
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin panko git://git.openstack.org/openstack/panko"
- export PROJECTS="openstack/panko $PROJECTS"
- ;;
- esac
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin ceilometer git://git.openstack.org/openstack/ceilometer"
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin aodh git://git.openstack.org/openstack/aodh"
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin heat git://git.openstack.org/openstack/heat"
-
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"CEILOMETER_BACKEND=gnocchi"
-
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"GNOCCHI_ARCHIVE_POLICY=high"
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"CEILOMETER_PIPELINE_INTERVAL=5"
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"GNOCCHI_STORAGE_BACKEND=file"
-
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin devstack-plugin-pika git://git.openstack.org/openstack/devstack-plugin-pika"
-
- export DEVSTACK_PROJECT_FROM_GIT="oslo.messaging"
-
- function post_test_hook {
- cd /opt/stack/new/ceilometer/ceilometer/tests/integration/hooks/
- ./post_test_hook.sh
- }
- export -f post_test_hook
-
- cp devstack-gate/devstack-vm-gate-wrap.sh ./safe-devstack-vm-gate-wrap.sh
- ./safe-devstack-vm-gate-wrap.sh
- executable: /bin/bash
- chdir: '{{ ansible_user_dir }}/workspace'
- environment: '{{ zuul | zuul_legacy_vars }}'
diff --git a/playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/post.yaml b/playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/post.yaml
deleted file mode 100644
index e07f551..0000000
--- a/playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/post.yaml
+++ /dev/null
@@ -1,15 +0,0 @@
-- hosts: primary
- tasks:
-
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=/logs/**
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
diff --git a/playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/run.yaml b/playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/run.yaml
deleted file mode 100644
index 5dd6533..0000000
--- a/playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/run.yaml
+++ /dev/null
@@ -1,45 +0,0 @@
-- hosts: all
- name: Autoconverted job legacy-tempest-neutron-dsvm-src-oslo.messaging-pika-default
- from old job gate-tempest-neutron-dsvm-src-oslo.messaging-pika-default-ubuntu-xenial-nv
- tasks:
-
- - name: Ensure legacy workspace directory
- file:
- path: '{{ ansible_user_dir }}/workspace'
- state: directory
-
- - shell:
- cmd: |
- set -e
- set -x
- cat > clonemap.yaml << EOF
- clonemap:
- - name: openstack-infra/devstack-gate
- dest: devstack-gate
- EOF
- /usr/zuul-env/bin/zuul-cloner -m clonemap.yaml --cache-dir /opt/git \
- git://git.openstack.org \
- openstack-infra/devstack-gate
- executable: /bin/bash
- chdir: '{{ ansible_user_dir }}/workspace'
- environment: '{{ zuul | zuul_legacy_vars }}'
-
- - shell:
- cmd: |
- set -e
- set -x
- export PYTHONUNBUFFERED=true
- export DEVSTACK_GATE_TEMPEST=1
- export DEVSTACK_GATE_TEMPEST_FULL=1
- export DEVSTACK_GATE_NEUTRON=1
-
- export PROJECTS="openstack/devstack-plugin-pika $PROJECTS"
- export DEVSTACK_LOCAL_CONFIG="enable_plugin devstack-plugin-pika git://git.openstack.org/openstack/devstack-plugin-pika"
-
- export DEVSTACK_PROJECT_FROM_GIT="oslo.messaging"
-
- cp devstack-gate/devstack-vm-gate-wrap.sh ./safe-devstack-vm-gate-wrap.sh
- ./safe-devstack-vm-gate-wrap.sh
- executable: /bin/bash
- chdir: '{{ ansible_user_dir }}/workspace'
- environment: '{{ zuul | zuul_legacy_vars }}'
diff --git a/releasenotes/notes/remove-pika-1bae204ced2521a3.yaml b/releasenotes/notes/remove-pika-1bae204ced2521a3.yaml
new file mode 100644
index 0000000..0d4e123
--- /dev/null
+++ b/releasenotes/notes/remove-pika-1bae204ced2521a3.yaml
@@ -0,0 +1,8 @@
+---
+prelude: >
+ The Pika-based driver for RabbitMQ has been removed.
+upgrade:
+ - |
+ Users of the Pika-based driver must change the prefix of all the
+ transport_url configuration options from "pika://..." to
+ "rabbit://..." to use the default kombu based RabbitMQ driver.
diff --git a/requirements.txt b/requirements.txt
index c6a0912..2055389 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -28,10 +28,8 @@ PyYAML>=3.12 # MIT
# we set the amqp version to ensure heartbeat works
amqp!=2.1.4,>=2.1.1 # BSD
kombu!=4.0.2,>=4.0.0 # BSD
-pika>=0.10.0 # BSD
-pika-pool>=0.1.3 # BSD
-# used by pika and zmq drivers
+# used by zmq driver
futures>=3.0.0;python_version=='2.7' or python_version=='2.6' # BSD
tenacity>=3.2.1 # Apache-2.0
diff --git a/setup.cfg b/setup.cfg
index f5bda49..795191d 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -41,7 +41,6 @@ oslo.messaging.drivers =
# This is just for internal testing
fake = oslo_messaging._drivers.impl_fake:FakeDriver
- pika = oslo_messaging._drivers.impl_pika:PikaDriver
oslo.messaging.executors =
blocking = futurist:SynchronousExecutor
@@ -56,21 +55,6 @@ oslo.messaging.notify.drivers =
noop = oslo_messaging.notify._impl_noop:NoOpDriver
routing = oslo_messaging.notify._impl_routing:RoutingDriver
-oslo.messaging.pika.connection_factory =
- # Creates new connection for each create_connection call. Old-style behaviour
- # Uses a much more connections then single and read_write factories but still available as
- # an option
- new = oslo_messaging._drivers.pika_driver.pika_connection_factory:PikaConnectionFactory
-
- # Creates only one connection for transport and return it for each create connection call
- # it is default, but you can not use it with synchronous executor
- single = oslo_messaging._drivers.pika_driver.pika_connection_factory:SinglePikaConnectionFactory
-
- # Create two connections - one for listening and another one for sending and return them
- # for each create connection call depending on connection purpose. Creates one more connection
- # but you can use it with synchronous executor
- read_write = oslo_messaging._drivers.pika_driver.pika_connection_factory:ReadWritePikaConnectionFactory
-
oslo.messaging.zmq.matchmaker =
# Matchmakers for ZeroMQ
dummy = oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_base:MatchmakerDummy
diff --git a/tox.ini b/tox.ini
index 70666a2..66882de 100644
--- a/tox.ini
+++ b/tox.ini
@@ -44,12 +44,6 @@ setenv =
basepython = python3.5
commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
-[testenv:py27-func-pika]
-setenv =
- {[testenv]setenv}
- TRANSPORT_DRIVER=pika
-commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
-
[testenv:py27-func-kafka]
setenv =
{[testenv]setenv}