summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Giusti <kgiusti@gmail.com>2016-08-17 09:30:09 -0400
committerKenneth Giusti <kgiusti@gmail.com>2016-08-17 09:30:09 -0400
commit39c3901b8c1709a197915f12767737298da1d80c (patch)
treec1ce310f95d77f2be7738af0ece8b69c6ea3fc08
parentba387dfd1c5a70f884a614aea8178404d35540bd (diff)
parent4eef58c5d8701d2190675098a1de560164182ffb (diff)
downloadoslo-messaging-39c3901b8c1709a197915f12767737298da1d80c.tar.gz
Merge remote-tracking branch 'origin/master' into resync-to-master
Change-Id: Id59697351a2f5d00d8e145e95bfe6e7a919b86f2
-rw-r--r--doc/source/zmq_driver.rst43
-rw-r--r--oslo_messaging/_cmd/zmq_proxy.py39
-rw-r--r--oslo_messaging/_drivers/amqp1_driver/opts.py2
-rw-r--r--oslo_messaging/_drivers/base.py7
-rw-r--r--oslo_messaging/_drivers/impl_kafka.py13
-rw-r--r--[-rwxr-xr-x]oslo_messaging/_drivers/impl_pika.py0
-rw-r--r--oslo_messaging/_drivers/impl_rabbit.py82
-rw-r--r--oslo_messaging/_drivers/impl_zmq.py91
-rw-r--r--oslo_messaging/_drivers/pika_driver/pika_connection_factory.py8
-rw-r--r--oslo_messaging/_drivers/pool.py65
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py108
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py91
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py106
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py58
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py199
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py68
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py162
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py52
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_ack_manager.py111
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_client.py75
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py32
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py89
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py180
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_request.py15
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_response.py58
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py66
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py140
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py97
-rw-r--r--oslo_messaging/_drivers/zmq_driver/matchmaker/base.py32
-rw-r--r--oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py10
-rw-r--r--oslo_messaging/_drivers/zmq_driver/poller/green_poller.py5
-rw-r--r--oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py4
-rw-r--r--oslo_messaging/_drivers/zmq_driver/proxy/__init__.py (renamed from oslo_messaging/_drivers/zmq_driver/broker/__init__.py)0
-rw-r--r--oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py (renamed from oslo_messaging/_drivers/zmq_driver/broker/zmq_proxy.py)22
-rw-r--r--oslo_messaging/_drivers/zmq_driver/proxy/zmq_publisher_proxy.py (renamed from oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py)19
-rw-r--r--oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py (renamed from oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py)68
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py19
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py159
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py69
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py73
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py25
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py58
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/zmq_server.py21
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/zmq_ttl_cache.py79
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_address.py4
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_async.py9
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_names.py17
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_options.py153
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_poller.py7
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_socket.py105
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_updater.py7
-rw-r--r--oslo_messaging/conffixture.py7
-rw-r--r--oslo_messaging/locale/en_GB/LC_MESSAGES/oslo_messaging-log-error.po61
-rw-r--r--oslo_messaging/locale/en_GB/LC_MESSAGES/oslo_messaging-log-warning.po24
-rw-r--r--oslo_messaging/notify/listener.py2
-rw-r--r--oslo_messaging/notify/notifier.py47
-rw-r--r--oslo_messaging/opts.py5
-rw-r--r--oslo_messaging/tests/drivers/pika/test_message.py136
-rw-r--r--oslo_messaging/tests/drivers/pika/test_poller.py40
-rw-r--r--oslo_messaging/tests/drivers/test_amqp_driver.py106
-rw-r--r--oslo_messaging/tests/drivers/test_impl_rabbit.py10
-rw-r--r--oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py10
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_impl_zmq.py4
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_pub_sub.py43
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py185
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_zmq_async.py29
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_zmq_ttl_cache.py116
-rw-r--r--oslo_messaging/tests/drivers/zmq/zmq_common.py25
-rw-r--r--oslo_messaging/tests/functional/notify/test_logger.py6
-rw-r--r--oslo_messaging/tests/functional/test_rabbitmq.py147
-rw-r--r--oslo_messaging/tests/functional/utils.py30
-rw-r--r--oslo_messaging/tests/functional/zmq/multiproc_utils.py3
-rw-r--r--oslo_messaging/tests/functional/zmq/test_startup.py6
-rw-r--r--oslo_messaging/tests/notify/test_dispatcher.py2
-rw-r--r--oslo_messaging/tests/notify/test_middleware.py78
-rw-r--r--oslo_messaging/tests/rpc/test_server.py26
-rw-r--r--oslo_messaging/tests/test_config_opts_proxy.py26
-rw-r--r--oslo_messaging/tests/test_opts.py3
-rw-r--r--oslo_messaging/tests/test_serializer.py10
-rw-r--r--oslo_messaging/tests/test_transport.py18
-rw-r--r--oslo_messaging/tests/utils.py3
-rw-r--r--releasenotes/notes/connection_ttl-2cf0fe6e1ab8c73c.yaml8
-rw-r--r--releasenotes/notes/option-rabbitmq-max_retries-has-been-deprecated-471f66a9e6d672a2.yaml5
-rw-r--r--releasenotes/source/locale/en_GB/LC_MESSAGES/releasenotes.po30
-rw-r--r--requirements.txt12
-rwxr-xr-xsetup-test-env-pika.sh32
-rwxr-xr-xsetup-test-env-rabbit.sh32
-rwxr-xr-xsetup-test-env-zmq-proxy.sh37
-rwxr-xr-xsetup-test-env-zmq-pub-sub.sh32
-rwxr-xr-xsetup-test-env-zmq.sh9
-rw-r--r--setup.cfg2
-rw-r--r--test-requirements.txt9
-rwxr-xr-xtools/simulator.py34
-rw-r--r--tox.ini44
94 files changed, 2835 insertions, 1681 deletions
diff --git a/doc/source/zmq_driver.rst b/doc/source/zmq_driver.rst
index e73fdf9..bcc3d66 100644
--- a/doc/source/zmq_driver.rst
+++ b/doc/source/zmq_driver.rst
@@ -85,12 +85,14 @@ Configuration
Enabling (mandatory)
--------------------
-To enable the driver, in the section [DEFAULT] of the conf file,
-the 'rpc_backend' flag must be set to 'zmq' and the 'rpc_zmq_host' flag
+To enable the driver the 'transport_url' option must be set to 'zmq://'
+in the section [DEFAULT] of the conf file, the 'rpc_zmq_host' flag
must be set to the hostname of the current node. ::
[DEFAULT]
- rpc_backend = zmq
+ transport_url = "zmq://"
+
+ [oslo_messaging_zmq]
rpc_zmq_host = {hostname}
@@ -110,27 +112,17 @@ RedisMatchMaker: loads the hash table from a remote Redis server, supports
dynamic host/topic registrations, host expiration, and hooks for consuming
applications to acknowledge or neg-acknowledge topic.host service availability.
-To set the MatchMaker class, use option 'rpc_zmq_matchmaker' in [DEFAULT]. ::
-
- rpc_zmq_matchmaker = dummy
-
-or::
-
- rpc_zmq_matchmaker = redis
+For ZeroMQ driver Redis is configured in transport_url also. For using Redis
+specify the URL as follows::
-To specify the Redis server for RedisMatchMaker, use options in
-[matchmaker_redis] of each project. ::
-
- [matchmaker_redis]
- host = 127.0.0.1
- port = 6379
+ [DEFAULT]
+ transport_url = "zmq+redis://127.0.0.1:6379"
In order to cleanup redis storage from expired records (e.g. target listener
goes down) TTL may be applied for keys. Configure 'zmq_target_expire' option
which is 120 (seconds) by default. The option is related not specifically to
-redis so it is also defined in [DEFAULT] section. If option value is <= 0
-then keys don't expire and live forever in the storage.
-
+redis so it is also defined in [oslo_messaging_zmq] section. If option value
+is <= 0 then keys don't expire and live forever in the storage.
MatchMaker Data Source (mandatory)
----------------------------------
@@ -159,11 +151,10 @@ we use Sentinel solution and redis master-slave-slave configuration (if we have
3 controllers and run Redis on each of them).
To deploy redis with HA follow the `sentinel-install`_ instructions. From the
-messaging driver's side you will need to setup following configuration which
-is different from a single-node redis deployment ::
+messaging driver's side you will need to setup following configuration ::
- [matchmaker_redis]
- sentinel_hosts=host1:26379, host2:26379, host3:26379
+ [DEFAULT]
+ transport_url = "zmq+redis://host1:26379,host2:26379,host3:26379"
Restrict the number of TCP sockets on controller
@@ -174,7 +165,7 @@ controller node in directly connected configuration. To solve the issue
ROUTER proxy may be used.
In order to configure driver to use ROUTER proxy set up the 'use_router_proxy'
-option to true in [DEFAULT] section (false is set by default).
+option to true in [oslo_messaging_zmq] section (false is set by default).
For example::
@@ -198,7 +189,7 @@ direct DEALER/ROUTER unicast which is possible but less efficient and therefore
is not recommended. In a case of direct DEALER/ROUTER unicast proxy is not
needed.
-This option can be set in [DEFAULT] section.
+This option can be set in [oslo_messaging_zmq] section.
For example::
@@ -218,7 +209,7 @@ All services bind to an IP address or Ethernet adapter. By default, all services
bind to '*', effectively binding to 0.0.0.0. This may be changed with the option
'rpc_zmq_bind_address' which accepts a wildcard, IP address, or Ethernet adapter.
-This configuration can be set in [DEFAULT] section.
+This configuration can be set in [oslo_messaging_zmq] section.
For example::
diff --git a/oslo_messaging/_cmd/zmq_proxy.py b/oslo_messaging/_cmd/zmq_proxy.py
index 03ccea1..3126a41 100644
--- a/oslo_messaging/_cmd/zmq_proxy.py
+++ b/oslo_messaging/_cmd/zmq_proxy.py
@@ -17,15 +17,17 @@ import logging
from oslo_config import cfg
-from oslo_messaging._drivers import impl_zmq
-from oslo_messaging._drivers.zmq_driver.broker import zmq_proxy
-from oslo_messaging._drivers.zmq_driver.broker import zmq_queue_proxy
-from oslo_messaging import server
+from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
+from oslo_messaging._drivers.zmq_driver.proxy import zmq_queue_proxy
+from oslo_messaging._drivers.zmq_driver import zmq_options
CONF = cfg.CONF
-CONF.register_opts(impl_zmq.zmq_opts)
-CONF.register_opts(server._pool_opts)
-CONF.rpc_zmq_native = True
+
+zmq_options.register_opts(CONF)
+
+opt_group = cfg.OptGroup(name='zmq_proxy_opts',
+ title='ZeroMQ proxy options')
+CONF.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group)
USAGE = """ Usage: ./zmq-proxy.py [-h] [] ...
@@ -42,9 +44,20 @@ def main():
parser.add_argument('--config-file', dest='config_file', type=str,
help='Path to configuration file')
+
+ parser.add_argument('--host', dest='host', type=str,
+ help='Host FQDN for current proxy')
+ parser.add_argument('--frontend-port', dest='frontend_port', type=int,
+ help='Front-end ROUTER port number')
+ parser.add_argument('--backend-port', dest='backend_port', type=int,
+ help='Back-end ROUTER port number')
+ parser.add_argument('--publisher-port', dest='publisher_port', type=int,
+ help='Back-end PUBLISHER port number')
+
parser.add_argument('-d', '--debug', dest='debug', type=bool,
default=False,
help="Turn on DEBUG logging level instead of INFO")
+
args = parser.parse_args()
if args.config_file:
@@ -57,6 +70,18 @@ def main():
format='%(asctime)s %(name)s '
'%(levelname)-8s %(message)s')
+ if args.host:
+ CONF.zmq_proxy_opts.host = args.host
+ if args.frontend_port:
+ CONF.set_override('frontend_port', args.frontend_port,
+ group='zmq_proxy_opts')
+ if args.backend_port:
+ CONF.set_override('backend_port', args.backend_port,
+ group='zmq_proxy_opts')
+ if args.publisher_port:
+ CONF.set_override('publisher_port', args.publisher_port,
+ group='zmq_proxy_opts')
+
reactor = zmq_proxy.ZmqProxy(CONF, zmq_queue_proxy.UniversalQueueProxy)
try:
diff --git a/oslo_messaging/_drivers/amqp1_driver/opts.py b/oslo_messaging/_drivers/amqp1_driver/opts.py
index 48c0a70..ed817ac 100644
--- a/oslo_messaging/_drivers/amqp1_driver/opts.py
+++ b/oslo_messaging/_drivers/amqp1_driver/opts.py
@@ -17,7 +17,6 @@ from oslo_config import cfg
amqp1_opts = [
cfg.StrOpt('container_name',
- default=None,
deprecated_group='amqp1',
help='Name for the AMQP container. must be globally unique.'
' Defaults to a generated UUID'),
@@ -48,7 +47,6 @@ amqp1_opts = [
help='Private key PEM file used to sign cert_file certificate'),
cfg.StrOpt('ssl_key_password',
- default=None,
deprecated_group='amqp1',
secret=True,
help='Password for decrypting ssl_key_file (if encrypted)'),
diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py
index d213209..24d703f 100644
--- a/oslo_messaging/_drivers/base.py
+++ b/oslo_messaging/_drivers/base.py
@@ -25,10 +25,13 @@ import six
from oslo_messaging import exceptions
base_opts = [
- cfg.IntOpt('rpc_conn_pool_size',
- default=30,
+ cfg.IntOpt('rpc_conn_pool_size', default=30,
deprecated_group='DEFAULT',
help='Size of RPC connection pool.'),
+ cfg.IntOpt('conn_pool_min_size', default=2,
+ help='The pool size limit for connections expiration policy'),
+ cfg.IntOpt('conn_pool_ttl', default=1200,
+ help='The time-to-live in sec of idle connections in the pool')
]
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index e7c3647..b448fcd 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -50,6 +50,12 @@ kafka_opts = [
cfg.IntOpt('pool_size', default=10,
help='Pool Size for Kafka Consumers'),
+
+ cfg.IntOpt('conn_pool_min_size', default=2,
+ help='The pool size limit for connections expiration policy'),
+
+ cfg.IntOpt('conn_pool_ttl', default=1200,
+ help='The time-to-live in sec of idle connections in the pool')
]
CONF = cfg.CONF
@@ -301,8 +307,13 @@ class KafkaDriver(base.BaseDriver):
super(KafkaDriver, self).__init__(
conf, url, default_exchange, allowed_remote_exmods)
+ # the pool configuration properties
+ max_size = self.conf.oslo_messaging_kafka.pool_size
+ min_size = self.conf.oslo_messaging_kafka.conn_pool_min_size
+ ttl = self.conf.oslo_messaging_kafka.conn_pool_ttl
+
self.connection_pool = driver_pool.ConnectionPool(
- self.conf, self.conf.oslo_messaging_kafka.pool_size,
+ self.conf, max_size, min_size, ttl,
self._url, Connection)
self.listeners = []
diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py
index 7ad0744..7ad0744 100755..100644
--- a/oslo_messaging/_drivers/impl_pika.py
+++ b/oslo_messaging/_drivers/impl_pika.py
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 9c44465..ffe8901 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -86,7 +86,7 @@ rabbit_opts = [
cfg.IntOpt('kombu_missing_consumer_retry_timeout',
deprecated_name="kombu_reconnect_timeout",
default=60,
- help='How long to wait a missing client beforce abandoning to '
+ help='How long to wait a missing client before abandoning to '
'send it its replies. This value should not be longer '
'than rpc_response_timeout.'),
cfg.StrOpt('kombu_failover_strategy',
@@ -156,6 +156,7 @@ rabbit_opts = [
'Default is 30 seconds.'),
cfg.IntOpt('rabbit_max_retries',
default=0,
+ deprecated_for_removal=True,
deprecated_group='DEFAULT',
help='Maximum number of RabbitMQ connection retries. '
'Default is 0 (infinite retry count).'),
@@ -294,8 +295,8 @@ class Consumer(object):
queue_arguments=self.queue_arguments)
try:
- LOG.trace('ConsumerBase.declare: '
- 'queue %s', self.queue_name)
+ LOG.debug('[%s] Queue.declare: %s',
+ conn.connection_id, self.queue_name)
self.queue.declare()
except conn.connection.channel_errors as exc:
# NOTE(jrosenboom): This exception may be triggered by a race
@@ -537,6 +538,10 @@ class Connection(object):
else:
self._connection_lock = DummyConnectionLock()
+ self.connection_id = str(uuid.uuid4())
+ self.name = "%s:%d:%s" % (os.path.basename(sys.argv[0]),
+ os.getpid(),
+ self.connection_id)
self.connection = kombu.connection.Connection(
self._url, ssl=self._fetch_ssl_params(),
login_method=self.login_method,
@@ -544,17 +549,21 @@ class Connection(object):
failover_strategy=self.kombu_failover_strategy,
transport_options={
'confirm_publish': True,
- 'client_properties': {'capabilities': {
- 'authentication_failure_close': True,
- 'connection.blocked': True,
- 'consumer_cancel_notify': True}},
+ 'client_properties': {
+ 'capabilities': {
+ 'authentication_failure_close': True,
+ 'connection.blocked': True,
+ 'consumer_cancel_notify': True
+ },
+ 'connection_name': self.name},
'on_blocked': self._on_connection_blocked,
'on_unblocked': self._on_connection_unblocked,
},
)
- LOG.debug('Connecting to AMQP server on %(hostname)s:%(port)s',
- self.connection.info())
+ LOG.debug('[%(connection_id)s] Connecting to AMQP server on'
+ ' %(hostname)s:%(port)s',
+ self._get_connection_info())
# NOTE(sileht): kombu recommend to run heartbeat_check every
# seconds, but we use a lock around the kombu connection
@@ -579,9 +588,10 @@ class Connection(object):
if purpose == rpc_common.PURPOSE_SEND:
self._heartbeat_start()
- LOG.debug('Connected to AMQP server on %(hostname)s:%(port)s '
- 'via [%(transport)s] client',
- self.connection.info())
+ LOG.debug('[%(connection_id)s] Connected to AMQP server on '
+ '%(hostname)s:%(port)s via [%(transport)s] client with'
+ ' port %(client_port)s.',
+ self._get_connection_info())
# NOTE(sileht): value chosen according the best practice from kombu
# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
@@ -697,7 +707,8 @@ class Connection(object):
retry = None
def on_error(exc, interval):
- LOG.debug("Received recoverable error from kombu:",
+ LOG.debug("[%s] Received recoverable error from kombu:"
+ % self.connection_id,
exc_info=True)
recoverable_error_callback and recoverable_error_callback(exc)
@@ -707,16 +718,19 @@ class Connection(object):
else interval)
info = {'err_str': exc, 'sleep_time': interval}
- info.update(self.connection.info())
+ info.update(self._get_connection_info())
if 'Socket closed' in six.text_type(exc):
- LOG.error(_LE('AMQP server %(hostname)s:%(port)s closed'
+ LOG.error(_LE('[%(connection_id)s] AMQP server'
+ ' %(hostname)s:%(port)s closed'
' the connection. Check login credentials:'
' %(err_str)s'), info)
else:
- LOG.error(_LE('AMQP server on %(hostname)s:%(port)s is '
- 'unreachable: %(err_str)s. Trying again in '
- '%(sleep_time)d seconds.'), info)
+ LOG.error(_LE('[%(connection_id)s] AMQP server on '
+ '%(hostname)s:%(port)s is unreachable: '
+ '%(err_str)s. Trying again in '
+ '%(sleep_time)d seconds. Client port: '
+ '%(client_port)s'), info)
# XXX(nic): when reconnecting to a RabbitMQ cluster
# with mirrored queues in use, the attempt to release the
@@ -743,9 +757,10 @@ class Connection(object):
for consumer in self._consumers:
consumer.declare(self)
- LOG.info(_LI('Reconnected to AMQP server on '
- '%(hostname)s:%(port)s via [%(transport)s] client'),
- self.connection.info())
+ LOG.info(_LI('[%(connection_id)s] Reconnected to AMQP server on '
+ '%(hostname)s:%(port)s via [%(transport)s] client'
+ 'with port %(client_port)s.'),
+ self._get_connection_info())
def execute_method(channel):
self._set_current_channel(channel)
@@ -829,6 +844,11 @@ class Connection(object):
"""Close/release this connection."""
self._heartbeat_stop()
if self.connection:
+ for consumer, tag in self._consumers.items():
+ if consumer.type == 'fanout':
+ LOG.debug('[connection close] Deleting fanout '
+ 'queue: %s ' % consumer.queue.name)
+ consumer.queue.delete()
self._set_current_channel(None)
self.connection.release()
self.connection = None
@@ -837,7 +857,6 @@ class Connection(object):
"""Reset a connection so it can be used again."""
recoverable_errors = (self.connection.recoverable_channel_errors +
self.connection.recoverable_connection_errors)
-
with self._connection_lock:
try:
for consumer, tag in self._consumers.items():
@@ -885,7 +904,8 @@ class Connection(object):
sock = self.channel.connection.sock
except AttributeError as e:
# Level is set to debug because otherwise we would spam the logs
- LOG.debug('Failed to get socket attribute: %s' % str(e))
+ LOG.debug('[%s] Failed to get socket attribute: %s'
+ % (self.connection_id, str(e)))
else:
sock.settimeout(timeout)
# TCP_USER_TIMEOUT is not defined on Windows and Mac OS X
@@ -1141,6 +1161,15 @@ class Connection(object):
with self._connection_lock:
self.ensure(method, retry=retry, error_callback=_error_callback)
+ def _get_connection_info(self):
+ info = self.connection.info()
+ client_port = None
+ if self.channel and hasattr(self.channel.connection, 'sock'):
+ client_port = self.channel.connection.sock.getsockname()[1]
+ info.update({'client_port': client_port,
+ 'connection_id': self.connection_id})
+ return info
+
def _publish(self, exchange, msg, routing_key=None, timeout=None):
"""Publish a message."""
@@ -1296,8 +1325,13 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
self.prefetch_size = (
conf.oslo_messaging_rabbit.rabbit_qos_prefetch_count)
+ # the pool configuration properties
+ max_size = conf.oslo_messaging_rabbit.rpc_conn_pool_size
+ min_size = conf.oslo_messaging_rabbit.conn_pool_min_size
+ ttl = conf.oslo_messaging_rabbit.conn_pool_ttl
+
connection_pool = pool.ConnectionPool(
- conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size,
+ conf, max_size, min_size, ttl,
url, Connection)
super(RabbitDriver, self).__init__(
diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py
index 3829fa5..90c2c20 100644
--- a/oslo_messaging/_drivers/impl_zmq.py
+++ b/oslo_messaging/_drivers/impl_zmq.py
@@ -14,10 +14,8 @@
import logging
import os
-import socket
import threading
-from oslo_config import cfg
from stevedore import driver
from oslo_messaging._drivers import base
@@ -25,85 +23,14 @@ from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client import zmq_client
from oslo_messaging._drivers.zmq_driver.server import zmq_server
from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_options
from oslo_messaging._i18n import _LE
-from oslo_messaging import server
RPCException = rpc_common.RPCException
-_MATCHMAKER_BACKENDS = ('redis', 'dummy')
-_MATCHMAKER_DEFAULT = 'redis'
LOG = logging.getLogger(__name__)
-zmq_opts = [
- cfg.StrOpt('rpc_zmq_bind_address', default='*',
- help='ZeroMQ bind address. Should be a wildcard (*), '
- 'an ethernet interface, or IP. '
- 'The "host" option should point or resolve to this '
- 'address.'),
-
- cfg.StrOpt('rpc_zmq_matchmaker', default=_MATCHMAKER_DEFAULT,
- choices=_MATCHMAKER_BACKENDS,
- help='MatchMaker driver.'),
-
- cfg.IntOpt('rpc_zmq_contexts', default=1,
- help='Number of ZeroMQ contexts, defaults to 1.'),
-
- cfg.IntOpt('rpc_zmq_topic_backlog',
- help='Maximum number of ingress messages to locally buffer '
- 'per topic. Default is unlimited.'),
-
- cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
- help='Directory for holding IPC sockets.'),
-
- cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
- sample_default='localhost',
- help='Name of this node. Must be a valid hostname, FQDN, or '
- 'IP address. Must match "host" option, if running Nova.'),
-
- cfg.IntOpt('rpc_cast_timeout', default=-1,
- help='Seconds to wait before a cast expires (TTL). '
- 'The default value of -1 specifies an infinite linger '
- 'period. The value of 0 specifies no linger period. '
- 'Pending messages shall be discarded immediately '
- 'when the socket is closed. Only supported by impl_zmq.'),
-
- cfg.IntOpt('rpc_poll_timeout', default=1,
- help='The default number of seconds that poll should wait. '
- 'Poll raises timeout exception when timeout expired.'),
-
- cfg.IntOpt('zmq_target_expire', default=300,
- help='Expiration timeout in seconds of a name service record '
- 'about existing target ( < 0 means no timeout).'),
-
- cfg.IntOpt('zmq_target_update', default=180,
- help='Update period in seconds of a name service record '
- 'about existing target.'),
-
- cfg.BoolOpt('use_pub_sub', default=True,
- help='Use PUB/SUB pattern for fanout methods. '
- 'PUB/SUB always uses proxy.'),
-
- cfg.BoolOpt('use_router_proxy', default=True,
- help='Use ROUTER remote proxy.'),
-
- cfg.PortOpt('rpc_zmq_min_port',
- default=49153,
- help='Minimal port number for random ports range.'),
-
- cfg.IntOpt('rpc_zmq_max_port',
- min=1,
- max=65536,
- default=65536,
- help='Maximal port number for random ports range.'),
-
- cfg.IntOpt('rpc_zmq_bind_port_retries',
- default=100,
- help='Number of retries to find free port number before '
- 'fail with ZMQBindError.')
-]
-
-
class LazyDriverItem(object):
def __init__(self, item_cls, *args, **kwargs):
@@ -169,9 +96,7 @@ class ZmqDriver(base.BaseDriver):
if zmq is None:
raise ImportError(_LE("ZeroMQ is not available!"))
- conf.register_opts(zmq_opts)
- conf.register_opts(server._pool_opts)
- conf.register_opts(base.base_opts)
+ zmq_options.register_opts(conf)
self.conf = conf
self.allowed_remote_exmods = allowed_remote_exmods
@@ -181,9 +106,11 @@ class ZmqDriver(base.BaseDriver):
).driver(self.conf, url=url)
client_cls = zmq_client.ZmqClientProxy
- if conf.use_pub_sub and not conf.use_router_proxy:
+ if conf.oslo_messaging_zmq.use_pub_sub and not \
+ conf.oslo_messaging_zmq.use_router_proxy:
client_cls = zmq_client.ZmqClientMixDirectPubSub
- elif not conf.use_pub_sub and not conf.use_router_proxy:
+ elif not conf.oslo_messaging_zmq.use_pub_sub and not \
+ conf.oslo_messaging_zmq.use_router_proxy:
client_cls = zmq_client.ZmqClientDirect
self.client = LazyDriverItem(
@@ -201,13 +128,13 @@ class ZmqDriver(base.BaseDriver):
zmq_transport, p, matchmaker_backend = url.transport.partition('+')
assert zmq_transport == 'zmq', "Needs to be zmq for this transport!"
if not matchmaker_backend:
- return self.conf.rpc_zmq_matchmaker
- elif matchmaker_backend not in _MATCHMAKER_BACKENDS:
+ return self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker
+ elif matchmaker_backend not in zmq_options.MATCHMAKER_BACKENDS:
raise rpc_common.RPCException(
_LE("Incorrect matchmaker backend name %(backend_name)s!"
"Available names are: %(available_names)s") %
{"backend_name": matchmaker_backend,
- "available_names": _MATCHMAKER_BACKENDS})
+ "available_names": zmq_options.MATCHMAKER_BACKENDS})
return matchmaker_backend
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
diff --git a/oslo_messaging/_drivers/pika_driver/pika_connection_factory.py b/oslo_messaging/_drivers/pika_driver/pika_connection_factory.py
index 3df0806..a78c55e 100644
--- a/oslo_messaging/_drivers/pika_driver/pika_connection_factory.py
+++ b/oslo_messaging/_drivers/pika_driver/pika_connection_factory.py
@@ -36,15 +36,15 @@ HOST_CONNECTION_LAST_TRY_TIME = "last_try_time"
HOST_CONNECTION_LAST_SUCCESS_TRY_TIME = "last_success_try_time"
pika_opts = [
- cfg.IntOpt('channel_max', default=None,
+ cfg.IntOpt('channel_max',
help='Maximum number of channels to allow'),
- cfg.IntOpt('frame_max', default=None,
+ cfg.IntOpt('frame_max',
help='The maximum byte size for an AMQP frame'),
cfg.IntOpt('heartbeat_interval', default=3,
help="How often to send heartbeats for consumer's connections"),
- cfg.BoolOpt('ssl', default=None,
+ cfg.BoolOpt('ssl',
help='Enable SSL'),
- cfg.DictOpt('ssl_options', default=None,
+ cfg.DictOpt('ssl_options',
help='Arguments passed to ssl.wrap_socket'),
cfg.FloatOpt('socket_timeout', default=0.25,
help="Set socket timeout in seconds for connection's socket"),
diff --git a/oslo_messaging/_drivers/pool.py b/oslo_messaging/_drivers/pool.py
index 1af1e31..681dbef 100644
--- a/oslo_messaging/_drivers/pool.py
+++ b/oslo_messaging/_drivers/pool.py
@@ -1,4 +1,3 @@
-
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -19,6 +18,7 @@ import sys
import threading
from oslo_log import log as logging
+from oslo_utils import timeutils
import six
from oslo_messaging._drivers import common
@@ -38,28 +38,48 @@ else:
@six.add_metaclass(abc.ABCMeta)
class Pool(object):
-
"""A thread-safe object pool.
Modelled after the eventlet.pools.Pool interface, but designed to be safe
when using native threads without the GIL.
Resizing is not supported.
+
"""
- def __init__(self, max_size=4):
+ def __init__(self, max_size=4, min_size=2, ttl=1200, on_expire=None):
super(Pool, self).__init__()
-
+ self._min_size = min_size
self._max_size = max_size
+ self._item_ttl = ttl
self._current_size = 0
self._cond = threading.Condition()
-
self._items = collections.deque()
+ self._on_expire = on_expire
+
+ def expire(self):
+ """Remove expired items from left (the oldest item) to
+ right (the newest item).
+ """
+ with self._cond:
+ while len(self._items) > self._min_size:
+ try:
+ ttl_watch, item = self._items.popleft()
+ if ttl_watch.expired():
+ self._on_expire and self._on_expire(item)
+ self._current_size -= 1
+ else:
+ self._items.appendleft((ttl_watch, item))
+ return
+ except IndexError:
+ break
def put(self, item):
"""Return an item to the pool."""
with self._cond:
- self._items.appendleft(item)
+ ttl_watch = timeutils.StopWatch(duration=self._item_ttl)
+ ttl_watch.start()
+ self._items.append((ttl_watch, item))
self._cond.notify()
def get(self):
@@ -70,7 +90,9 @@ class Pool(object):
with self._cond:
while True:
try:
- return self._items.popleft()
+ ttl_watch, item = self._items.pop()
+ self.expire()
+ return item
except IndexError:
pass
@@ -90,12 +112,12 @@ class Pool(object):
def iter_free(self):
"""Iterate over free items."""
- with self._cond:
- while True:
- try:
- yield self._items.popleft()
- except IndexError:
- break
+ while True:
+ try:
+ _, item = self._items.pop()
+ yield item
+ except IndexError:
+ raise StopIteration
@abc.abstractmethod
def create(self):
@@ -104,17 +126,20 @@ class Pool(object):
class ConnectionPool(Pool):
"""Class that implements a Pool of Connections."""
- def __init__(self, conf, rpc_conn_pool_size, url, connection_cls):
+
+ def __init__(self, conf, max_size, min_size, ttl, url, connection_cls):
self.connection_cls = connection_cls
self.conf = conf
self.url = url
- super(ConnectionPool, self).__init__(rpc_conn_pool_size)
- self.reply_proxy = None
+ super(ConnectionPool, self).__init__(max_size, min_size, ttl,
+ self._on_expire)
+
+ def _on_expire(self, connection):
+ connection.close()
+ LOG.debug("Idle connection has expired and been closed."
+ " Pool size: %d" % len(self._items))
- # TODO(comstud): Timeout connections not used in a while
- def create(self, purpose=None):
- if purpose is None:
- purpose = common.PURPOSE_SEND
+ def create(self, purpose=common.PURPOSE_SEND):
LOG.debug('Pool creating new connection')
return self.connection_cls(self.conf, self.url, purpose)
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py
deleted file mode 100644
index a8f2d71..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py
+++ /dev/null
@@ -1,108 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-
-from concurrent import futures
-import futurist
-
-import oslo_messaging
-from oslo_messaging._drivers import common as rpc_common
-from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
- import zmq_reply_waiter
-from oslo_messaging._drivers.zmq_driver.client.publishers \
- import zmq_publisher_base
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._i18n import _LE
-
-LOG = logging.getLogger(__name__)
-
-zmq = zmq_async.import_zmq()
-
-
-class DealerCallPublisher(object):
- """Thread-safe CALL publisher
-
- Used as faster and thread-safe publisher for CALL
- instead of ReqPublisher.
- """
-
- def __init__(self, conf, matchmaker, sockets_manager, sender=None,
- reply_waiter=None):
- super(DealerCallPublisher, self).__init__()
- self.conf = conf
- self.matchmaker = matchmaker
- self.reply_waiter = reply_waiter or zmq_reply_waiter.ReplyWaiter(conf)
- self.sockets_manager = sockets_manager
- self.sender = sender or CallSender(self.sockets_manager,
- self.reply_waiter)
-
- def send_request(self, request):
- reply_future = self.sender.send_request(request)
- try:
- reply = reply_future.result(timeout=request.timeout)
- LOG.debug("Received reply %s", request.message_id)
- except AssertionError:
- LOG.error(_LE("Message format error in reply %s"),
- request.message_id)
- return None
- except futures.TimeoutError:
- raise oslo_messaging.MessagingTimeout(
- "Timeout %(tout)s seconds was reached for message %(id)s" %
- {"tout": request.timeout,
- "id": request.message_id})
- finally:
- self.reply_waiter.untrack_id(request.message_id)
-
- if reply.failure:
- raise rpc_common.deserialize_remote_exception(
- reply.failure,
- request.allowed_remote_exmods)
- else:
- return reply.reply_body
-
- def cleanup(self):
- self.reply_waiter.cleanup()
- self.sender.cleanup()
-
-
-class CallSender(zmq_publisher_base.QueuedSender):
-
- def __init__(self, sockets_manager, reply_waiter):
- super(CallSender, self).__init__(sockets_manager,
- self._do_send_request)
- assert reply_waiter, "Valid ReplyWaiter expected!"
- self.reply_waiter = reply_waiter
-
- def _do_send_request(self, socket, request):
- envelope = request.create_envelope()
- # DEALER socket specific envelope empty delimiter
- socket.send(b'', zmq.SNDMORE)
- socket.send_pyobj(envelope, zmq.SNDMORE)
- socket.send_pyobj(request)
-
- LOG.debug("Sent message_id %(message)s to a target %(target)s",
- {"message": request.message_id,
- "target": request.target})
-
- def send_request(self, request):
- reply_future = futurist.Future()
- self.reply_waiter.track_reply(reply_future, request.message_id)
- self.queue.put(request)
- return reply_future
-
- def _connect_socket(self, target):
- socket = self.outbound_sockets.get_socket(target)
- self.reply_waiter.poll_socket(socket)
- return socket
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py
deleted file mode 100644
index 0cd13ff..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py
+++ /dev/null
@@ -1,91 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-
-from oslo_messaging._drivers.zmq_driver.client.publishers\
- import zmq_publisher_base
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-
-LOG = logging.getLogger(__name__)
-
-zmq = zmq_async.import_zmq()
-
-
-class DealerPublisher(zmq_publisher_base.QueuedSender):
-
- def __init__(self, conf, matchmaker):
-
- def _send_message_data(socket, request):
- socket.send(b'', zmq.SNDMORE)
- socket.send_pyobj(request.create_envelope(), zmq.SNDMORE)
- socket.send_pyobj(request)
-
- LOG.debug("Sent message_id %(message)s to a target %(target)s",
- {"message": request.message_id,
- "target": request.target})
-
- def _do_send_request(socket, request):
- if request.msg_type in zmq_names.MULTISEND_TYPES:
- for _ in range(socket.connections_count()):
- _send_message_data(socket, request)
- else:
- _send_message_data(socket, request)
-
- sockets_manager = zmq_publisher_base.SocketsManager(
- conf, matchmaker, zmq.ROUTER, zmq.DEALER)
- super(DealerPublisher, self).__init__(sockets_manager,
- _do_send_request)
-
- def send_request(self, request):
- if request.msg_type == zmq_names.CALL_TYPE:
- raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
- super(DealerPublisher, self).send_request(request)
-
-
-class DealerPublisherAsync(object):
- """This simplified publisher is to be used with eventlet only.
- Eventlet takes care about zmq sockets sharing between green threads
- using queued lock.
- Use DealerPublisher for other concurrency models.
- """
-
- def __init__(self, conf, matchmaker):
- self.sockets_manager = zmq_publisher_base.SocketsManager(
- conf, matchmaker, zmq.ROUTER, zmq.DEALER)
-
- @staticmethod
- def _send_message_data(socket, request):
- socket.send(b'', zmq.SNDMORE)
- socket.send_pyobj(request.create_envelope(), zmq.SNDMORE)
- socket.send_pyobj(request)
-
- LOG.debug("Sent message_id %(message)s to a target %(target)s",
- {"message": request.message_id,
- "target": request.target})
-
- def send_request(self, request):
- if request.msg_type == zmq_names.CALL_TYPE:
- raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
- socket = self.sockets_manager.get_socket(request.target)
-
- if request.msg_type in zmq_names.MULTISEND_TYPES:
- for _ in range(socket.connections_count()):
- self._send_message_data(socket, request)
- else:
- self._send_message_data(socket, request)
-
- def cleanup(self):
- self.sockets_manager.cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py
new file mode 100644
index 0000000..3c232e3
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py
@@ -0,0 +1,106 @@
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+from concurrent import futures
+import logging
+
+import oslo_messaging
+from oslo_messaging._drivers import common as rpc_common
+from oslo_messaging._drivers.zmq_driver.client.publishers \
+ import zmq_publisher_base
+from oslo_messaging._drivers.zmq_driver.client import zmq_response
+from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_names
+from oslo_messaging._i18n import _LE
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+class DealerPublisherBase(zmq_publisher_base.PublisherBase):
+ """Abstract DEALER-publisher."""
+
+ def __init__(self, conf, matchmaker, sender, receiver):
+ sockets_manager = zmq_sockets_manager.SocketsManager(
+ conf, matchmaker, zmq.ROUTER, zmq.DEALER
+ )
+ super(DealerPublisherBase, self).__init__(sockets_manager, sender,
+ receiver)
+
+ @staticmethod
+ def _check_pattern(request, supported_pattern):
+ if request.msg_type != supported_pattern:
+ raise zmq_publisher_base.UnsupportedSendPattern(
+ zmq_names.message_type_str(request.msg_type)
+ )
+
+ @staticmethod
+ def _raise_timeout(request):
+ raise oslo_messaging.MessagingTimeout(
+ "Timeout %(tout)s seconds was reached for message %(msg_id)s" %
+ {"tout": request.timeout, "msg_id": request.message_id}
+ )
+
+ def _recv_reply(self, request):
+ reply_future = \
+ self.receiver.track_request(request)[zmq_names.REPLY_TYPE]
+
+ try:
+ _, reply = reply_future.result(timeout=request.timeout)
+ assert isinstance(reply, zmq_response.Reply), "Reply expected!"
+ except AssertionError:
+ LOG.error(_LE("Message format error in reply for %s"),
+ request.message_id)
+ return None
+ except futures.TimeoutError:
+ self._raise_timeout(request)
+ finally:
+ self.receiver.untrack_request(request)
+
+ if reply.failure:
+ raise rpc_common.deserialize_remote_exception(
+ reply.failure, request.allowed_remote_exmods
+ )
+ else:
+ return reply.reply_body
+
+ def send_call(self, request):
+ self._check_pattern(request, zmq_names.CALL_TYPE)
+
+ socket = self.connect_socket(request)
+ if not socket:
+ self._raise_timeout(request)
+
+ self.sender.send(socket, request)
+ self.receiver.register_socket(socket)
+ return self._recv_reply(request)
+
+ @abc.abstractmethod
+ def _send_non_blocking(self, request):
+ pass
+
+ def send_cast(self, request):
+ self._check_pattern(request, zmq_names.CAST_TYPE)
+ self._send_non_blocking(request)
+
+ def send_fanout(self, request):
+ self._check_pattern(request, zmq_names.CAST_FANOUT_TYPE)
+ self._send_non_blocking(request)
+
+ def send_notify(self, request):
+ self._check_pattern(request, zmq_names.NOTIFY_TYPE)
+ self._send_non_blocking(request)
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py
new file mode 100644
index 0000000..f6d3040
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py
@@ -0,0 +1,58 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+import retrying
+
+from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
+ import zmq_dealer_publisher_base
+from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
+from oslo_messaging._drivers.zmq_driver.client import zmq_senders
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_names
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
+ """DEALER-publisher using direct connections."""
+
+ def __init__(self, conf, matchmaker):
+ sender = zmq_senders.RequestSenderDirect(conf)
+ if conf.oslo_messaging_zmq.rpc_use_acks:
+ receiver = zmq_receivers.AckAndReplyReceiverDirect(conf)
+ else:
+ receiver = zmq_receivers.ReplyReceiverDirect(conf)
+ super(DealerPublisherDirect, self).__init__(conf, matchmaker, sender,
+ receiver)
+
+ def connect_socket(self, request):
+ try:
+ return self.sockets_manager.get_socket(request.target)
+ except retrying.RetryError:
+ return None
+
+ def _send_non_blocking(self, request):
+ socket = self.connect_socket(request)
+ if not socket:
+ return
+
+ if request.msg_type in zmq_names.MULTISEND_TYPES:
+ for _ in range(socket.connections_count()):
+ self.sender.send(socket, request)
+ else:
+ self.sender.send(socket, request)
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
index e446cde..e6862b8 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
@@ -13,168 +13,73 @@
# under the License.
import logging
-import six
-import time
+
+import retrying
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
- import zmq_dealer_call_publisher
-from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
- import zmq_reply_waiter
-from oslo_messaging._drivers.zmq_driver.client.publishers \
- import zmq_publisher_base
+ import zmq_dealer_publisher_base
+from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
+from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table
+from oslo_messaging._drivers.zmq_driver.client import zmq_senders
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_updater
-zmq = zmq_async.import_zmq()
-
LOG = logging.getLogger(__name__)
+zmq = zmq_async.import_zmq()
-class DealerPublisherProxy(object):
- """Used when publishing to a proxy. """
-
- def __init__(self, conf, matchmaker, socket_to_proxy):
- self.conf = conf
- self.sockets_manager = zmq_publisher_base.SocketsManager(
- conf, matchmaker, zmq.ROUTER, zmq.DEALER)
- self.socket = socket_to_proxy
- self.routing_table = RoutingTable(conf, matchmaker)
- self.connection_updater = PublisherConnectionUpdater(
- conf, matchmaker, self.socket)
-
- def send_request(self, request):
- if request.msg_type == zmq_names.CALL_TYPE:
- raise zmq_publisher_base.UnsupportedSendPattern(
- request.msg_type)
-
- if self.conf.use_pub_sub:
- routing_key = self.routing_table.get_routable_host(request.target) \
- if request.msg_type in zmq_names.DIRECT_TYPES else \
- zmq_address.target_to_subscribe_filter(request.target)
- self._do_send_request(request, routing_key)
- else:
- routing_keys = self.routing_table.get_all_hosts(request.target)
- for routing_key in routing_keys:
- self._do_send_request(request, routing_key)
-
- def _do_send_request(self, request, routing_key):
- self.socket.send(b'', zmq.SNDMORE)
- self.socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
- self.socket.send(six.b(routing_key), zmq.SNDMORE)
- self.socket.send(six.b(request.message_id), zmq.SNDMORE)
- self.socket.send_pyobj(request.context, zmq.SNDMORE)
- self.socket.send_pyobj(request.message)
-
- LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to "
- "a target %(target)s",
- {"message": request.message_id,
- "target": request.target,
- "addr": list(self.socket.connections)})
-
- def cleanup(self):
- self.socket.close()
-
-
-class DealerCallPublisherProxy(zmq_dealer_call_publisher.DealerCallPublisher):
-
- def __init__(self, conf, matchmaker, sockets_manager):
- reply_waiter = ReplyWaiterProxy(conf)
- sender = CallSenderProxy(conf, matchmaker, sockets_manager,
- reply_waiter)
- super(DealerCallPublisherProxy, self).__init__(
- conf, matchmaker, sockets_manager, sender, reply_waiter)
-
-
-class CallSenderProxy(zmq_dealer_call_publisher.CallSender):
- def __init__(self, conf, matchmaker, sockets_manager, reply_waiter):
- super(CallSenderProxy, self).__init__(
- sockets_manager, reply_waiter)
- self.socket = self.outbound_sockets.get_socket_to_publishers()
- self.reply_waiter.poll_socket(self.socket)
- self.routing_table = RoutingTable(conf, matchmaker)
- self.connection_updater = PublisherConnectionUpdater(
- conf, matchmaker, self.socket)
+class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase):
+ """DEALER-publisher via proxy."""
- def _connect_socket(self, target):
+ def __init__(self, conf, matchmaker):
+ sender = zmq_senders.RequestSenderProxy(conf)
+ if conf.oslo_messaging_zmq.rpc_use_acks:
+ receiver = zmq_receivers.AckAndReplyReceiverProxy(conf)
+ else:
+ receiver = zmq_receivers.ReplyReceiverProxy(conf)
+ super(DealerPublisherProxy, self).__init__(conf, matchmaker, sender,
+ receiver)
+ self.socket = self.sockets_manager.get_socket_to_publishers()
+ self.routing_table = zmq_routing_table.RoutingTable(self.conf,
+ self.matchmaker)
+ self.connection_updater = \
+ PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket)
+
+ def connect_socket(self, request):
return self.socket
- def _do_send_request(self, socket, request):
- routing_key = self.routing_table.get_routable_host(request.target)
-
- # DEALER socket specific envelope empty delimiter
- socket.send(b'', zmq.SNDMORE)
- socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
- socket.send(six.b(routing_key), zmq.SNDMORE)
- socket.send(six.b(request.message_id), zmq.SNDMORE)
- socket.send_pyobj(request.context, zmq.SNDMORE)
- socket.send_pyobj(request.message)
-
- LOG.debug("Sent message_id %(message)s to a target %(target)s",
- {"message": request.message_id,
- "target": request.target})
-
-
-class ReplyWaiterProxy(zmq_reply_waiter.ReplyWaiter):
+ def send_call(self, request):
+ try:
+ request.routing_key = \
+ self.routing_table.get_routable_host(request.target)
+ except retrying.RetryError:
+ self._raise_timeout(request)
+ return super(DealerPublisherProxy, self).send_call(request)
+
+ def _get_routing_keys(self, request):
+ try:
+ if request.msg_type in zmq_names.DIRECT_TYPES:
+ return [self.routing_table.get_routable_host(request.target)]
+ else:
+ return \
+ [zmq_address.target_to_subscribe_filter(request.target)] \
+ if self.conf.oslo_messaging_zmq.use_pub_sub else \
+ self.routing_table.get_all_hosts(request.target)
+ except retrying.RetryError:
+ return []
+
+ def _send_non_blocking(self, request):
+ for routing_key in self._get_routing_keys(request):
+ request.routing_key = routing_key
+ self.sender.send(self.socket, request)
- def receive_method(self, socket):
- empty = socket.recv()
- assert empty == b'', "Empty expected!"
- reply_id = socket.recv()
- assert reply_id is not None, "Reply ID expected!"
- message_type = int(socket.recv())
- assert message_type == zmq_names.REPLY_TYPE, "Reply is expected!"
- message_id = socket.recv()
- reply = socket.recv_pyobj()
- LOG.debug("Received reply %s", message_id)
- return reply
-
-
-class RoutingTable(object):
- """This class implements local routing-table cache
- taken from matchmaker. Its purpose is to give the next routable
- host id (remote DEALER's id) by request for specific target in
- round-robin fashion.
- """
-
- def __init__(self, conf, matchmaker):
- self.conf = conf
- self.matchmaker = matchmaker
- self.routing_table = {}
- self.routable_hosts = {}
-
- def get_all_hosts(self, target):
- self._update_routing_table(target)
- return list(self.routable_hosts.get(str(target)) or [])
-
- def get_routable_host(self, target):
- self._update_routing_table(target)
- hosts_for_target = self.routable_hosts[str(target)]
- host = hosts_for_target.pop(0)
- if not hosts_for_target:
- self._renew_routable_hosts(target)
- return host
-
- def _is_tm_expired(self, tm):
- return 0 <= self.conf.zmq_target_expire <= time.time() - tm
-
- def _update_routing_table(self, target):
- routing_record = self.routing_table.get(str(target))
- if routing_record is None:
- self._fetch_hosts(target)
- self._renew_routable_hosts(target)
- elif self._is_tm_expired(routing_record[1]):
- self._fetch_hosts(target)
-
- def _fetch_hosts(self, target):
- self.routing_table[str(target)] = (self.matchmaker.get_hosts(
- target, zmq_names.socket_type_str(zmq.DEALER)), time.time())
-
- def _renew_routable_hosts(self, target):
- hosts, _ = self.routing_table[str(target)]
- self.routable_hosts[str(target)] = list(hosts)
+ def cleanup(self):
+ super(DealerPublisherProxy, self).cleanup()
+ self.connection_updater.stop()
+ self.socket.close()
class PublisherConnectionUpdater(zmq_updater.ConnectionUpdater):
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py
deleted file mode 100644
index 027bc7b..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py
+++ /dev/null
@@ -1,68 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-import threading
-
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._i18n import _LW
-
-LOG = logging.getLogger(__name__)
-
-zmq = zmq_async.import_zmq()
-
-
-class ReplyWaiter(object):
-
- def __init__(self, conf):
- self.conf = conf
- self.replies = {}
- self.poller = zmq_async.get_poller()
- self.executor = zmq_async.get_executor(self.run_loop)
- self.executor.execute()
- self._lock = threading.Lock()
-
- def track_reply(self, reply_future, message_id):
- with self._lock:
- self.replies[message_id] = reply_future
-
- def untrack_id(self, message_id):
- with self._lock:
- self.replies.pop(message_id)
-
- def poll_socket(self, socket):
- self.poller.register(socket, recv_method=self.receive_method)
-
- def receive_method(self, socket):
- empty = socket.recv()
- assert empty == b'', "Empty expected!"
- envelope = socket.recv_pyobj()
- assert envelope is not None, "Invalid envelope!"
- reply = socket.recv_pyobj()
- LOG.debug("Received reply %s", envelope)
- return reply
-
- def run_loop(self):
- reply, socket = self.poller.poll(
- timeout=self.conf.rpc_poll_timeout)
- if reply is not None:
- call_future = self.replies.get(reply.message_id)
- if call_future:
- call_future.set_result(reply)
- else:
- LOG.warning(_LW("Received timed out reply: %s"),
- reply.message_id)
-
- def cleanup(self):
- self.poller.close()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py
index bfaff0d..8c6c100 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py
@@ -14,14 +14,11 @@
import abc
import logging
-import time
import six
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
@@ -56,149 +53,48 @@ class PublisherBase(object):
Publisher can send request objects from zmq_request.
"""
- def __init__(self, sockets_manager):
+ def __init__(self, sockets_manager, sender, receiver):
"""Construct publisher
- Accept configuration object and Name Service interface object.
- Create zmq.Context and connected sockets dictionary.
+ Accept sockets manager, sender and receiver objects.
- :param conf: configuration object
- :type conf: oslo_config.CONF
+ :param sockets_manager: sockets manager object
+ :type sockets_manager: zmq_sockets_manager.SocketsManager
+ :param senders: request sender object
+ :type senders: zmq_senders.RequestSender
+ :param receiver: reply receiver object
+ :type receiver: zmq_receivers.ReplyReceiver
"""
- self.outbound_sockets = sockets_manager
+ self.sockets_manager = sockets_manager
self.conf = sockets_manager.conf
self.matchmaker = sockets_manager.matchmaker
- super(PublisherBase, self).__init__()
+ self.sender = sender
+ self.receiver = receiver
@abc.abstractmethod
- def send_request(self, request):
- """Send request to consumer
-
- :param request: Message data and destination container object
- :type request: zmq_request.Request
+ def connect_socket(self, request):
+ """Get connected socket ready for sending given request
+ or None otherwise (i.e. if connection can't be established).
"""
- def _send_request(self, socket, request):
- """Send request to consumer.
- Helper private method which defines basic sending behavior.
-
- :param socket: Socket to publish message on
- :type socket: zmq.Socket
- :param request: Message data and destination container object
- :type request: zmq_request.Request
- """
- LOG.debug("Sending %(type)s message_id %(message)s to a target "
- "%(target)s",
- {"type": request.msg_type,
- "message": request.message_id,
- "target": request.target})
- socket.send_pyobj(request)
-
- def cleanup(self):
- """Cleanup publisher. Close allocated connections."""
- self.outbound_sockets.cleanup()
-
-
-class SocketsManager(object):
-
- def __init__(self, conf, matchmaker, listener_type, socket_type):
- self.conf = conf
- self.matchmaker = matchmaker
- self.listener_type = listener_type
- self.socket_type = socket_type
- self.zmq_context = zmq.Context()
- self.outbound_sockets = {}
- self.socket_to_publishers = None
- self.socket_to_routers = None
-
- def get_hosts(self, target):
- return self.matchmaker.get_hosts(
- target, zmq_names.socket_type_str(self.listener_type))
-
- @staticmethod
- def _key_from_target(target):
- return target.topic if target.fanout else str(target)
-
- def _get_hosts_and_connect(self, socket, target):
- hosts = self.get_hosts(target)
- self._connect_to_hosts(socket, target, hosts)
-
- def _track_socket(self, socket, target):
- key = self._key_from_target(target)
- self.outbound_sockets[key] = (socket, time.time())
-
- def _connect_to_hosts(self, socket, target, hosts):
- for host in hosts:
- socket.connect_to_host(host)
- self._track_socket(socket, target)
-
- def _check_for_new_hosts(self, target):
- key = self._key_from_target(target)
- socket, tm = self.outbound_sockets[key]
- if 0 <= self.conf.zmq_target_expire <= time.time() - tm:
- self._get_hosts_and_connect(socket, target)
- return socket
-
- def get_socket(self, target):
- key = self._key_from_target(target)
- if key in self.outbound_sockets:
- socket = self._check_for_new_hosts(target)
- else:
- socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
- self.socket_type)
- self._get_hosts_and_connect(socket, target)
- return socket
-
- def get_socket_to_publishers(self):
- if self.socket_to_publishers is not None:
- return self.socket_to_publishers
- self.socket_to_publishers = zmq_socket.ZmqSocket(
- self.conf, self.zmq_context, self.socket_type)
- publishers = self.matchmaker.get_publishers()
- for pub_address, router_address in publishers:
- self.socket_to_publishers.connect_to_host(router_address)
- return self.socket_to_publishers
-
- def get_socket_to_routers(self):
- if self.socket_to_routers is not None:
- return self.socket_to_routers
- self.socket_to_routers = zmq_socket.ZmqSocket(
- self.conf, self.zmq_context, self.socket_type)
- routers = self.matchmaker.get_routers()
- for router_address in routers:
- self.socket_to_routers.connect_to_host(router_address)
- return self.socket_to_routers
-
- def cleanup(self):
- for socket, tm in self.outbound_sockets.values():
- socket.close()
-
-
-class QueuedSender(PublisherBase):
-
- def __init__(self, sockets_manager, _do_send_request):
- super(QueuedSender, self).__init__(sockets_manager)
- self._do_send_request = _do_send_request
- self.queue, self.empty_except = zmq_async.get_queue()
- self.executor = zmq_async.get_executor(self.run_loop)
- self.executor.execute()
-
- def send_request(self, request):
- self.queue.put(request)
+ @abc.abstractmethod
+ def send_call(self, request):
+ pass
- def _connect_socket(self, target):
- return self.outbound_sockets.get_socket(target)
+ @abc.abstractmethod
+ def send_cast(self, request):
+ pass
- def run_loop(self):
- try:
- request = self.queue.get(timeout=self.conf.rpc_poll_timeout)
- except self.empty_except:
- return
+ @abc.abstractmethod
+ def send_fanout(self, request):
+ pass
- socket = self._connect_socket(request.target)
- self._do_send_request(socket, request)
+ @abc.abstractmethod
+ def send_notify(self, request):
+ pass
def cleanup(self):
- self.executor.stop()
- super(QueuedSender, self).cleanup()
+ """Cleanup publisher. Close allocated connections."""
+ self.receiver.stop()
+ self.sockets_manager.cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py
deleted file mode 100644
index 4960979..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py
+++ /dev/null
@@ -1,52 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-
-from oslo_messaging._drivers.zmq_driver.client.publishers\
- import zmq_publisher_base
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-
-LOG = logging.getLogger(__name__)
-
-zmq = zmq_async.import_zmq()
-
-
-class PushPublisher(object):
-
- def __init__(self, conf, matchmaker):
- super(PushPublisher, self).__init__()
- sockets_manager = zmq_publisher_base.SocketsManager(
- conf, matchmaker, zmq.PULL, zmq.PUSH)
-
- def _do_send_request(push_socket, request):
- push_socket.send_pyobj(request)
-
- LOG.debug("Sending message_id %(message)s to a target %(target)s",
- {"message": request.message_id,
- "target": request.target})
-
- self.sender = zmq_publisher_base.QueuedSender(
- sockets_manager, _do_send_request)
-
- def send_request(self, request):
-
- if request.msg_type != zmq_names.CAST_TYPE:
- raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
-
- self.sender.send_request(request)
-
- def cleanup(self):
- self.sender.cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_ack_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_ack_manager.py
new file mode 100644
index 0000000..01bbc35
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_ack_manager.py
@@ -0,0 +1,111 @@
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from concurrent import futures
+import logging
+
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_names
+from oslo_messaging._i18n import _LE, _LW
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+class AckManagerBase(object):
+
+ def __init__(self, publisher):
+ self.publisher = publisher
+ self.conf = publisher.conf
+ self.sender = publisher.sender
+ self.receiver = publisher.receiver
+
+ def send_call(self, request):
+ return self.publisher.send_call(request)
+
+ def send_cast(self, request):
+ self.publisher.send_cast(request)
+
+ def send_fanout(self, request):
+ self.publisher.send_fanout(request)
+
+ def send_notify(self, request):
+ self.publisher.send_notify(request)
+
+ def cleanup(self):
+ self.publisher.cleanup()
+
+
+class AckManagerDirect(AckManagerBase):
+ pass
+
+
+class AckManagerProxy(AckManagerBase):
+
+ def __init__(self, publisher):
+ super(AckManagerProxy, self).__init__(publisher)
+ self._pool = zmq_async.get_pool(
+ size=self.conf.oslo_messaging_zmq.rpc_thread_pool_size
+ )
+
+ def _wait_for_ack(self, ack_future):
+ request, socket = ack_future.args
+ retries = \
+ request.retry or self.conf.oslo_messaging_zmq.rpc_retry_attempts
+ timeout = self.conf.oslo_messaging_zmq.rpc_ack_timeout_base
+
+ done = False
+ while not done:
+ try:
+ reply_id, response = ack_future.result(timeout=timeout)
+ done = True
+ assert response is None, "Ack expected!"
+ assert reply_id == request.routing_key, \
+ "Ack from recipient expected!"
+ except AssertionError:
+ LOG.error(_LE("Message format error in ack for %s"),
+ request.message_id)
+ except futures.TimeoutError:
+ LOG.warning(_LW("No ack received within %(tout)s seconds "
+ "for %(msg_id)s"),
+ {"tout": timeout,
+ "msg_id": request.message_id})
+ if retries is None or retries != 0:
+ if retries is not None and retries > 0:
+ retries -= 1
+ self.sender.send(socket, request)
+ timeout *= \
+ self.conf.oslo_messaging_zmq.rpc_ack_timeout_multiplier
+ else:
+ LOG.warning(_LW("Exhausted number of retries for %s"),
+ request.message_id)
+ done = True
+
+ self.receiver.untrack_request(request)
+
+ def _get_ack_future(self, request):
+ socket = self.publisher.connect_socket(request)
+ self.receiver.register_socket(socket)
+ ack_future = self.receiver.track_request(request)[zmq_names.ACK_TYPE]
+ ack_future.args = request, socket
+ return ack_future
+
+ def send_cast(self, request):
+ self.publisher.send_cast(request)
+ self._pool.submit(self._wait_for_ack, self._get_ack_future(request))
+
+ def cleanup(self):
+ self._pool.shutdown(wait=True)
+ super(AckManagerProxy, self).cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
index e5951cb..24f08f0 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
@@ -15,13 +15,10 @@
from oslo_messaging._drivers import common
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
- import zmq_dealer_call_publisher
-from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
- import zmq_dealer_publisher
+ import zmq_dealer_publisher_direct
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher_proxy
-from oslo_messaging._drivers.zmq_driver.client.publishers \
- import zmq_publisher_base
+from oslo_messaging._drivers.zmq_driver.client import zmq_ack_manager
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
@@ -43,28 +40,28 @@ class ZmqClientMixDirectPubSub(zmq_client_base.ZmqClientBase):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
- if conf.use_router_proxy or not conf.use_pub_sub:
+ if conf.oslo_messaging_zmq.use_router_proxy or not \
+ conf.oslo_messaging_zmq.use_pub_sub:
raise WrongClientException()
- self.sockets_manager = zmq_publisher_base.SocketsManager(
- conf, matchmaker, zmq.ROUTER, zmq.DEALER)
+ publisher_direct = self.create_publisher(
+ conf, matchmaker,
+ zmq_dealer_publisher_direct.DealerPublisherDirect,
+ zmq_ack_manager.AckManagerDirect
+ )
- fanout_publisher = zmq_dealer_publisher_proxy.DealerPublisherProxy(
- conf, matchmaker, self.sockets_manager.get_socket_to_publishers())
+ publisher_proxy = self.create_publisher(
+ conf, matchmaker,
+ zmq_dealer_publisher_proxy.DealerPublisherProxy,
+ zmq_ack_manager.AckManagerProxy
+ )
super(ZmqClientMixDirectPubSub, self).__init__(
conf, matchmaker, allowed_remote_exmods,
publishers={
- zmq_names.CALL_TYPE:
- zmq_dealer_call_publisher.DealerCallPublisher(
- conf, matchmaker, self.sockets_manager),
-
- zmq_names.CAST_FANOUT_TYPE: fanout_publisher,
-
- zmq_names.NOTIFY_TYPE: fanout_publisher,
-
- "default": zmq_dealer_publisher.DealerPublisherAsync(
- conf, matchmaker)
+ zmq_names.CAST_FANOUT_TYPE: publisher_proxy,
+ zmq_names.NOTIFY_TYPE: publisher_proxy,
+ "default": publisher_direct
}
)
@@ -79,22 +76,19 @@ class ZmqClientDirect(zmq_client_base.ZmqClientBase):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
- if conf.use_pub_sub or conf.use_router_proxy:
+ if conf.oslo_messaging_zmq.use_pub_sub or \
+ conf.oslo_messaging_zmq.use_router_proxy:
raise WrongClientException()
- self.sockets_manager = zmq_publisher_base.SocketsManager(
- conf, matchmaker, zmq.ROUTER, zmq.DEALER)
+ publisher = self.create_publisher(
+ conf, matchmaker,
+ zmq_dealer_publisher_direct.DealerPublisherDirect,
+ zmq_ack_manager.AckManagerDirect
+ )
super(ZmqClientDirect, self).__init__(
conf, matchmaker, allowed_remote_exmods,
- publishers={
- zmq_names.CALL_TYPE:
- zmq_dealer_call_publisher.DealerCallPublisher(
- conf, matchmaker, self.sockets_manager),
-
- "default": zmq_dealer_publisher.DealerPublisher(
- conf, matchmaker)
- }
+ publishers={"default": publisher}
)
@@ -110,21 +104,16 @@ class ZmqClientProxy(zmq_client_base.ZmqClientBase):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
- if not conf.use_router_proxy:
+ if not conf.oslo_messaging_zmq.use_router_proxy:
raise WrongClientException()
- self.sockets_manager = zmq_publisher_base.SocketsManager(
- conf, matchmaker, zmq.ROUTER, zmq.DEALER)
+ publisher = self.create_publisher(
+ conf, matchmaker,
+ zmq_dealer_publisher_proxy.DealerPublisherProxy,
+ zmq_ack_manager.AckManagerProxy
+ )
super(ZmqClientProxy, self).__init__(
conf, matchmaker, allowed_remote_exmods,
- publishers={
- zmq_names.CALL_TYPE:
- zmq_dealer_publisher_proxy.DealerCallPublisherProxy(
- conf, matchmaker, self.sockets_manager),
-
- "default": zmq_dealer_publisher_proxy.DealerPublisherProxy(
- conf, matchmaker,
- self.sockets_manager.get_socket_to_publishers())
- }
+ publishers={"default": publisher}
)
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py
index 7630cc7..5d6ee5a 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py
@@ -24,45 +24,51 @@ class ZmqClientBase(object):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None,
publishers=None):
self.conf = conf
- self.context = zmq.Context()
self.matchmaker = matchmaker
self.allowed_remote_exmods = allowed_remote_exmods or []
self.publishers = publishers
- self.call_publisher = publishers.get(zmq_names.CALL_TYPE) \
- or publishers["default"]
- self.cast_publisher = publishers.get(zmq_names.CAST_TYPE) \
- or publishers["default"]
- self.fanout_publisher = publishers.get(zmq_names.CAST_FANOUT_TYPE) \
- or publishers["default"]
- self.notify_publisher = publishers.get(zmq_names.NOTIFY_TYPE) \
- or publishers["default"]
+ self.call_publisher = publishers.get(zmq_names.CALL_TYPE,
+ publishers["default"])
+ self.cast_publisher = publishers.get(zmq_names.CAST_TYPE,
+ publishers["default"])
+ self.fanout_publisher = publishers.get(zmq_names.CAST_FANOUT_TYPE,
+ publishers["default"])
+ self.notify_publisher = publishers.get(zmq_names.NOTIFY_TYPE,
+ publishers["default"])
+
+ @staticmethod
+ def create_publisher(conf, matchmaker, publisher_cls, ack_manager_cls):
+ publisher = publisher_cls(conf, matchmaker)
+ if conf.oslo_messaging_zmq.rpc_use_acks:
+ publisher = ack_manager_cls(publisher)
+ return publisher
def send_call(self, target, context, message, timeout=None, retry=None):
request = zmq_request.CallRequest(
target, context=context, message=message, retry=retry,
timeout=timeout, allowed_remote_exmods=self.allowed_remote_exmods
)
- return self.call_publisher.send_request(request)
+ return self.call_publisher.send_call(request)
def send_cast(self, target, context, message, retry=None):
request = zmq_request.CastRequest(
target, context=context, message=message, retry=retry
)
- self.cast_publisher.send_request(request)
+ self.cast_publisher.send_cast(request)
def send_fanout(self, target, context, message, retry=None):
request = zmq_request.FanoutRequest(
target, context=context, message=message, retry=retry
)
- self.fanout_publisher.send_request(request)
+ self.fanout_publisher.send_fanout(request)
def send_notify(self, target, context, message, version, retry=None):
request = zmq_request.NotificationRequest(
target, context=context, message=message, retry=retry,
version=version
)
- self.notify_publisher.send_request(request)
+ self.notify_publisher.send_notify(request)
def cleanup(self):
cleaned = set()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py
deleted file mode 100644
index d1913b4..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py
+++ /dev/null
@@ -1,89 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_messaging._drivers.zmq_driver import zmq_address
-from oslo_messaging._drivers.zmq_driver import zmq_names
-
-
-class Envelope(object):
-
- def __init__(self, msg_type=None, message_id=None, target=None,
- routing_key=None, **kwargs):
- self._msg_type = msg_type
- self._message_id = message_id
- self._target = target
- self._reply_id = None
- self._routing_key = routing_key
- self._kwargs = kwargs
-
- @property
- def reply_id(self):
- return self._reply_id
-
- @reply_id.setter
- def reply_id(self, value):
- self._reply_id = value
-
- @property
- def routing_key(self):
- return self._routing_key
-
- @routing_key.setter
- def routing_key(self, value):
- self._routing_key = value
-
- @property
- def msg_type(self):
- return self._msg_type
-
- @msg_type.setter
- def msg_type(self, value):
- self._msg_type = value
-
- @property
- def message_id(self):
- return self._message_id
-
- @property
- def target(self):
- return self._target
-
- @property
- def is_mult_send(self):
- return self._msg_type in zmq_names.MULTISEND_TYPES
-
- @property
- def topic_filter(self):
- return zmq_address.target_to_subscribe_filter(self._target)
-
- def has(self, key):
- return key in self._kwargs
-
- def set(self, key, value):
- self._kwargs[key] = value
-
- def get(self, key):
- self._kwargs.get(key)
-
- def to_dict(self):
- envelope = {zmq_names.FIELD_MSG_TYPE: self._msg_type,
- zmq_names.FIELD_MSG_ID: self._message_id,
- zmq_names.FIELD_TARGET: self._target,
- zmq_names.FIELD_ROUTING_KEY: self._routing_key}
- envelope.update({k: v for k, v in self._kwargs.items()
- if v is not None})
- return envelope
-
- def __str__(self):
- return str(self.to_dict())
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py
new file mode 100644
index 0000000..9f7aeec
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py
@@ -0,0 +1,180 @@
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import logging
+import threading
+
+import futurist
+import six
+
+from oslo_messaging._drivers.zmq_driver.client import zmq_response
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_names
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+@six.add_metaclass(abc.ABCMeta)
+class ReceiverBase(object):
+ """Base response receiving interface."""
+
+ def __init__(self, conf):
+ self.conf = conf
+ self._lock = threading.Lock()
+ self._requests = {}
+ self._poller = zmq_async.get_poller()
+ self._executor = zmq_async.get_executor(method=self._run_loop)
+ self._executor.execute()
+
+ @abc.abstractproperty
+ def message_types(self):
+ """A set of supported incoming response types."""
+
+ def register_socket(self, socket):
+ """Register a socket for receiving data."""
+ self._poller.register(socket, recv_method=self.recv_response)
+
+ def unregister_socket(self, socket):
+ """Unregister a socket from receiving data."""
+ self._poller.unregister(socket)
+
+ @abc.abstractmethod
+ def recv_response(self, socket):
+ """Receive a response and return a tuple of the form
+ (reply_id, message_type, message_id, response).
+ """
+
+ def track_request(self, request):
+ """Track a request via already registered sockets and return
+ a dict of futures for monitoring all types of responses.
+ """
+ futures = {}
+ for message_type in self.message_types:
+ future = futurist.Future()
+ self._set_future(request.message_id, message_type, future)
+ futures[message_type] = future
+ return futures
+
+ def untrack_request(self, request):
+ """Untrack a request and stop monitoring any responses."""
+ for message_type in self.message_types:
+ self._pop_future(request.message_id, message_type)
+
+ def stop(self):
+ self._poller.close()
+ self._executor.stop()
+
+ def _get_future(self, message_id, message_type):
+ with self._lock:
+ return self._requests.get((message_id, message_type))
+
+ def _set_future(self, message_id, message_type, future):
+ with self._lock:
+ self._requests[(message_id, message_type)] = future
+
+ def _pop_future(self, message_id, message_type):
+ with self._lock:
+ return self._requests.pop((message_id, message_type), None)
+
+ def _run_loop(self):
+ data, socket = self._poller.poll(
+ timeout=self.conf.oslo_messaging_zmq.rpc_poll_timeout)
+ if data is None:
+ return
+ reply_id, message_type, message_id, response = data
+ assert message_type in self.message_types, \
+ "%s is not supported!" % zmq_names.message_type_str(message_type)
+ future = self._get_future(message_id, message_type)
+ if future is not None:
+ LOG.debug("Received %(msg_type)s for %(msg_id)s",
+ {"msg_type": zmq_names.message_type_str(message_type),
+ "msg_id": message_id})
+ future.set_result((reply_id, response))
+
+
+class ReplyReceiver(ReceiverBase):
+
+ message_types = {zmq_names.REPLY_TYPE}
+
+
+class ReplyReceiverProxy(ReplyReceiver):
+
+ def recv_response(self, socket):
+ empty = socket.recv()
+ assert empty == b'', "Empty expected!"
+ reply_id = socket.recv()
+ assert reply_id is not None, "Reply ID expected!"
+ message_type = int(socket.recv())
+ assert message_type == zmq_names.REPLY_TYPE, "Reply expected!"
+ message_id = socket.recv_string()
+ reply_body, failure = socket.recv_loaded()
+ reply = zmq_response.Reply(
+ message_id=message_id, reply_id=reply_id,
+ reply_body=reply_body, failure=failure
+ )
+ return reply_id, message_type, message_id, reply
+
+
+class ReplyReceiverDirect(ReplyReceiver):
+
+ def recv_response(self, socket):
+ empty = socket.recv()
+ assert empty == b'', "Empty expected!"
+ raw_reply = socket.recv_loaded()
+ assert isinstance(raw_reply, dict), "Dict expected!"
+ reply = zmq_response.Reply(**raw_reply)
+ return reply.reply_id, reply.msg_type, reply.message_id, reply
+
+
+class AckAndReplyReceiver(ReceiverBase):
+
+ message_types = {zmq_names.ACK_TYPE, zmq_names.REPLY_TYPE}
+
+
+class AckAndReplyReceiverProxy(AckAndReplyReceiver):
+
+ def recv_response(self, socket):
+ empty = socket.recv()
+ assert empty == b'', "Empty expected!"
+ reply_id = socket.recv()
+ assert reply_id is not None, "Reply ID expected!"
+ message_type = int(socket.recv())
+ assert message_type in (zmq_names.ACK_TYPE, zmq_names.REPLY_TYPE), \
+ "Ack or reply expected!"
+ message_id = socket.recv_string()
+ if message_type == zmq_names.REPLY_TYPE:
+ reply_body, failure = socket.recv_loaded()
+ reply = zmq_response.Reply(
+ message_id=message_id, reply_id=reply_id,
+ reply_body=reply_body, failure=failure
+ )
+ response = reply
+ else:
+ response = None
+ return reply_id, message_type, message_id, response
+
+
+class AckAndReplyReceiverDirect(AckAndReplyReceiver):
+
+ def recv_response(self, socket):
+ # acks are not supported yet
+ empty = socket.recv()
+ assert empty == b'', "Empty expected!"
+ raw_reply = socket.recv_loaded()
+ assert isinstance(raw_reply, dict), "Dict expected!"
+ reply = zmq_response.Reply(**raw_reply)
+ return reply.reply_id, reply.msg_type, reply.message_id, reply
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py
index a9ba36e..b3f8aae 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py
@@ -18,7 +18,6 @@ import uuid
import six
-from oslo_messaging._drivers.zmq_driver.client import zmq_envelope
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE
@@ -70,14 +69,6 @@ class Request(object):
self.message_id = str(uuid.uuid1())
- def create_envelope(self, routing_key=None, reply_id=None):
- envelope = zmq_envelope.Envelope(msg_type=self.msg_type,
- message_id=self.message_id,
- target=self.target,
- routing_key=routing_key)
- envelope.reply_id = reply_id
- return envelope
-
@abc.abstractproperty
def msg_type(self):
"""ZMQ message type"""
@@ -112,12 +103,6 @@ class CallRequest(RpcRequest):
super(CallRequest, self).__init__(*args, **kwargs)
- def create_envelope(self, routing_key=None, reply_id=None):
- envelope = super(CallRequest, self).create_envelope(
- routing_key, reply_id)
- envelope.set('timeout', self.timeout)
- return envelope
-
class CastRequest(RpcRequest):
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py
index b6a7b75..ab452de 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py
@@ -12,28 +12,24 @@
# License for the specific language governing permissions and limitations
# under the License.
+import abc
+
+import six
+
from oslo_messaging._drivers.zmq_driver import zmq_names
+@six.add_metaclass(abc.ABCMeta)
class Response(object):
- def __init__(self, id=None, type=None, message_id=None,
- reply_id=None, reply_body=None, failure=None):
+ def __init__(self, message_id=None, reply_id=None):
- self._id = id
- self._type = type
self._message_id = message_id
self._reply_id = reply_id
- self._reply_body = reply_body
- self._failure = failure
-
- @property
- def id_(self):
- return self._id
- @property
- def type_(self):
- return self._type
+ @abc.abstractproperty
+ def msg_type(self):
+ pass
@property
def message_id(self):
@@ -43,6 +39,29 @@ class Response(object):
def reply_id(self):
return self._reply_id
+ def to_dict(self):
+ return {zmq_names.FIELD_MSG_ID: self._message_id,
+ zmq_names.FIELD_REPLY_ID: self._reply_id}
+
+ def __str__(self):
+ return str(self.to_dict())
+
+
+class Ack(Response):
+
+ msg_type = zmq_names.ACK_TYPE
+
+
+class Reply(Response):
+
+ msg_type = zmq_names.REPLY_TYPE
+
+ def __init__(self, message_id=None, reply_id=None, reply_body=None,
+ failure=None):
+ super(Reply, self).__init__(message_id, reply_id)
+ self._reply_body = reply_body
+ self._failure = failure
+
@property
def reply_body(self):
return self._reply_body
@@ -52,12 +71,7 @@ class Response(object):
return self._failure
def to_dict(self):
- return {zmq_names.FIELD_ID: self._id,
- zmq_names.FIELD_TYPE: self._type,
- zmq_names.FIELD_MSG_ID: self._message_id,
- zmq_names.FIELD_REPLY_ID: self._reply_id,
- zmq_names.FIELD_REPLY: self._reply_body,
- zmq_names.FIELD_FAILURE: self._failure}
-
- def __str__(self):
- return str(self.to_dict())
+ dict_ = super(Reply, self).to_dict()
+ dict_.update({zmq_names.FIELD_REPLY_BODY: self._reply_body,
+ zmq_names.FIELD_FAILURE: self._failure})
+ return dict_
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py
new file mode 100644
index 0000000..d0b6227
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py
@@ -0,0 +1,66 @@
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import time
+
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_names
+
+zmq = zmq_async.import_zmq()
+
+
+class RoutingTable(object):
+ """This class implements local routing-table cache
+ taken from matchmaker. Its purpose is to give the next routable
+ host id (remote DEALER's id) by request for specific target in
+ round-robin fashion.
+ """
+
+ def __init__(self, conf, matchmaker):
+ self.conf = conf
+ self.matchmaker = matchmaker
+ self.routing_table = {}
+ self.routable_hosts = {}
+
+ def get_all_hosts(self, target):
+ self._update_routing_table(target)
+ return list(self.routable_hosts.get(str(target), []))
+
+ def get_routable_host(self, target):
+ self._update_routing_table(target)
+ hosts_for_target = self.routable_hosts[str(target)]
+ host = hosts_for_target.pop()
+ if not hosts_for_target:
+ self._renew_routable_hosts(target)
+ return host
+
+ def _is_tm_expired(self, tm):
+ return 0 <= self.conf.oslo_messaging_zmq.zmq_target_expire \
+ <= time.time() - tm
+
+ def _update_routing_table(self, target):
+ routing_record = self.routing_table.get(str(target))
+ if routing_record is None:
+ self._fetch_hosts(target)
+ self._renew_routable_hosts(target)
+ elif self._is_tm_expired(routing_record[1]):
+ self._fetch_hosts(target)
+
+ def _fetch_hosts(self, target):
+ self.routing_table[str(target)] = (self.matchmaker.get_hosts(
+ target, zmq_names.socket_type_str(zmq.DEALER)), time.time())
+
+ def _renew_routable_hosts(self, target):
+ hosts, _ = self.routing_table[str(target)]
+ self.routable_hosts[str(target)] = list(hosts)
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py
new file mode 100644
index 0000000..dd992e0
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py
@@ -0,0 +1,140 @@
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import logging
+
+import six
+
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_names
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+@six.add_metaclass(abc.ABCMeta)
+class SenderBase(object):
+ """Base request/ack/reply sending interface."""
+
+ def __init__(self, conf):
+ self.conf = conf
+
+ @abc.abstractmethod
+ def send(self, socket, message):
+ pass
+
+
+class RequestSender(SenderBase):
+ pass
+
+
+class AckSender(SenderBase):
+ pass
+
+
+class ReplySender(SenderBase):
+ pass
+
+
+class RequestSenderProxy(SenderBase):
+
+ def send(self, socket, request):
+ socket.send(b'', zmq.SNDMORE)
+ socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
+ socket.send(request.routing_key, zmq.SNDMORE)
+ socket.send_string(request.message_id, zmq.SNDMORE)
+ socket.send_dumped([request.context, request.message])
+
+ LOG.debug("->[proxy:%(addr)s] Sending %(msg_type)s message "
+ "%(msg_id)s to target %(target)s",
+ {"addr": list(socket.connections),
+ "msg_type": zmq_names.message_type_str(request.msg_type),
+ "msg_id": request.message_id,
+ "target": request.target})
+
+
+class AckSenderProxy(AckSender):
+
+ def send(self, socket, ack):
+ assert ack.msg_type == zmq_names.ACK_TYPE, "Ack expected!"
+
+ socket.send(b'', zmq.SNDMORE)
+ socket.send(six.b(str(ack.msg_type)), zmq.SNDMORE)
+ socket.send(ack.reply_id, zmq.SNDMORE)
+ socket.send_string(ack.message_id)
+
+ LOG.debug("->[proxy:%(addr)s] Sending %(msg_type)s for %(msg_id)s",
+ {"addr": list(socket.connections),
+ "msg_type": zmq_names.message_type_str(ack.msg_type),
+ "msg_id": ack.message_id})
+
+
+class ReplySenderProxy(SenderBase):
+
+ def send(self, socket, reply):
+ assert reply.msg_type == zmq_names.REPLY_TYPE, "Reply expected!"
+
+ socket.send(b'', zmq.SNDMORE)
+ socket.send(six.b(str(reply.msg_type)), zmq.SNDMORE)
+ socket.send(reply.reply_id, zmq.SNDMORE)
+ socket.send_string(reply.message_id, zmq.SNDMORE)
+ socket.send_dumped([reply.reply_body, reply.failure])
+
+ LOG.debug("->[proxy:%(addr)s] Sending %(msg_type)s for %(msg_id)s",
+ {"addr": list(socket.connections),
+ "msg_type": zmq_names.message_type_str(reply.msg_type),
+ "msg_id": reply.message_id})
+
+
+class RequestSenderDirect(SenderBase):
+
+ def send(self, socket, request):
+ socket.send(b'', zmq.SNDMORE)
+ socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
+ socket.send_string(request.message_id, zmq.SNDMORE)
+ socket.send_dumped([request.context, request.message])
+
+ LOG.debug("Sending %(msg_type)s message %(msg_id)s to "
+ "target %(target)s",
+ {"msg_type": zmq_names.message_type_str(request.msg_type),
+ "msg_id": request.message_id,
+ "target": request.target})
+
+
+class AckSenderDirect(AckSender):
+
+ def send(self, socket, ack):
+ assert ack.msg_type == zmq_names.ACK_TYPE, "Ack expected!"
+
+ # not implemented yet
+
+ LOG.debug("Sending %(msg_type)s for %(msg_id)s",
+ {"msg_type": zmq_names.message_type_str(ack.msg_type),
+ "msg_id": ack.message_id})
+
+
+class ReplySenderDirect(SenderBase):
+
+ def send(self, socket, reply):
+ assert reply.msg_type == zmq_names.REPLY_TYPE, "Reply expected!"
+
+ socket.send(reply.reply_id, zmq.SNDMORE)
+ socket.send(b'', zmq.SNDMORE)
+ socket.send_dumped(reply.to_dict())
+
+ LOG.debug("Sending %(msg_type)s for %(msg_id)s",
+ {"msg_type": zmq_names.message_type_str(reply.msg_type),
+ "msg_id": reply.message_id})
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py
new file mode 100644
index 0000000..aa82b84
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py
@@ -0,0 +1,97 @@
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import time
+
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_names
+from oslo_messaging._drivers.zmq_driver import zmq_socket
+
+zmq = zmq_async.import_zmq()
+
+
+class SocketsManager(object):
+
+ def __init__(self, conf, matchmaker, listener_type, socket_type):
+ self.conf = conf
+ self.matchmaker = matchmaker
+ self.listener_type = listener_type
+ self.socket_type = socket_type
+ self.zmq_context = zmq.Context()
+ self.outbound_sockets = {}
+ self.socket_to_publishers = None
+ self.socket_to_routers = None
+
+ def get_hosts(self, target):
+ return self.matchmaker.get_hosts(
+ target, zmq_names.socket_type_str(self.listener_type))
+
+ @staticmethod
+ def _key_from_target(target):
+ return target.topic if target.fanout else str(target)
+
+ def _get_hosts_and_connect(self, socket, target):
+ hosts = self.get_hosts(target)
+ self._connect_to_hosts(socket, target, hosts)
+
+ def _track_socket(self, socket, target):
+ key = self._key_from_target(target)
+ self.outbound_sockets[key] = (socket, time.time())
+
+ def _connect_to_hosts(self, socket, target, hosts):
+ for host in hosts:
+ socket.connect_to_host(host)
+ self._track_socket(socket, target)
+
+ def _check_for_new_hosts(self, target):
+ key = self._key_from_target(target)
+ socket, tm = self.outbound_sockets[key]
+ if 0 <= self.conf.oslo_messaging_zmq.zmq_target_expire \
+ <= time.time() - tm:
+ self._get_hosts_and_connect(socket, target)
+ return socket
+
+ def get_socket(self, target):
+ key = self._key_from_target(target)
+ if key in self.outbound_sockets:
+ socket = self._check_for_new_hosts(target)
+ else:
+ socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
+ self.socket_type, immediate=False)
+ self._get_hosts_and_connect(socket, target)
+ return socket
+
+ def get_socket_to_publishers(self):
+ if self.socket_to_publishers is not None:
+ return self.socket_to_publishers
+ self.socket_to_publishers = zmq_socket.ZmqSocket(
+ self.conf, self.zmq_context, self.socket_type)
+ publishers = self.matchmaker.get_publishers()
+ for pub_address, router_address in publishers:
+ self.socket_to_publishers.connect_to_host(router_address)
+ return self.socket_to_publishers
+
+ def get_socket_to_routers(self):
+ if self.socket_to_routers is not None:
+ return self.socket_to_routers
+ self.socket_to_routers = zmq_socket.ZmqSocket(
+ self.conf, self.zmq_context, self.socket_type)
+ routers = self.matchmaker.get_routers()
+ for router_address in routers:
+ self.socket_to_routers.connect_to_host(router_address)
+ return self.socket_to_routers
+
+ def cleanup(self):
+ for socket, tm in self.outbound_sockets.values():
+ socket.close()
diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py
index 65ade7a..6ad07e5 100644
--- a/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py
+++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py
@@ -28,7 +28,7 @@ class MatchMakerBase(object):
self.url = kwargs.get('url')
@abc.abstractmethod
- def register_publisher(self, hostname):
+ def register_publisher(self, hostname, expire=-1):
"""Register publisher on nameserver.
This works for PUB-SUB only
@@ -36,6 +36,8 @@ class MatchMakerBase(object):
:param hostname: host for the topic in "host:port" format
host for back-chatter in "host:port" format
:type hostname: tuple
+ :param expire: record expiration timeout
+ :type expire: int
"""
@abc.abstractmethod
@@ -57,13 +59,15 @@ class MatchMakerBase(object):
"""
@abc.abstractmethod
- def register_router(self, hostname):
+ def register_router(self, hostname, expire=-1):
"""Register router on the nameserver.
This works for ROUTER proxy only
:param hostname: host for the topic in "host:port" format
- :type hostname: string
+ :type hostname: str
+ :param expire: record expiration timeout
+ :type expire: int
"""
@abc.abstractmethod
@@ -73,7 +77,7 @@ class MatchMakerBase(object):
This works for ROUTER proxy only
:param hostname: host for the topic in "host:port" format
- :type hostname: string
+ :type hostname: str
"""
@abc.abstractmethod
@@ -92,10 +96,10 @@ class MatchMakerBase(object):
:param target: the target for host
:type target: Target
:param hostname: host for the topic in "host:port" format
- :type hostname: String
- :param listener_type: Listener socket type ROUTER, SUB etc.
- :type listener_type: String
- :param expire: Record expiration timeout
+ :type hostname: str
+ :param listener_type: listener socket type ROUTER, SUB etc.
+ :type listener_type: str
+ :param expire: record expiration timeout
:type expire: int
"""
@@ -106,9 +110,9 @@ class MatchMakerBase(object):
:param target: the target for host
:type target: Target
:param hostname: host for the topic in "host:port" format
- :type hostname: String
- :param listener_type: Listener socket type ROUTER, SUB etc.
- :type listener_type: String
+ :type hostname: str
+ :param listener_type: listener socket type ROUTER, SUB etc.
+ :type listener_type: str
"""
@abc.abstractmethod
@@ -117,6 +121,8 @@ class MatchMakerBase(object):
:param target: the default target for invocations
:type target: Target
+ :param listener_type: listener socket type ROUTER, SUB etc.
+ :type listener_type: str
:returns: a list of "hostname:port" hosts
"""
@@ -130,7 +136,7 @@ class DummyMatchMaker(MatchMakerBase):
self._publishers = set()
self._routers = set()
- def register_publisher(self, hostname):
+ def register_publisher(self, hostname, expire=-1):
if hostname not in self._publishers:
self._publishers.add(hostname)
@@ -141,7 +147,7 @@ class DummyMatchMaker(MatchMakerBase):
def get_publishers(self):
return list(self._publishers)
- def register_router(self, hostname):
+ def register_router(self, hostname, expire=-1):
if hostname not in self._routers:
self._routers.add(hostname)
diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py
index daa1b17..440c00b 100644
--- a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py
+++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py
@@ -171,7 +171,6 @@ class RedisMatchMaker(base.MatchMakerBase):
return self._redis.smembers(key)
def register(self, target, hostname, listener_type, expire=-1):
-
if target.topic and target.server:
key = zmq_address.target_to_key(target, listener_type)
self._add_key_with_expire(key, hostname, expire)
@@ -191,11 +190,14 @@ class RedisMatchMaker(base.MatchMakerBase):
def get_hosts(self, target, listener_type):
LOG.debug("[Redis] get_hosts for target %s", target)
+
hosts = []
- key = zmq_address.target_to_key(target, listener_type)
- hosts.extend(self._get_hosts_by_key(key))
- if (not hosts or target.fanout) and target.topic and target.server:
+ if target.topic and target.server:
+ key = zmq_address.target_to_key(target, listener_type)
+ hosts.extend(self._get_hosts_by_key(key))
+
+ if not hosts and target.topic:
key = zmq_address.prefix_str(target.topic, listener_type)
hosts.extend(self._get_hosts_by_key(key))
diff --git a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py
index 591b2ac..ab8f313 100644
--- a/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py
+++ b/oslo_messaging/_drivers/zmq_driver/poller/green_poller.py
@@ -31,6 +31,11 @@ class GreenPoller(zmq_poller.ZmqPoller):
self.thread_by_socket[socket] = self.green_pool.spawn(
self._socket_receive, socket, recv_method)
+ def unregister(self, socket):
+ thread = self.thread_by_socket.pop(socket, None)
+ if thread:
+ thread.kill()
+
def _socket_receive(self, socket, recv_method=None):
while True:
if recv_method:
diff --git a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py
index edcba7d..9589fd1 100644
--- a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py
+++ b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py
@@ -37,6 +37,10 @@ class ThreadingPoller(zmq_poller.ZmqPoller):
self.recv_methods[socket] = recv_method
self.poller.register(socket, zmq.POLLIN)
+ def unregister(self, socket):
+ self.recv_methods.pop(socket, None)
+ self.poller.unregister(socket)
+
def poll(self, timeout=None):
if timeout is not None and timeout > 0:
timeout *= 1000 # convert seconds to milliseconds
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/__init__.py b/oslo_messaging/_drivers/zmq_driver/proxy/__init__.py
index e69de29..e69de29 100644
--- a/oslo_messaging/_drivers/zmq_driver/broker/__init__.py
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/__init__.py
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py
index 4ee3688..15c7774 100644
--- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py
@@ -13,9 +13,11 @@
# under the License.
import logging
+import socket
from stevedore import driver
+from oslo_config import cfg
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _LI
@@ -23,6 +25,22 @@ zmq = zmq_async.import_zmq()
LOG = logging.getLogger(__name__)
+zmq_proxy_opts = [
+ cfg.StrOpt('host', default=socket.gethostname(),
+ help='Hostname (FQDN) of current proxy'
+ ' an ethernet interface, or IP address.'),
+
+ cfg.IntOpt('frontend_port', default=0,
+ help='Front-end ROUTER port number. Zero means random.'),
+
+ cfg.IntOpt('backend_port', default=0,
+ help='Back-end ROUTER port number. Zero means random.'),
+
+ cfg.IntOpt('publisher_port', default=0,
+ help='Publisher port number. Zero means random.'),
+]
+
+
class ZmqProxy(object):
"""Wrapper class for Publishers and Routers proxies.
The main reason to have a proxy is high complexity of TCP sockets number
@@ -48,7 +66,7 @@ class ZmqProxy(object):
2. Routers should be transparent for clients and servers. Which means
it doesn't change the way of messaging between client and the final
target by hiding the target from a client.
- 3. Router may be restarted or get down at any time loosing all messages
+ 3. Router may be restarted or shut down at any time losing all messages
in its queue. Smart retrying (based on acknowledgements from server
side) and load balancing between other Router instances from the
client side should handle the situation.
@@ -67,7 +85,7 @@ class ZmqProxy(object):
self.conf = conf
self.matchmaker = driver.DriverManager(
'oslo.messaging.zmq.matchmaker',
- self.conf.rpc_zmq_matchmaker,
+ self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker,
).driver(self.conf)
self.context = zmq.Context()
self.proxy = proxy_cls(conf, self.context, self.matchmaker)
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_publisher_proxy.py
index dbe995b..727b419 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_publisher_proxy.py
@@ -14,7 +14,6 @@
import logging
-from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
@@ -24,7 +23,7 @@ LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
-class PubPublisherProxy(object):
+class PublisherProxy(object):
"""PUB/SUB based request publisher
The publisher intended to be used for Fanout and Notify
@@ -39,24 +38,28 @@ class PubPublisherProxy(object):
"""
def __init__(self, conf, matchmaker):
- super(PubPublisherProxy, self).__init__()
+ super(PublisherProxy, self).__init__()
self.conf = conf
self.zmq_context = zmq.Context()
self.matchmaker = matchmaker
- self.socket = zmq_socket.ZmqRandomPortSocket(
- self.conf, self.zmq_context, zmq.PUB)
+ port = conf.zmq_proxy_opts.publisher_port
- self.host = zmq_address.combine_address(self.conf.rpc_zmq_host,
- self.socket.port)
+ self.socket = zmq_socket.ZmqFixedPortSocket(
+ self.conf, self.zmq_context, zmq.PUB, conf.zmq_proxy_opts.host,
+ port) if port != 0 else \
+ zmq_socket.ZmqRandomPortSocket(
+ self.conf, self.zmq_context, zmq.PUB, conf.zmq_proxy_opts.host)
+
+ self.host = self.socket.connect_address
def send_request(self, multipart_message):
message_type = multipart_message.pop(0)
assert message_type in (zmq_names.CAST_FANOUT_TYPE,
zmq_names.NOTIFY_TYPE), "Fanout expected!"
topic_filter = multipart_message.pop(0)
- message_id = multipart_message.pop(0)
reply_id = multipart_message.pop(0)
+ message_id = multipart_message.pop(0)
assert reply_id is not None, "Reply id expected!"
self.socket.send(topic_filter, zmq.SNDMORE)
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py
index c75ff4e..4c747ab 100644
--- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py
@@ -16,9 +16,7 @@ import logging
import six
-from oslo_messaging._drivers.zmq_driver.client.publishers \
- import zmq_pub_publisher
-from oslo_messaging._drivers.zmq_driver import zmq_address
+from oslo_messaging._drivers.zmq_driver.proxy import zmq_publisher_proxy
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
@@ -38,27 +36,31 @@ class UniversalQueueProxy(object):
self.matchmaker = matchmaker
self.poller = zmq_async.get_poller()
- self.fe_router_socket = zmq_socket.ZmqRandomPortSocket(
- conf, context, zmq.ROUTER)
- self.be_router_socket = zmq_socket.ZmqRandomPortSocket(
- conf, context, zmq.ROUTER)
+ port = conf.zmq_proxy_opts.frontend_port
+ host = conf.zmq_proxy_opts.host
+ self.fe_router_socket = zmq_socket.ZmqFixedPortSocket(
+ conf, context, zmq.ROUTER, host,
+ conf.zmq_proxy_opts.frontend_port) if port != 0 else \
+ zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, host)
+
+ port = conf.zmq_proxy_opts.backend_port
+ self.be_router_socket = zmq_socket.ZmqFixedPortSocket(
+ conf, context, zmq.ROUTER, host,
+ conf.zmq_proxy_opts.backend_port) if port != 0 else \
+ zmq_socket.ZmqRandomPortSocket(conf, context, zmq.ROUTER, host)
self.poller.register(self.fe_router_socket.handle,
self._receive_in_request)
self.poller.register(self.be_router_socket.handle,
self._receive_in_request)
- self.fe_router_address = zmq_address.combine_address(
- self.conf.rpc_zmq_host, self.fe_router_socket.port)
- self.be_router_address = zmq_address.combine_address(
- self.conf.rpc_zmq_host, self.fe_router_socket.port)
-
- self.pub_publisher = zmq_pub_publisher.PubPublisherProxy(
+ self.pub_publisher = zmq_publisher_proxy.PublisherProxy(
conf, matchmaker)
self._router_updater = RouterUpdater(
- conf, matchmaker, self.pub_publisher.host, self.fe_router_address,
- self.be_router_address)
+ conf, matchmaker, self.pub_publisher.host,
+ self.fe_router_socket.connect_address,
+ self.be_router_socket.connect_address)
def run(self):
message, socket = self.poller.poll()
@@ -66,13 +68,14 @@ class UniversalQueueProxy(object):
return
msg_type = message[0]
- if self.conf.use_pub_sub and msg_type in (zmq_names.CAST_FANOUT_TYPE,
- zmq_names.NOTIFY_TYPE):
+ if self.conf.oslo_messaging_zmq.use_pub_sub and \
+ msg_type in (zmq_names.CAST_FANOUT_TYPE,
+ zmq_names.NOTIFY_TYPE):
self.pub_publisher.send_request(message)
else:
- self._redirect_message(self.be_router_socket
- if socket is self.fe_router_socket
- else self.fe_router_socket, message)
+ self._redirect_message(self.be_router_socket.handle
+ if socket is self.fe_router_socket.handle
+ else self.fe_router_socket.handle, message)
@staticmethod
def _receive_in_request(socket):
@@ -88,7 +91,7 @@ class UniversalQueueProxy(object):
payload.insert(0, routing_key)
payload.insert(0, msg_type)
return payload
- except (AssertionError, zmq.ZMQError):
+ except (AssertionError, ValueError, zmq.ZMQError):
LOG.error("Received message with wrong format")
return None
@@ -102,16 +105,17 @@ class UniversalQueueProxy(object):
socket.send(b'', zmq.SNDMORE)
socket.send(reply_id, zmq.SNDMORE)
socket.send(six.b(str(message_type)), zmq.SNDMORE)
- LOG.debug("Dispatching message %s" % message_id)
+ LOG.debug("Dispatching %(msg_type)s message %(msg_id)s - to %(rkey)s" %
+ {"msg_type": zmq_names.message_type_str(message_type),
+ "msg_id": message_id,
+ "rkey": routing_key})
socket.send_multipart(multipart_message)
def cleanup(self):
self.fe_router_socket.close()
self.be_router_socket.close()
self.pub_publisher.cleanup()
- self.matchmaker.unregister_publisher(
- (self.pub_publisher.host, self.fe_router_address))
- self.matchmaker.unregister_router(self.be_router_address)
+ self._router_updater.cleanup()
class RouterUpdater(zmq_updater.UpdaterBase):
@@ -130,11 +134,19 @@ class RouterUpdater(zmq_updater.UpdaterBase):
def _update_records(self):
self.matchmaker.register_publisher(
(self.publisher_address, self.fe_router_address),
- expire=self.conf.zmq_target_expire)
+ expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Update PUB publisher"),
{"pub": self.publisher_address,
"router": self.fe_router_address})
- self.matchmaker.register_router(self.be_router_address,
- expire=self.conf.zmq_target_expire)
+ self.matchmaker.register_router(
+ self.be_router_address,
+ expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
LOG.info(_LI("[Backend ROUTER:%(router)s] Update ROUTER"),
{"router": self.be_router_address})
+
+ def cleanup(self):
+ super(RouterUpdater, self).cleanup()
+ self.matchmaker.unregister_publisher(
+ (self.publisher_address, self.fe_router_address))
+ self.matchmaker.unregister_router(
+ self.be_router_address)
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py
index 69c6958..69a7077 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py
@@ -40,6 +40,10 @@ class ConsumerBase(object):
self.sockets = []
self.context = zmq.Context()
+ def stop(self):
+ """Stop consumer polling/updates"""
+ pass
+
@abc.abstractmethod
def receive_message(self, target):
"""Method for poller - receiving message routine"""
@@ -63,6 +67,9 @@ class SingleSocketConsumer(ConsumerBase):
self.target_updater = TargetUpdater(
conf, self.matchmaker, self.target, self.host, socket_type)
+ def stop(self):
+ self.target_updater.stop()
+
def subscribe_socket(self, socket_type):
try:
socket = zmq_socket.ZmqRandomPortSocket(
@@ -72,8 +79,8 @@ class SingleSocketConsumer(ConsumerBase):
{"stype": zmq_names.socket_type_str(socket_type),
"addr": socket.bind_address,
"port": socket.port})
- self.host = zmq_address.combine_address(self.conf.rpc_zmq_host,
- socket.port)
+ self.host = zmq_address.combine_address(
+ self.conf.oslo_messaging_zmq.rpc_zmq_host, socket.port)
self.poller.register(socket, self.receive_message)
return socket
except zmq.ZMQError as e:
@@ -112,4 +119,10 @@ class TargetUpdater(zmq_updater.UpdaterBase):
self.matchmaker.register(
self.target, self.host,
zmq_names.socket_type_str(self.socket_type),
- expire=self.conf.zmq_target_expire)
+ expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
+
+ def stop(self):
+ super(TargetUpdater, self).stop()
+ self.matchmaker.unregister(
+ self.target, self.host,
+ zmq_names.socket_type_str(self.socket_type))
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
index f0fd111..d2cd18b 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
@@ -14,114 +14,109 @@
import logging
-import six
-
-from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
-from oslo_messaging._drivers.zmq_driver.client.publishers\
- import zmq_publisher_base
-from oslo_messaging._drivers.zmq_driver.client import zmq_response
-from oslo_messaging._drivers.zmq_driver.server.consumers\
+from oslo_messaging._drivers.zmq_driver.client import zmq_senders
+from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager
+from oslo_messaging._drivers.zmq_driver.server.consumers \
import zmq_consumer_base
+from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
+from oslo_messaging._drivers.zmq_driver.server import zmq_ttl_cache
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_updater
-from oslo_messaging._i18n import _LE, _LI
+from oslo_messaging._i18n import _LE, _LI, _LW
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
-class DealerIncomingMessage(base.RpcIncomingMessage):
-
- def __init__(self, context, message):
- super(DealerIncomingMessage, self).__init__(context, message)
-
- def reply(self, reply=None, failure=None):
- """Reply is not needed for non-call messages"""
-
- def acknowledge(self):
- """Not sending acknowledge"""
-
- def requeue(self):
- """Requeue is not supported"""
-
-
-class DealerIncomingRequest(base.RpcIncomingMessage):
-
- def __init__(self, socket, reply_id, message_id, context, message):
- super(DealerIncomingRequest, self).__init__(context, message)
- self.reply_socket = socket
- self.reply_id = reply_id
- self.message_id = message_id
-
- def reply(self, reply=None, failure=None):
- if failure is not None:
- failure = rpc_common.serialize_remote_exception(failure)
- response = zmq_response.Response(type=zmq_names.REPLY_TYPE,
- message_id=self.message_id,
- reply_id=self.reply_id,
- reply_body=reply,
- failure=failure)
-
- LOG.debug("Replying %s", self.message_id)
-
- self.reply_socket.send(b'', zmq.SNDMORE)
- self.reply_socket.send(six.b(str(zmq_names.REPLY_TYPE)), zmq.SNDMORE)
- self.reply_socket.send(self.reply_id, zmq.SNDMORE)
- self.reply_socket.send(self.message_id, zmq.SNDMORE)
- self.reply_socket.send_pyobj(response)
-
- def requeue(self):
- """Requeue is not supported"""
-
-
-class DealerConsumer(zmq_consumer_base.ConsumerBase):
+class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
def __init__(self, conf, poller, server):
- super(DealerConsumer, self).__init__(conf, poller, server)
- self.matchmaker = server.matchmaker
- self.target = server.target
- self.sockets_manager = zmq_publisher_base.SocketsManager(
- conf, self.matchmaker, zmq.ROUTER, zmq.DEALER)
- self.socket = self.sockets_manager.get_socket_to_routers()
- self.poller.register(self.socket, self.receive_message)
- self.host = self.socket.handle.identity
- self.target_updater = zmq_consumer_base.TargetUpdater(
- conf, self.matchmaker, self.target, self.host,
- zmq.DEALER)
+ self.ack_sender = zmq_senders.AckSenderProxy(conf)
+ self.reply_sender = zmq_senders.ReplySenderProxy(conf)
+ self.received_messages = zmq_ttl_cache.TTLCache(
+ ttl=conf.oslo_messaging_zmq.rpc_message_ttl
+ )
+ self.sockets_manager = zmq_sockets_manager.SocketsManager(
+ conf, server.matchmaker, zmq.ROUTER, zmq.DEALER)
+ self.host = None
+ super(DealerConsumer, self).__init__(conf, poller, server, zmq.DEALER)
self.connection_updater = ConsumerConnectionUpdater(
conf, self.matchmaker, self.socket)
LOG.info(_LI("[%s] Run DEALER consumer"), self.host)
+ def subscribe_socket(self, socket_type):
+ try:
+ socket = self.sockets_manager.get_socket_to_routers()
+ self.sockets.append(socket)
+ self.host = socket.handle.identity
+ self.poller.register(socket, self.receive_message)
+ return socket
+ except zmq.ZMQError as e:
+ LOG.error(_LE("Failed connecting to ROUTER socket %(e)s") % e)
+ raise rpc_common.RPCException(str(e))
+
+ def _receive_request(self, socket):
+ empty = socket.recv()
+ assert empty == b'', 'Bad format: empty delimiter expected'
+ reply_id = socket.recv()
+ msg_type = int(socket.recv())
+ message_id = socket.recv_string()
+ context, message = socket.recv_loaded()
+ return reply_id, msg_type, message_id, context, message
+
def receive_message(self, socket):
try:
- empty = socket.recv()
- assert empty == b'', 'Bad format: empty delimiter expected'
- reply_id = socket.recv()
- message_type = int(socket.recv())
- message_id = socket.recv()
- context = socket.recv_pyobj()
- message = socket.recv_pyobj()
- LOG.debug("[%(host)s] Received message %(id)s",
- {"host": self.host, "id": message_id})
- if message_type == zmq_names.CALL_TYPE:
- return DealerIncomingRequest(
- socket, reply_id, message_id, context, message)
- elif message_type in zmq_names.NON_BLOCKING_TYPES:
- return DealerIncomingMessage(context, message)
+ reply_id, msg_type, message_id, context, message = \
+ self._receive_request(socket)
+
+ if msg_type == zmq_names.CALL_TYPE or \
+ msg_type in zmq_names.NON_BLOCKING_TYPES:
+
+ ack_sender = self.ack_sender \
+ if self.conf.oslo_messaging_zmq.rpc_use_acks else None
+ reply_sender = self.reply_sender \
+ if msg_type == zmq_names.CALL_TYPE else None
+
+ message = zmq_incoming_message.ZmqIncomingMessage(
+ context, message, reply_id, message_id, socket,
+ ack_sender, reply_sender
+ )
+
+ # drop duplicate message
+ if message_id in self.received_messages:
+ LOG.warning(
+ _LW("[%(host)s] Dropping duplicate %(msg_type)s "
+ "message %(msg_id)s"),
+ {"host": self.host,
+ "msg_type": zmq_names.message_type_str(msg_type),
+ "msg_id": message_id}
+ )
+ message.acknowledge()
+ return None
+
+ self.received_messages.add(message_id)
+ LOG.debug(
+ "[%(host)s] Received %(msg_type)s message %(msg_id)s",
+ {"host": self.host,
+ "msg_type": zmq_names.message_type_str(msg_type),
+ "msg_id": message_id}
+ )
+ return message
+
else:
LOG.error(_LE("Unknown message type: %s"),
- zmq_names.message_type_str(message_type))
- except (zmq.ZMQError, AssertionError) as e:
+ zmq_names.message_type_str(msg_type))
+ except (zmq.ZMQError, AssertionError, ValueError) as e:
LOG.error(_LE("Receiving message failure: %s"), str(e))
def cleanup(self):
LOG.info(_LI("[%s] Destroy DEALER consumer"), self.host)
+ self.received_messages.cleanup()
+ self.connection_updater.cleanup()
super(DealerConsumer, self).cleanup()
- self.matchmaker.unregister(self.target, self.host,
- zmq_names.socket_type_str(zmq.DEALER))
class ConsumerConnectionUpdater(zmq_updater.ConnectionUpdater):
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py
deleted file mode 100644
index 719c24e..0000000
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py
+++ /dev/null
@@ -1,69 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-
-from oslo_messaging._drivers import base
-from oslo_messaging._drivers.zmq_driver.server.consumers\
- import zmq_consumer_base
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-from oslo_messaging._i18n import _LE, _LI
-
-LOG = logging.getLogger(__name__)
-
-zmq = zmq_async.import_zmq()
-
-
-class PullIncomingMessage(base.RpcIncomingMessage):
-
- def __init__(self, context, message):
- super(PullIncomingMessage, self).__init__(context, message)
-
- def reply(self, reply=None, failure=None):
- """Reply is not needed for non-call messages."""
-
- def acknowledge(self):
- """Acknowledgments are not supported by this type of consumer."""
-
- def requeue(self):
- """Requeueing is not supported."""
-
-
-class PullConsumer(zmq_consumer_base.SingleSocketConsumer):
-
- def __init__(self, conf, poller, server):
- super(PullConsumer, self).__init__(conf, poller, server, zmq.PULL)
- LOG.info(_LI("[%s] Run PULL consumer"), self.host)
-
- def receive_message(self, socket):
- try:
- request = socket.recv_pyobj()
- msg_type = request.msg_type
- assert msg_type is not None, 'Bad format: msg type expected'
- context = request.context
- message = request.message
- LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s",
- {"host": self.host,
- "type": request.msg_type,
- "id": request.message_id,
- "target": request.target})
-
- if msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES):
- return PullIncomingMessage(context, message)
- else:
- LOG.error(_LE("Unknown message type: %s"), msg_type)
-
- except (zmq.ZMQError, AssertionError) as e:
- LOG.error(_LE("Receiving message failed: %s"), str(e))
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py
index da487f5..e483634 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py
@@ -14,8 +14,8 @@
import logging
-from oslo_messaging._drivers import base
-from oslo_messaging._drivers.zmq_driver.server.consumers\
+from oslo_messaging._drivers.zmq_driver.client import zmq_senders
+from oslo_messaging._drivers.zmq_driver.server.consumers \
import zmq_consumer_base
from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
from oslo_messaging._drivers.zmq_driver import zmq_async
@@ -27,29 +27,11 @@ LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
-class RouterIncomingMessage(base.RpcIncomingMessage):
-
- def __init__(self, context, message, socket, reply_id, msg_id,
- poller):
- super(RouterIncomingMessage, self).__init__(context, message)
- self.socket = socket
- self.reply_id = reply_id
- self.msg_id = msg_id
- self.message = message
-
- def reply(self, reply=None, failure=None):
- """Reply is not needed for non-call messages"""
-
- def acknowledge(self):
- LOG.debug("Not sending acknowledge for %s", self.msg_id)
-
- def requeue(self):
- """Requeue is not supported"""
-
-
class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
def __init__(self, conf, poller, server):
+ self.ack_sender = zmq_senders.AckSenderDirect(conf)
+ self.reply_sender = zmq_senders.ReplySenderDirect(conf)
super(RouterConsumer, self).__init__(conf, poller, server, zmq.ROUTER)
LOG.info(_LI("[%s] Run ROUTER consumer"), self.host)
@@ -57,28 +39,37 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
reply_id = socket.recv()
empty = socket.recv()
assert empty == b'', 'Bad format: empty delimiter expected'
- envelope = socket.recv_pyobj()
- request = socket.recv_pyobj()
- return request, envelope, reply_id
+ msg_type = int(socket.recv())
+ message_id = socket.recv_string()
+ context, message = socket.recv_loaded()
+ return reply_id, msg_type, message_id, context, message
def receive_message(self, socket):
try:
- request, envelope, reply_id = self._receive_request(socket)
- LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s",
- {"host": self.host,
- "type": request.msg_type,
- "id": request.message_id,
- "target": request.target})
+ reply_id, msg_type, message_id, context, message = \
+ self._receive_request(socket)
- if request.msg_type == zmq_names.CALL_TYPE:
- return zmq_incoming_message.ZmqIncomingRequest(
- socket, reply_id, request, envelope, self.poller)
- elif request.msg_type in zmq_names.NON_BLOCKING_TYPES:
- return RouterIncomingMessage(
- request.context, request.message, socket, reply_id,
- request.message_id, self.poller)
+ LOG.debug("[%(host)s] Received %(msg_type)s message %(msg_id)s",
+ {"host": self.host,
+ "msg_type": zmq_names.message_type_str(msg_type),
+ "msg_id": message_id})
+
+ if msg_type == zmq_names.CALL_TYPE or \
+ msg_type in zmq_names.NON_BLOCKING_TYPES:
+ ack_sender = self.ack_sender \
+ if self.conf.oslo_messaging_zmq.rpc_use_acks else None
+ reply_sender = self.reply_sender \
+ if msg_type == zmq_names.CALL_TYPE else None
+ return zmq_incoming_message.ZmqIncomingMessage(
+ context, message, reply_id, message_id, socket,
+ ack_sender, reply_sender
+ )
else:
- LOG.error(_LE("Unknown message type: %s"), request.msg_type)
-
- except (zmq.ZMQError, AssertionError) as e:
+ LOG.error(_LE("Unknown message type: %s"),
+ zmq_names.message_type_str(msg_type))
+ except (zmq.ZMQError, AssertionError, ValueError) as e:
LOG.error(_LE("Receiving message failed: %s"), str(e))
+
+ def cleanup(self):
+ LOG.info(_LI("[%s] Destroy ROUTER consumer"), self.host)
+ super(RouterConsumer, self).cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py
index 6aa8ec4..23e8807 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py
@@ -16,9 +16,9 @@ import logging
import six
-from oslo_messaging._drivers import base
-from oslo_messaging._drivers.zmq_driver.server.consumers\
+from oslo_messaging._drivers.zmq_driver.server.consumers \
import zmq_consumer_base
+from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_socket
@@ -29,21 +29,6 @@ LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
-class SubIncomingMessage(base.RpcIncomingMessage):
-
- def __init__(self, context, message):
- super(SubIncomingMessage, self).__init__(context, message)
-
- def reply(self, reply=None, failure=None):
- """Reply is not needed for non-call messages."""
-
- def acknowledge(self):
- """Requeue is not supported"""
-
- def requeue(self):
- """Requeue is not supported"""
-
-
class SubConsumer(zmq_consumer_base.ConsumerBase):
def __init__(self, conf, poller, server):
@@ -78,8 +63,7 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
def _receive_request(socket):
topic_filter = socket.recv()
message_id = socket.recv()
- context = socket.recv_pyobj()
- message = socket.recv_pyobj()
+ context, message = socket.recv_loaded()
LOG.debug("Received %(topic_filter)s topic message %(id)s",
{'id': message_id, 'topic_filter': topic_filter})
return context, message
@@ -89,8 +73,7 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
context, message = self._receive_request(socket)
if not message:
return None
-
- return SubIncomingMessage(context, message)
+ return zmq_incoming_message.ZmqIncomingMessage(context, message)
except (zmq.ZMQError, AssertionError) as e:
LOG.error(_LE("Receiving message failed: %s"), str(e))
diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py
index 2dc8ec3..d6ab573 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py
@@ -12,49 +12,51 @@
# License for the specific language governing permissions and limitations
# under the License.
-
import logging
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client import zmq_response
from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
-class ZmqIncomingRequest(base.RpcIncomingMessage):
+class ZmqIncomingMessage(base.RpcIncomingMessage):
+
+ def __init__(self, context, message, reply_id=None, message_id=None,
+ socket=None, ack_sender=None, reply_sender=None):
+
+ if ack_sender is not None or reply_sender is not None:
+ assert socket is not None, "Valid socket expected!"
+ assert message_id is not None, "Valid message ID expected!"
+ assert reply_id is not None, "Valid reply ID expected!"
+
+ super(ZmqIncomingMessage, self).__init__(context, message)
+
+ self.reply_id = reply_id
+ self.message_id = message_id
+ self.socket = socket
+ self.ack_sender = ack_sender
+ self.reply_sender = reply_sender
- def __init__(self, socket, rep_id, request, envelope, poller):
- super(ZmqIncomingRequest, self).__init__(request.context,
- request.message)
- self.reply_socket = socket
- self.reply_id = rep_id
- self.request = request
- self.envelope = envelope
- self.received = None
- self.poller = poller
+ def acknowledge(self):
+ if self.ack_sender is not None:
+ ack = zmq_response.Ack(message_id=self.message_id,
+ reply_id=self.reply_id)
+ self.ack_sender.send(self.socket, ack)
def reply(self, reply=None, failure=None):
- if failure is not None:
- failure = rpc_common.serialize_remote_exception(failure)
- response = zmq_response.Response(type=zmq_names.REPLY_TYPE,
- message_id=self.request.message_id,
- reply_id=self.reply_id,
- reply_body=reply,
- failure=failure)
-
- LOG.debug("Replying %s", (str(self.request.message_id)))
-
- self.received = True
- self.reply_socket.send(self.reply_id, zmq.SNDMORE)
- self.reply_socket.send(b'', zmq.SNDMORE)
- self.reply_socket.send_pyobj(self.envelope, zmq.SNDMORE)
- self.reply_socket.send_pyobj(response)
+ if self.reply_sender is not None:
+ if failure is not None:
+ failure = rpc_common.serialize_remote_exception(failure)
+ reply = zmq_response.Reply(message_id=self.message_id,
+ reply_id=self.reply_id,
+ reply_body=reply,
+ failure=failure)
+ self.reply_sender.send(self.socket, reply)
def requeue(self):
"""Requeue is not supported"""
diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py
index c963c45..b40bdc0 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py
@@ -41,11 +41,14 @@ class ZmqServer(base.PollStyleListener):
self.poller = poller or zmq_async.get_poller()
self.router_consumer = zmq_router_consumer.RouterConsumer(
- conf, self.poller, self) if not conf.use_router_proxy else None
+ conf, self.poller, self) \
+ if not conf.oslo_messaging_zmq.use_router_proxy else None
self.dealer_consumer = zmq_dealer_consumer.DealerConsumer(
- conf, self.poller, self) if conf.use_router_proxy else None
+ conf, self.poller, self) \
+ if conf.oslo_messaging_zmq.use_router_proxy else None
self.sub_consumer = zmq_sub_consumer.SubConsumer(
- conf, self.poller, self) if conf.use_pub_sub else None
+ conf, self.poller, self) \
+ if conf.oslo_messaging_zmq.use_pub_sub else None
self.consumers = []
if self.router_consumer is not None:
@@ -58,14 +61,14 @@ class ZmqServer(base.PollStyleListener):
@base.batch_poll_helper
def poll(self, timeout=None):
message, socket = self.poller.poll(
- timeout or self.conf.rpc_poll_timeout)
+ timeout or self.conf.oslo_messaging_zmq.rpc_poll_timeout)
return message
def stop(self):
- if self.router_consumer:
- LOG.info(_LI("Stop server %(address)s:%(port)s"),
- {'address': self.router_consumer.address,
- 'port': self.router_consumer.port})
+ self.poller.close()
+ LOG.info(_LI("Stop server %(target)s"), {'target': self.target})
+ for consumer in self.consumers:
+ consumer.stop()
def cleanup(self):
self.poller.close()
@@ -94,7 +97,7 @@ class ZmqNotificationServer(base.PollStyleListener):
@base.batch_poll_helper
def poll(self, timeout=None):
message, socket = self.poller.poll(
- timeout or self.conf.rpc_poll_timeout)
+ timeout or self.conf.oslo_messaging_zmq.rpc_poll_timeout)
return message
def stop(self):
diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_ttl_cache.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_ttl_cache.py
new file mode 100644
index 0000000..963d2d9
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_ttl_cache.py
@@ -0,0 +1,79 @@
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import threading
+import time
+
+import six
+
+from oslo_messaging._drivers.zmq_driver import zmq_async
+
+zmq = zmq_async.import_zmq()
+
+
+class TTLCache(object):
+
+ def __init__(self, ttl=None):
+ self._lock = threading.Lock()
+ self._expiration_times = {}
+ self._executor = None
+
+ if not (ttl is None or isinstance(ttl, (int, float))):
+ raise ValueError('ttl must be None or a number')
+
+ # no (i.e. infinite) ttl
+ if ttl is None or ttl <= 0:
+ ttl = float('inf')
+ else:
+ self._executor = zmq_async.get_executor(self._update_cache)
+
+ self._ttl = ttl
+
+ if self._executor:
+ self._executor.execute()
+
+ @staticmethod
+ def _is_expired(expiration_time, current_time):
+ return expiration_time <= current_time
+
+ def add(self, item):
+ with self._lock:
+ self._expiration_times[item] = time.time() + self._ttl
+
+ def discard(self, item):
+ with self._lock:
+ self._expiration_times.pop(item, None)
+
+ def __contains__(self, item):
+ with self._lock:
+ expiration_time = self._expiration_times.get(item)
+ if expiration_time is None:
+ return False
+ if self._is_expired(expiration_time, time.time()):
+ self._expiration_times.pop(item)
+ return False
+ return True
+
+ def _update_cache(self):
+ with self._lock:
+ current_time = time.time()
+ self._expiration_times = \
+ {item: expiration_time for
+ item, expiration_time in six.iteritems(self._expiration_times)
+ if not self._is_expired(expiration_time, current_time)}
+ time.sleep(self._ttl)
+
+ def cleanup(self):
+ if self._executor:
+ self._executor.stop()
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_address.py b/oslo_messaging/_drivers/zmq_driver/zmq_address.py
index b33c288..0175e7e 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_address.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_address.py
@@ -24,11 +24,11 @@ def get_tcp_direct_address(host):
def get_tcp_random_address(conf):
- return "tcp://%s" % conf.rpc_zmq_bind_address
+ return "tcp://%s" % conf.oslo_messaging_zmq.rpc_zmq_bind_address
def get_broker_address(conf):
- return "ipc://%s/zmq-broker" % conf.rpc_zmq_ipc_dir
+ return "ipc://%s/zmq-broker" % conf.oslo_messaging_zmq.rpc_zmq_ipc_dir
def prefix_str(key, listener_type):
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_async.py b/oslo_messaging/_drivers/zmq_driver/zmq_async.py
index a248059..93135da 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_async.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_async.py
@@ -42,6 +42,15 @@ def get_executor(method):
return threading_poller.ThreadingExecutor(method)
+def get_pool(size):
+ import futurist
+
+ if eventletutils.is_monkey_patched('thread'):
+ return futurist.GreenThreadPoolExecutor(size)
+
+ return futurist.ThreadPoolExecutor(size)
+
+
def get_queue():
if eventletutils.is_monkey_patched('thread'):
import eventlet
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_names.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py
index 51f68c6..6ec99cb 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_names.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py
@@ -17,15 +17,10 @@ from oslo_messaging._drivers.zmq_driver import zmq_async
zmq = zmq_async.import_zmq()
-FIELD_TYPE = 'type'
-FIELD_FAILURE = 'failure'
-FIELD_REPLY = 'reply'
-FIELD_ID = 'id'
FIELD_MSG_ID = 'message_id'
-FIELD_MSG_TYPE = 'msg_type'
FIELD_REPLY_ID = 'reply_id'
-FIELD_TARGET = 'target'
-FIELD_ROUTING_KEY = 'routing_key'
+FIELD_REPLY_BODY = 'reply_body'
+FIELD_FAILURE = 'failure'
IDX_REPLY_TYPE = 1
@@ -69,8 +64,8 @@ def socket_type_str(socket_type):
def message_type_str(message_type):
msg_type_str = {CALL_TYPE: "CALL",
CAST_TYPE: "CAST",
- CAST_FANOUT_TYPE: "CAST_FANOUT_TYPE",
- NOTIFY_TYPE: "NOTIFY_TYPE",
- REPLY_TYPE: "REPLY_TYPE",
- ACK_TYPE: "ACK_TYPE"}
+ CAST_FANOUT_TYPE: "CAST_FANOUT",
+ NOTIFY_TYPE: "NOTIFY",
+ REPLY_TYPE: "REPLY",
+ ACK_TYPE: "ACK"}
return msg_type_str[message_type]
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_options.py b/oslo_messaging/_drivers/zmq_driver/zmq_options.py
new file mode 100644
index 0000000..94e152d
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_options.py
@@ -0,0 +1,153 @@
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import socket
+
+from oslo_config import cfg
+
+from oslo_messaging._drivers import base
+from oslo_messaging import server
+
+
+MATCHMAKER_BACKENDS = ('redis', 'dummy')
+MATCHMAKER_DEFAULT = 'redis'
+
+
+zmq_opts = [
+ cfg.StrOpt('rpc_zmq_bind_address', default='*',
+ deprecated_group='DEFAULT',
+ help='ZeroMQ bind address. Should be a wildcard (*), '
+ 'an ethernet interface, or IP. '
+ 'The "host" option should point or resolve to this '
+ 'address.'),
+
+ cfg.StrOpt('rpc_zmq_matchmaker', default=MATCHMAKER_DEFAULT,
+ choices=MATCHMAKER_BACKENDS,
+ deprecated_group='DEFAULT',
+ help='MatchMaker driver.'),
+
+ cfg.IntOpt('rpc_zmq_contexts', default=1,
+ deprecated_group='DEFAULT',
+ help='Number of ZeroMQ contexts, defaults to 1.'),
+
+ cfg.IntOpt('rpc_zmq_topic_backlog',
+ deprecated_group='DEFAULT',
+ help='Maximum number of ingress messages to locally buffer '
+ 'per topic. Default is unlimited.'),
+
+ cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
+ deprecated_group='DEFAULT',
+ help='Directory for holding IPC sockets.'),
+
+ cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
+ sample_default='localhost',
+ deprecated_group='DEFAULT',
+ help='Name of this node. Must be a valid hostname, FQDN, or '
+ 'IP address. Must match "host" option, if running Nova.'),
+
+ cfg.IntOpt('rpc_cast_timeout', default=-1,
+ deprecated_group='DEFAULT',
+ help='Seconds to wait before a cast expires (TTL). '
+ 'The default value of -1 specifies an infinite linger '
+ 'period. The value of 0 specifies no linger period. '
+ 'Pending messages shall be discarded immediately '
+ 'when the socket is closed. Only supported by impl_zmq.'),
+
+ cfg.IntOpt('rpc_poll_timeout', default=1,
+ deprecated_group='DEFAULT',
+ help='The default number of seconds that poll should wait. '
+ 'Poll raises timeout exception when timeout expired.'),
+
+ cfg.IntOpt('zmq_target_expire', default=300,
+ deprecated_group='DEFAULT',
+ help='Expiration timeout in seconds of a name service record '
+ 'about existing target ( < 0 means no timeout).'),
+
+ cfg.IntOpt('zmq_target_update', default=180,
+ deprecated_group='DEFAULT',
+ help='Update period in seconds of a name service record '
+ 'about existing target.'),
+
+ cfg.BoolOpt('use_pub_sub', default=True,
+ deprecated_group='DEFAULT',
+ help='Use PUB/SUB pattern for fanout methods. '
+ 'PUB/SUB always uses proxy.'),
+
+ cfg.BoolOpt('use_router_proxy', default=True,
+ deprecated_group='DEFAULT',
+ help='Use ROUTER remote proxy.'),
+
+ cfg.PortOpt('rpc_zmq_min_port',
+ default=49153,
+ deprecated_group='DEFAULT',
+ help='Minimal port number for random ports range.'),
+
+ cfg.IntOpt('rpc_zmq_max_port',
+ min=1,
+ max=65536,
+ default=65536,
+ deprecated_group='DEFAULT',
+ help='Maximal port number for random ports range.'),
+
+ cfg.IntOpt('rpc_zmq_bind_port_retries',
+ default=100,
+ deprecated_group='DEFAULT',
+ help='Number of retries to find free port number before '
+ 'fail with ZMQBindError.'),
+
+ cfg.StrOpt('rpc_zmq_serialization', default='json',
+ choices=('json', 'msgpack'),
+ deprecated_group='DEFAULT',
+ help='Default serialization mechanism for '
+ 'serializing/deserializing outgoing/incoming messages')
+]
+
+zmq_ack_retry_opts = [
+ cfg.IntOpt('rpc_thread_pool_size', default=100,
+ help='Maximum number of (green) threads to work concurrently.'),
+
+ cfg.IntOpt('rpc_message_ttl', default=300,
+ help='Expiration timeout in seconds of a sent/received message '
+ 'after which it is not tracked anymore by a '
+ 'client/server.'),
+
+ cfg.BoolOpt('rpc_use_acks', default=True,
+ help='Wait for message acknowledgements from receivers. '
+ 'This mechanism works only via proxy without PUB/SUB.'),
+
+ cfg.IntOpt('rpc_ack_timeout_base', default=10,
+ help='Number of seconds to wait for an ack from a cast/call. '
+ 'After each retry attempt this timeout is multiplied by '
+ 'some specified multiplier.'),
+
+ cfg.IntOpt('rpc_ack_timeout_multiplier', default=2,
+ help='Number to multiply base ack timeout by after each retry '
+ 'attempt.'),
+
+ cfg.IntOpt('rpc_retry_attempts', default=3,
+ help='Default number of message sending attempts in case '
+ 'of any problems occurred: positive value N means '
+ 'at most N retries, 0 means no retries, None or -1 '
+ '(or any other negative values) mean to retry forever. '
+ 'This option is used only if acknowledgments are enabled.')
+]
+
+
+def register_opts(conf):
+ opt_group = cfg.OptGroup(name='oslo_messaging_zmq',
+ title='ZeroMQ driver options')
+ conf.register_opts(zmq_opts, group=opt_group)
+ conf.register_opts(zmq_ack_retry_opts, group=opt_group)
+ conf.register_opts(server._pool_opts)
+ conf.register_opts(base.base_opts)
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_poller.py b/oslo_messaging/_drivers/zmq_driver/zmq_poller.py
index 28fe6c8..678741d 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_poller.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_poller.py
@@ -62,6 +62,13 @@ class ZmqPoller(object):
Should return received message object
:type recv_method: callable
"""
+ @abc.abstractmethod
+ def unregister(self, socket):
+ """Unregister socket from poll
+
+ :param socket: Socket to unsubscribe from polling
+ :type socket: zmq.Socket
+ """
@abc.abstractmethod
def poll(self, timeout=None):
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
index a97343e..285eafa 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
@@ -23,6 +23,8 @@ from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE, _LI
from oslo_messaging import exceptions
+from oslo_serialization.serializer import json_serializer
+from oslo_serialization.serializer import msgpack_serializer
LOG = logging.getLogger(__name__)
@@ -31,7 +33,13 @@ zmq = zmq_async.import_zmq()
class ZmqSocket(object):
- def __init__(self, conf, context, socket_type, high_watermark=0):
+ SERIALIZERS = {
+ 'json': json_serializer.JSONSerializer(),
+ 'msgpack': msgpack_serializer.MessagePackSerializer()
+ }
+
+ def __init__(self, conf, context, socket_type, immediate=True,
+ high_watermark=0):
self.conf = conf
self.context = context
self.socket_type = socket_type
@@ -39,12 +47,23 @@ class ZmqSocket(object):
self.handle.set_hwm(high_watermark)
self.close_linger = -1
- if self.conf.rpc_cast_timeout > 0:
- self.close_linger = self.conf.rpc_cast_timeout * 1000
+ if self.conf.oslo_messaging_zmq.rpc_cast_timeout > 0:
+ self.close_linger = \
+ self.conf.oslo_messaging_zmq.rpc_cast_timeout * 1000
self.handle.setsockopt(zmq.LINGER, self.close_linger)
+ # Put messages to only connected queues
+ self.handle.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0)
self.handle.identity = six.b(str(uuid.uuid4()))
self.connections = set()
+ def _get_serializer(self, serialization):
+ serializer = self.SERIALIZERS.get(serialization, None)
+ if serializer is None:
+ raise NotImplementedError(
+ "Serialization '{}' is not supported".format(serialization)
+ )
+ return serializer
+
def type_name(self):
return zmq_names.socket_type_str(self.socket_type)
@@ -77,6 +96,14 @@ class ZmqSocket(object):
def send_multipart(self, *args, **kwargs):
self.handle.send_multipart(*args, **kwargs)
+ def send_dumped(self, obj, *args, **kwargs):
+ serialization = kwargs.pop(
+ 'serialization',
+ self.conf.oslo_messaging_zmq.rpc_zmq_serialization)
+ serializer = self._get_serializer(serialization)
+ s = serializer.dump_as_bytes(obj)
+ self.handle.send(s, *args, **kwargs)
+
def recv(self, *args, **kwargs):
return self.handle.recv(*args, **kwargs)
@@ -92,6 +119,15 @@ class ZmqSocket(object):
def recv_multipart(self, *args, **kwargs):
return self.handle.recv_multipart(*args, **kwargs)
+ def recv_loaded(self, *args, **kwargs):
+ serialization = kwargs.pop(
+ 'serialization',
+ self.conf.oslo_messaging_zmq.rpc_zmq_serialization)
+ serializer = self._get_serializer(serialization)
+ s = self.handle.recv(*args, **kwargs)
+ obj = serializer.load_from_bytes(s)
+ return obj
+
def close(self, *args, **kwargs):
self.handle.close(*args, **kwargs)
@@ -106,34 +142,67 @@ class ZmqSocket(object):
"address": address})
self.connect(address)
except zmq.ZMQError as e:
- errmsg = _LE("Failed connecting %(stype) to %(address)s: %(e)s")\
- % (stype, address, e)
- LOG.error(_LE("Failed connecting %(stype) to %(address)s: %(e)s"),
- (stype, address, e))
+ errmsg = _LE("Failed connecting %(stype)s to %(address)s: %(e)s") \
+ % {"stype": stype, "address": address, "e": e}
+ LOG.error(_LE("Failed connecting %(stype)s to %(address)s: %(e)s"),
+ {"stype": stype, "address": address, "e": e})
raise rpc_common.RPCException(errmsg)
def connect_to_host(self, host):
- address = zmq_address.get_tcp_direct_address(host)
+ address = zmq_address.get_tcp_direct_address(
+ host.decode('utf-8') if six.PY3 and
+ isinstance(host, six.binary_type) else host
+ )
self.connect_to_address(address)
-class ZmqPortRangeExceededException(exceptions.MessagingException):
- """Raised by ZmqRandomPortSocket - wrapping zmq.ZMQBindError"""
+class ZmqPortBusy(exceptions.MessagingException):
+ """Raised when binding to a port failure"""
+
+ def __init__(self, port_number):
+ super(ZmqPortBusy, self).__init__()
+ self.port_number = port_number
class ZmqRandomPortSocket(ZmqSocket):
- def __init__(self, conf, context, socket_type, high_watermark=0):
- super(ZmqRandomPortSocket, self).__init__(conf, context, socket_type,
- high_watermark)
+ def __init__(self, conf, context, socket_type, host=None,
+ high_watermark=0):
+ super(ZmqRandomPortSocket, self).__init__(
+ conf, context, socket_type, immediate=False,
+ high_watermark=high_watermark)
self.bind_address = zmq_address.get_tcp_random_address(self.conf)
-
+ if host is None:
+ host = conf.oslo_messaging_zmq.rpc_zmq_host
try:
self.port = self.handle.bind_to_random_port(
self.bind_address,
- min_port=conf.rpc_zmq_min_port,
- max_port=conf.rpc_zmq_max_port,
- max_tries=conf.rpc_zmq_bind_port_retries)
+ min_port=conf.oslo_messaging_zmq.rpc_zmq_min_port,
+ max_port=conf.oslo_messaging_zmq.rpc_zmq_max_port,
+ max_tries=conf.oslo_messaging_zmq.rpc_zmq_bind_port_retries)
+ self.connect_address = zmq_address.combine_address(host, self.port)
except zmq.ZMQBindError:
LOG.error(_LE("Random ports range exceeded!"))
- raise ZmqPortRangeExceededException()
+ raise ZmqPortBusy(port_number=0)
+
+
+class ZmqFixedPortSocket(ZmqSocket):
+
+ def __init__(self, conf, context, socket_type, host, port,
+ high_watermark=0):
+ super(ZmqFixedPortSocket, self).__init__(
+ conf, context, socket_type, immediate=False,
+ high_watermark=high_watermark)
+ self.connect_address = zmq_address.combine_address(host, port)
+ self.bind_address = zmq_address.get_tcp_direct_address(
+ zmq_address.combine_address(
+ conf.oslo_messaging_zmq.rpc_zmq_bind_address, port))
+ self.host = host
+ self.port = port
+
+ try:
+ self.handle.bind(self.bind_address)
+ except zmq.ZMQError as e:
+ LOG.exception(e)
+ LOG.error(_LE("Chosen port %d is being busy.") % self.port)
+ raise ZmqPortBusy(port_number=port)
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py
index 0b4594a..2d4f9e0 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py
@@ -31,12 +31,17 @@ class UpdaterBase(object):
self.conf = conf
self.matchmaker = matchmaker
self.update_method = update_method
+ # make first update immediately
+ self.update_method()
self.executor = zmq_async.get_executor(method=self._update_loop)
self.executor.execute()
+ def stop(self):
+ self.executor.stop()
+
def _update_loop(self):
self.update_method()
- time.sleep(self.conf.zmq_target_update)
+ time.sleep(self.conf.oslo_messaging_zmq.zmq_target_update)
def cleanup(self):
self.executor.stop()
diff --git a/oslo_messaging/conffixture.py b/oslo_messaging/conffixture.py
index c6edddd..5eb4e5e 100644
--- a/oslo_messaging/conffixture.py
+++ b/oslo_messaging/conffixture.py
@@ -49,16 +49,17 @@ class ConfFixture(fixtures.Fixture):
'oslo_messaging._drivers.impl_rabbit', 'rabbit_opts',
'oslo_messaging_rabbit')
_import_opts(self.conf,
- 'oslo_messaging._drivers.amqp', 'amqp_opts',
+ 'oslo_messaging._drivers.base', 'base_opts',
'oslo_messaging_rabbit')
_import_opts(self.conf,
'oslo_messaging._drivers.amqp', 'amqp_opts',
- 'oslo_messaging_qpid')
+ 'oslo_messaging_rabbit')
_import_opts(self.conf,
'oslo_messaging._drivers.amqp1_driver.opts',
'amqp1_opts', 'oslo_messaging_amqp')
_import_opts(self.conf,
- 'oslo_messaging._drivers.impl_zmq', 'zmq_opts')
+ 'oslo_messaging._drivers.zmq_driver.zmq_options',
+ 'zmq_opts', 'oslo_messaging_zmq')
_import_opts(self.conf,
'oslo_messaging._drivers.zmq_driver.'
'matchmaker.matchmaker_redis',
diff --git a/oslo_messaging/locale/en_GB/LC_MESSAGES/oslo_messaging-log-error.po b/oslo_messaging/locale/en_GB/LC_MESSAGES/oslo_messaging-log-error.po
new file mode 100644
index 0000000..ffe36c8
--- /dev/null
+++ b/oslo_messaging/locale/en_GB/LC_MESSAGES/oslo_messaging-log-error.po
@@ -0,0 +1,61 @@
+# Translations template for oslo.messaging.
+# Copyright (C) 2015 ORGANIZATION
+# This file is distributed under the same license as the oslo.messaging
+# project.
+#
+# Translators:
+# Andi Chandler <andi@gowling.com>, 2014-2015
+# Andi Chandler <andi@gowling.com>, 2016. #zanata
+# Andreas Jaeger <jaegerandi@gmail.com>, 2016. #zanata
+msgid ""
+msgstr ""
+"Project-Id-Version: oslo.messaging 5.5.1.dev3\n"
+"Report-Msgid-Bugs-To: https://bugs.launchpad.net/openstack-i18n/\n"
+"POT-Creation-Date: 2016-07-01 03:41+0000\n"
+"MIME-Version: 1.0\n"
+"Content-Type: text/plain; charset=UTF-8\n"
+"Content-Transfer-Encoding: 8bit\n"
+"PO-Revision-Date: 2016-06-28 05:52+0000\n"
+"Last-Translator: Andi Chandler <andi@gowling.com>\n"
+"Language: en-GB\n"
+"Plural-Forms: nplurals=2; plural=(n != 1);\n"
+"Generated-By: Babel 2.0\n"
+"X-Generator: Zanata 3.7.3\n"
+"Language-Team: English (United Kingdom)\n"
+
+#, python-format
+msgid "An exception occurred processing the API call: %s "
+msgstr "An exception occurred processing the API call: %s "
+
+msgid "Can not acknowledge message. Skip processing"
+msgstr "Can not acknowledge message. Skip processing"
+
+msgid "Can not send reply for message"
+msgstr "Can not send reply for message"
+
+#, python-format
+msgid "Could not send notification to %(topic)s. Payload=%(message)s"
+msgstr "Could not send notification to %(topic)s. Payload=%(message)s"
+
+msgid "Exception during message handling"
+msgstr "Exception during message handling"
+
+msgid "Exception during message handling."
+msgstr "Exception during message handling."
+
+msgid "Exception during messages handling."
+msgstr "Exception during messages handling."
+
+msgid "Fail to ack/requeue message."
+msgstr "Fail to ack/requeue message."
+
+#, python-format
+msgid ""
+"Problem '%(e)s' attempting to send to notification system. Payload="
+"%(payload)s"
+msgstr ""
+"Problem '%(e)s' attempting to send to notification system. Payload="
+"%(payload)s"
+
+msgid "Unexpected exception occurred."
+msgstr "Unexpected exception occurred."
diff --git a/oslo_messaging/locale/en_GB/LC_MESSAGES/oslo_messaging-log-warning.po b/oslo_messaging/locale/en_GB/LC_MESSAGES/oslo_messaging-log-warning.po
index 714a7da..486ab02 100644
--- a/oslo_messaging/locale/en_GB/LC_MESSAGES/oslo_messaging-log-warning.po
+++ b/oslo_messaging/locale/en_GB/LC_MESSAGES/oslo_messaging-log-warning.po
@@ -5,16 +5,17 @@
#
# Translators:
# Andi Chandler <andi@gowling.com>, 2014-2015
+# Andi Chandler <andi@gowling.com>, 2016. #zanata
# Andreas Jaeger <jaegerandi@gmail.com>, 2016. #zanata
msgid ""
msgstr ""
-"Project-Id-Version: oslo.messaging 4.5.1.dev102\n"
+"Project-Id-Version: oslo.messaging 5.5.1.dev3\n"
"Report-Msgid-Bugs-To: https://bugs.launchpad.net/openstack-i18n/\n"
-"POT-Creation-Date: 2016-04-21 02:49+0000\n"
+"POT-Creation-Date: 2016-07-01 03:41+0000\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
-"PO-Revision-Date: 2015-08-27 12:55+0000\n"
+"PO-Revision-Date: 2016-06-28 05:52+0000\n"
"Last-Translator: Andi Chandler <andi@gowling.com>\n"
"Language: en-GB\n"
"Plural-Forms: nplurals=2; plural=(n != 1);\n"
@@ -25,3 +26,20 @@ msgstr ""
#, python-format
msgid "Failed to load any notifiers for %s"
msgstr "Failed to load any notifiers for %s"
+
+#, python-format
+msgid "Possible hang: %s"
+msgstr "Possible hang: %s"
+
+msgid ""
+"Restarting a MessageHandlingServer is inherently racy. It is deprecated, and "
+"will become a noop in a future release of oslo.messaging. If you need to "
+"restart MessageHandlingServer you should instantiate a new object."
+msgstr ""
+"Restarting a MessageHandlingServer is inherently racy. It is deprecated, and "
+"will become a noop in a future release of oslo.messaging. If you need to "
+"restart MessageHandlingServer you should instantiate a new object."
+
+#, python-format
+msgid "Unknown priority \"%s\""
+msgstr "Unknown priority \"%s\""
diff --git a/oslo_messaging/notify/listener.py b/oslo_messaging/notify/listener.py
index 5f858c5..386e79e 100644
--- a/oslo_messaging/notify/listener.py
+++ b/oslo_messaging/notify/listener.py
@@ -59,7 +59,7 @@ A simple example of a notification listener with multiple endpoints might be::
transport = oslo_messaging.get_notification_transport(cfg.CONF)
targets = [
- oslo_messaging.Target(topic='notifications')
+ oslo_messaging.Target(topic='notifications'),
oslo_messaging.Target(topic='notifications_bis')
]
endpoints = [
diff --git a/oslo_messaging/notify/notifier.py b/oslo_messaging/notify/notifier.py
index 2d13e03..af42569 100644
--- a/oslo_messaging/notify/notifier.py
+++ b/oslo_messaging/notify/notifier.py
@@ -16,6 +16,7 @@
# under the License.
import abc
+import argparse
import logging
import uuid
@@ -23,6 +24,7 @@ from debtcollector import renames
from oslo_config import cfg
from oslo_utils import timeutils
import six
+from stevedore import extension
from stevedore import named
from oslo_messaging._i18n import _LE
@@ -58,6 +60,49 @@ _notifier_opts = [
_LOG = logging.getLogger(__name__)
+def _send_notification():
+ """Command line tool to send notifications manually."""
+ parser = argparse.ArgumentParser(
+ description='Oslo.messaging notification sending',
+ )
+ parser.add_argument('--config-file',
+ help='Path to configuration file')
+ parser.add_argument('--transport-url',
+ help='Transport URL')
+ parser.add_argument('--publisher-id',
+ help='Publisher ID')
+ parser.add_argument('--event-type',
+ default="test",
+ help="Event type")
+ parser.add_argument('--topic',
+ nargs='*',
+ help="Topic to send to")
+ parser.add_argument('--priority',
+ default="info",
+ choices=("info",
+ "audit",
+ "warn",
+ "error",
+ "critical",
+ "sample"),
+ help='Event type')
+ parser.add_argument('--driver',
+ default="messagingv2",
+ choices=extension.ExtensionManager(
+ 'oslo.messaging.notify.drivers'
+ ).names(),
+ help='Notification driver')
+ parser.add_argument('payload')
+ args = parser.parse_args()
+ conf = cfg.ConfigOpts()
+ conf([],
+ default_config_files=[args.config_file] if args.config_file else None)
+ transport = get_notification_transport(conf, url=args.transport_url)
+ notifier = Notifier(transport, args.publisher_id, topics=args.topic,
+ driver=args.driver)
+ notifier._notify({}, args.event_type, args.payload, args.priority)
+
+
@six.add_metaclass(abc.ABCMeta)
class Driver(object):
"""Base driver for Notifications"""
@@ -171,7 +216,7 @@ class Notifier(object):
N means N retries
:type retry: int
:param topics: the topics which to send messages on
- :type topic: list of strings
+ :type topics: list of strings
"""
conf = transport.conf
conf.register_opts(_notifier_opts,
diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py
index b04768a..c252496 100644
--- a/oslo_messaging/opts.py
+++ b/oslo_messaging/opts.py
@@ -25,7 +25,7 @@ from oslo_messaging._drivers.amqp1_driver import opts as amqp_opts
from oslo_messaging._drivers import base as drivers_base
from oslo_messaging._drivers import impl_pika
from oslo_messaging._drivers import impl_rabbit
-from oslo_messaging._drivers import impl_zmq
+from oslo_messaging._drivers.impl_zmq import zmq_options
from oslo_messaging._drivers.pika_driver import pika_connection_factory
from oslo_messaging._drivers.zmq_driver.matchmaker import matchmaker_redis
from oslo_messaging.notify import notifier
@@ -36,7 +36,7 @@ from oslo_messaging import transport
_global_opt_lists = [
drivers_base.base_opts,
- impl_zmq.zmq_opts,
+ zmq_options.zmq_opts,
server._pool_opts,
client._client_opts,
transport._transport_opts,
@@ -45,6 +45,7 @@ _global_opt_lists = [
_opts = [
(None, list(itertools.chain(*_global_opt_lists))),
('matchmaker_redis', matchmaker_redis.matchmaker_redis_opts),
+ ('oslo_messaging_zmq', zmq_options.zmq_opts),
('oslo_messaging_amqp', amqp_opts.amqp1_opts),
('oslo_messaging_notifications', notifier._notifier_opts),
('oslo_messaging_rabbit', list(
diff --git a/oslo_messaging/tests/drivers/pika/test_message.py b/oslo_messaging/tests/drivers/pika/test_message.py
index aece3ec..5d29c8a 100644
--- a/oslo_messaging/tests/drivers/pika/test_message.py
+++ b/oslo_messaging/tests/drivers/pika/test_message.py
@@ -49,10 +49,10 @@ class PikaIncomingMessageTestCase(unittest.TestCase):
self._body
)
- self.assertEqual(message.ctxt.get("key_context", None),
- "context_value")
- self.assertEqual(message.message.get("payload_key", None),
- "payload_value")
+ self.assertEqual("context_value",
+ message.ctxt.get("key_context", None))
+ self.assertEqual("payload_value",
+ message.message.get("payload_key", None))
def test_message_acknowledge(self):
message = pika_drv_msg.PikaIncomingMessage(
@@ -129,13 +129,13 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
self._body
)
- self.assertEqual(message.ctxt.get("key_context", None),
- "context_value")
- self.assertEqual(message.msg_id, 123456789)
- self.assertEqual(message.reply_q, "reply_queue")
+ self.assertEqual("context_value",
+ message.ctxt.get("key_context", None))
+ self.assertEqual(123456789, message.msg_id)
+ self.assertEqual("reply_queue", message.reply_q)
- self.assertEqual(message.message.get("payload_key", None),
- "payload_value")
+ self.assertEqual("payload_value",
+ message.message.get("payload_key", None))
def test_cast_message_body_parsing(self):
message = pika_drv_msg.RpcPikaIncomingMessage(
@@ -143,13 +143,13 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
self._body
)
- self.assertEqual(message.ctxt.get("key_context", None),
- "context_value")
- self.assertEqual(message.msg_id, None)
- self.assertEqual(message.reply_q, None)
+ self.assertEqual("context_value",
+ message.ctxt.get("key_context", None))
+ self.assertEqual(None, message.msg_id)
+ self.assertEqual(None, message.reply_q)
- self.assertEqual(message.message.get("payload_key", None),
- "payload_value")
+ self.assertEqual("payload_value",
+ message.message.get("payload_key", None))
@patch(("oslo_messaging._drivers.pika_driver.pika_message."
"PikaOutgoingMessage.send"))
@@ -159,17 +159,17 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
self._body
)
- self.assertEqual(message.ctxt.get("key_context", None),
- "context_value")
- self.assertEqual(message.msg_id, None)
- self.assertEqual(message.reply_q, None)
+ self.assertEqual("context_value",
+ message.ctxt.get("key_context", None))
+ self.assertEqual(None, message.msg_id)
+ self.assertEqual(None, message.reply_q)
- self.assertEqual(message.message.get("payload_key", None),
- "payload_value")
+ self.assertEqual("payload_value",
+ message.message.get("payload_key", None))
message.reply(reply=object())
- self.assertEqual(send_reply_mock.call_count, 0)
+ self.assertEqual(0, send_reply_mock.call_count)
@patch("oslo_messaging._drivers.pika_driver.pika_message."
"RpcReplyPikaOutgoingMessage")
@@ -185,13 +185,13 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
self._body
)
- self.assertEqual(message.ctxt.get("key_context", None),
- "context_value")
- self.assertEqual(message.msg_id, 123456789)
- self.assertEqual(message.reply_q, "reply_queue")
+ self.assertEqual("context_value",
+ message.ctxt.get("key_context", None))
+ self.assertEqual(123456789, message.msg_id)
+ self.assertEqual("reply_queue", message.reply_q)
- self.assertEqual(message.message.get("payload_key", None),
- "payload_value")
+ self.assertEqual("payload_value",
+ message.message.get("payload_key", None))
reply = "all_fine"
message.reply(reply=reply)
@@ -221,13 +221,13 @@ class RpcPikaIncomingMessageTestCase(unittest.TestCase):
self._body
)
- self.assertEqual(message.ctxt.get("key_context", None),
- "context_value")
- self.assertEqual(message.msg_id, 123456789)
- self.assertEqual(message.reply_q, "reply_queue")
+ self.assertEqual("context_value",
+ message.ctxt.get("key_context", None))
+ self.assertEqual(123456789, message.msg_id)
+ self.assertEqual("reply_queue", message.reply_q)
- self.assertEqual(message.message.get("payload_key", None),
- "payload_value")
+ self.assertEqual("payload_value",
+ message.message.get("payload_key", None))
failure_info = object()
message.reply(failure=failure_info)
@@ -277,9 +277,9 @@ class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase):
body
)
- self.assertEqual(message.msg_id, 123456789)
+ self.assertEqual(123456789, message.msg_id)
self.assertIsNone(message.failure)
- self.assertEqual(message.result, "all fine")
+ self.assertEqual("all fine", message.result)
def test_negative_reply_message_body_parsing(self):
@@ -297,12 +297,12 @@ class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase):
body
)
- self.assertEqual(message.msg_id, 123456789)
+ self.assertEqual(123456789, message.msg_id)
self.assertIsNone(message.result)
self.assertEqual(
- str(message.failure),
'Error message\n'
- 'TRACE HERE'
+ 'TRACE HERE',
+ str(message.failure)
)
self.assertIsInstance(message.failure,
oslo_messaging.MessagingException)
@@ -359,12 +359,12 @@ class PikaOutgoingMessageTestCase(unittest.TestCase):
props = self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["properties"]
- self.assertEqual(props.content_encoding, 'utf-8')
- self.assertEqual(props.content_type, 'application/json')
- self.assertEqual(props.delivery_mode, 2)
+ self.assertEqual('utf-8', props.content_encoding)
+ self.assertEqual('application/json', props.content_type)
+ self.assertEqual(2, props.delivery_mode)
self.assertTrue(self._expiration * 1000 - float(props.expiration) <
100)
- self.assertEqual(props.headers, {'version': '1.0'})
+ self.assertEqual({'version': '1.0'}, props.headers)
self.assertTrue(props.message_id)
@patch("oslo_serialization.jsonutils.dumps",
@@ -404,12 +404,12 @@ class PikaOutgoingMessageTestCase(unittest.TestCase):
props = self._pika_engine.connection_without_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["properties"]
- self.assertEqual(props.content_encoding, 'utf-8')
- self.assertEqual(props.content_type, 'application/json')
- self.assertEqual(props.delivery_mode, 1)
+ self.assertEqual('utf-8', props.content_encoding)
+ self.assertEqual('application/json', props.content_type)
+ self.assertEqual(1, props.delivery_mode)
self.assertTrue(self._expiration * 1000 - float(props.expiration)
< 100)
- self.assertEqual(props.headers, {'version': '1.0'})
+ self.assertEqual({'version': '1.0'}, props.headers)
self.assertTrue(props.message_id)
@@ -463,11 +463,11 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
props = self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["properties"]
- self.assertEqual(props.content_encoding, 'utf-8')
- self.assertEqual(props.content_type, 'application/json')
- self.assertEqual(props.delivery_mode, 1)
+ self.assertEqual('utf-8', props.content_encoding)
+ self.assertEqual('application/json', props.content_type)
+ self.assertEqual(1, props.delivery_mode)
self.assertTrue(expiration * 1000 - float(props.expiration) < 100)
- self.assertEqual(props.headers, {'version': '1.0'})
+ self.assertEqual({'version': '1.0'}, props.headers)
self.assertIsNone(props.correlation_id)
self.assertIsNone(props.reply_to)
self.assertTrue(props.message_id)
@@ -521,13 +521,13 @@ class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
props = self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["properties"]
- self.assertEqual(props.content_encoding, 'utf-8')
- self.assertEqual(props.content_type, 'application/json')
- self.assertEqual(props.delivery_mode, 1)
+ self.assertEqual('utf-8', props.content_encoding)
+ self.assertEqual('application/json', props.content_type)
+ self.assertEqual(1, props.delivery_mode)
self.assertTrue(expiration * 1000 - float(props.expiration) < 100)
- self.assertEqual(props.headers, {'version': '1.0'})
- self.assertEqual(props.correlation_id, message.msg_id)
- self.assertEqual(props.reply_to, reply_queue_name)
+ self.assertEqual({'version': '1.0'}, props.headers)
+ self.assertEqual(message.msg_id, props.correlation_id)
+ self.assertEqual(reply_queue_name, props.reply_to)
self.assertTrue(props.message_id)
@@ -567,13 +567,13 @@ class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase):
props = self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["properties"]
- self.assertEqual(props.content_encoding, 'utf-8')
- self.assertEqual(props.content_type, 'application/json')
- self.assertEqual(props.delivery_mode, 1)
+ self.assertEqual('utf-8', props.content_encoding)
+ self.assertEqual('application/json', props.content_type)
+ self.assertEqual(1, props.delivery_mode)
self.assertTrue(self._expiration * 1000 - float(props.expiration) <
100)
- self.assertEqual(props.headers, {'version': '1.0'})
- self.assertEqual(props.correlation_id, message.msg_id)
+ self.assertEqual({'version': '1.0'}, props.headers)
+ self.assertEqual(message.msg_id, props.correlation_id)
self.assertIsNone(props.reply_to)
self.assertTrue(props.message_id)
@@ -612,12 +612,12 @@ class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase):
props = self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["properties"]
- self.assertEqual(props.content_encoding, 'utf-8')
- self.assertEqual(props.content_type, 'application/json')
- self.assertEqual(props.delivery_mode, 1)
+ self.assertEqual('utf-8', props.content_encoding)
+ self.assertEqual('application/json', props.content_type)
+ self.assertEqual(1, props.delivery_mode)
self.assertTrue(self._expiration * 1000 - float(props.expiration) <
100)
- self.assertEqual(props.headers, {'version': '1.0'})
- self.assertEqual(props.correlation_id, message.msg_id)
+ self.assertEqual({'version': '1.0'}, props.headers)
+ self.assertEqual(message.msg_id, props.correlation_id)
self.assertIsNone(props.reply_to)
self.assertTrue(props.message_id)
diff --git a/oslo_messaging/tests/drivers/pika/test_poller.py b/oslo_messaging/tests/drivers/pika/test_poller.py
index a481576..445b338 100644
--- a/oslo_messaging/tests/drivers/pika/test_poller.py
+++ b/oslo_messaging/tests/drivers/pika/test_poller.py
@@ -98,9 +98,9 @@ class PikaPollerTestCase(unittest.TestCase):
unused, method, properties, body
)
- self.assertEqual(len(res), 1)
+ self.assertEqual(1, len(res))
- self.assertEqual(res[0], [incoming_message_class_mock.return_value])
+ self.assertEqual([incoming_message_class_mock.return_value], res[0])
incoming_message_class_mock.assert_called_once_with(
self._pika_engine, self._poller_channel_mock, method, properties,
body
@@ -139,16 +139,16 @@ class PikaPollerTestCase(unittest.TestCase):
*params[i]
)
- self.assertEqual(len(res), 1)
- self.assertEqual(len(res[0]), 10)
- self.assertEqual(incoming_message_class_mock.call_count, n)
+ self.assertEqual(1, len(res))
+ self.assertEqual(10, len(res[0]))
+ self.assertEqual(n, incoming_message_class_mock.call_count)
for i in range(n):
- self.assertEqual(res[0][i],
- incoming_message_class_mock.return_value)
+ self.assertEqual(incoming_message_class_mock.return_value,
+ res[0][i])
self.assertEqual(
- incoming_message_class_mock.call_args_list[i][0],
- (self._pika_engine, self._poller_channel_mock) + params[i][1:]
+ (self._pika_engine, self._poller_channel_mock) + params[i][1:],
+ incoming_message_class_mock.call_args_list[i][0]
)
self.assertTrue(self._pika_engine.create_connection.called)
@@ -193,16 +193,16 @@ class PikaPollerTestCase(unittest.TestCase):
self.assertTrue(evt.wait(timeout * 2))
- self.assertEqual(len(res), 1)
- self.assertEqual(len(res[0]), success_count)
- self.assertEqual(incoming_message_class_mock.call_count, success_count)
+ self.assertEqual(1, len(res))
+ self.assertEqual(success_count, len(res[0]))
+ self.assertEqual(success_count, incoming_message_class_mock.call_count)
for i in range(success_count):
- self.assertEqual(res[0][i],
- incoming_message_class_mock.return_value)
+ self.assertEqual(incoming_message_class_mock.return_value,
+ res[0][i])
self.assertEqual(
- incoming_message_class_mock.call_args_list[i][0],
- (self._pika_engine, self._poller_channel_mock) + params[i][1:]
+ (self._pika_engine, self._poller_channel_mock) + params[i][1:],
+ incoming_message_class_mock.call_args_list[i][0]
)
self.assertTrue(self._pika_engine.create_connection.called)
@@ -255,7 +255,7 @@ class RpcServicePikaPollerTestCase(unittest.TestCase):
)
self.assertEqual(
- declare_queue_binding_by_channel_mock.call_count, 6
+ 6, declare_queue_binding_by_channel_mock.call_count
)
declare_queue_binding_by_channel_mock.assert_has_calls((
@@ -345,7 +345,7 @@ class RpcReplyServicePikaPollerTestCase(unittest.TestCase):
)
self.assertEqual(
- declare_queue_binding_by_channel_mock.call_count, 1
+ 1, declare_queue_binding_by_channel_mock.call_count
)
declare_queue_binding_by_channel_mock.assert_called_once_with(
@@ -399,7 +399,7 @@ class NotificationPikaPollerTestCase(unittest.TestCase):
)
self.assertEqual(
- declare_queue_binding_by_channel_mock.call_count, 3
+ 3, declare_queue_binding_by_channel_mock.call_count
)
declare_queue_binding_by_channel_mock.assert_has_calls((
@@ -448,7 +448,7 @@ class NotificationPikaPollerTestCase(unittest.TestCase):
)
self.assertEqual(
- declare_queue_binding_by_channel_mock.call_count, 3
+ 3, declare_queue_binding_by_channel_mock.call_count
)
declare_queue_binding_by_channel_mock.assert_has_calls((
diff --git a/oslo_messaging/tests/drivers/test_amqp_driver.py b/oslo_messaging/tests/drivers/test_amqp_driver.py
index 396983e..0d3bbb6 100644
--- a/oslo_messaging/tests/drivers/test_amqp_driver.py
+++ b/oslo_messaging/tests/drivers/test_amqp_driver.py
@@ -19,7 +19,6 @@ import shutil
import socket
import subprocess
import sys
-import tempfile
import threading
import time
import uuid
@@ -172,7 +171,7 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto):
self.assertIsNone(rc)
listener.join(timeout=30)
self.assertFalse(listener.isAlive())
- self.assertEqual(listener.messages.get().message, {"msg": "value"})
+ self.assertEqual({"msg": "value"}, listener.messages.get().message)
predicate = lambda: (self._broker.sender_link_ack_count == 1)
_wait_until(predicate, 30)
@@ -194,14 +193,14 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto):
wait_for_reply=True,
timeout=30)
self.assertIsNotNone(rc)
- self.assertEqual(rc.get('correlation-id'), 'e1')
+ self.assertEqual('e1', rc.get('correlation-id'))
rc = driver.send(target2, {"context": "whatever"},
{"method": "echo", "id": "e2"},
wait_for_reply=True,
timeout=30)
self.assertIsNotNone(rc)
- self.assertEqual(rc.get('correlation-id'), 'e2')
+ self.assertEqual('e2', rc.get('correlation-id'))
listener1.join(timeout=30)
self.assertFalse(listener1.isAlive())
@@ -226,15 +225,15 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto):
driver.send(shared_target, {"context": "whatever"},
{"method": "echo", "id": "either-1"},
wait_for_reply=True)
- self.assertEqual(self._broker.topic_count, 1)
- self.assertEqual(self._broker.direct_count, 1) # reply
+ self.assertEqual(1, self._broker.topic_count)
+ self.assertEqual(1, self._broker.direct_count) # reply
# this should go to the other server:
driver.send(shared_target, {"context": "whatever"},
{"method": "echo", "id": "either-2"},
wait_for_reply=True)
- self.assertEqual(self._broker.topic_count, 2)
- self.assertEqual(self._broker.direct_count, 2) # reply
+ self.assertEqual(2, self._broker.topic_count)
+ self.assertEqual(2, self._broker.direct_count) # reply
# these should only go to listener1:
driver.send(target1, {"context": "whatever"},
@@ -244,13 +243,13 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto):
driver.send(target1, {"context": "whatever"},
{"method": "echo", "id": "server1-2"},
wait_for_reply=True)
- self.assertEqual(self._broker.direct_count, 6) # 2X(send+reply)
+ self.assertEqual(6, self._broker.direct_count) # 2X(send+reply)
# this should only go to listener2:
driver.send(target2, {"context": "whatever"},
{"method": "echo", "id": "server2"},
wait_for_reply=True)
- self.assertEqual(self._broker.direct_count, 8)
+ self.assertEqual(8, self._broker.direct_count)
# both listeners should get a copy:
driver.send(fanout_target, {"context": "whatever"},
@@ -260,7 +259,7 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto):
self.assertFalse(listener1.isAlive())
listener2.join(timeout=30)
self.assertFalse(listener2.isAlive())
- self.assertEqual(self._broker.fanout_count, 1)
+ self.assertEqual(1, self._broker.fanout_count)
listener1_ids = [x.message.get('id') for x in listener1.get_messages()]
listener2_ids = [x.message.get('id') for x in listener2.get_messages()]
@@ -521,13 +520,13 @@ class TestAmqpNotification(_AmqpBrokerTestCaseAuto):
listener.join(timeout=30)
self.assertFalse(listener.isAlive())
topics = [x.message.get('target') for x in listener.get_messages()]
- self.assertEqual(len(topics), msg_count)
- self.assertEqual(topics.count('topic-1.info'), 2)
- self.assertEqual(topics.count('topic-1.error'), 2)
- self.assertEqual(topics.count('topic-2.debug'), 2)
- self.assertEqual(self._broker.dropped_count, 4)
- self.assertEqual(excepted_targets.count('topic-1.bad'), 2)
- self.assertEqual(excepted_targets.count('bad-topic.debug'), 2)
+ self.assertEqual(msg_count, len(topics))
+ self.assertEqual(2, topics.count('topic-1.info'))
+ self.assertEqual(2, topics.count('topic-1.error'))
+ self.assertEqual(2, topics.count('topic-2.debug'))
+ self.assertEqual(4, self._broker.dropped_count)
+ self.assertEqual(2, excepted_targets.count('topic-1.bad'))
+ self.assertEqual(2, excepted_targets.count('bad-topic.debug'))
driver.cleanup()
def test_released_notification(self):
@@ -613,59 +612,72 @@ class TestAuthentication(test_utils.BaseTestCase):
class TestCyrusAuthentication(test_utils.BaseTestCase):
"""Test the driver's Cyrus SASL integration"""
- def setUp(self):
- """Create a simple SASL configuration. This assumes saslpasswd2 is in
- the OS path, otherwise the test will be skipped.
- """
- super(TestCyrusAuthentication, self).setUp()
+ _conf_dir = None
+
+ # Note: don't add ANONYMOUS or EXTERNAL mechs without updating the
+ # test_authentication_bad_mechs test below
+ _mechs = "DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN"
+
+ @classmethod
+ def setUpClass(cls):
+ # The Cyrus library can only be initialized once per _process_
# Create a SASL configuration and user database,
# add a user 'joe' with password 'secret':
- self._conf_dir = tempfile.mkdtemp()
- db = os.path.join(self._conf_dir, 'openstack.sasldb')
+ cls._conf_dir = "/tmp/amqp1_tests_%s" % os.getpid()
+ # no, we cannot use tempfile.mkdtemp() as it will 'helpfully' remove
+ # the temp dir after the first test is run
+ os.makedirs(cls._conf_dir)
+ db = os.path.join(cls._conf_dir, 'openstack.sasldb')
_t = "echo secret | saslpasswd2 -c -p -f ${db} joe"
cmd = Template(_t).substitute(db=db)
try:
subprocess.check_call(args=cmd, shell=True)
except Exception:
- shutil.rmtree(self._conf_dir, ignore_errors=True)
- self._conf_dir = None
- raise self.skip("Cyrus tool saslpasswd2 not installed")
-
- # configure the SASL broker:
- conf = os.path.join(self._conf_dir, 'openstack.conf')
- # Note: don't add ANONYMOUS or EXTERNAL without updating the
- # test_authentication_bad_mechs test below
- mechs = "DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN"
+ shutil.rmtree(cls._conf_dir, ignore_errors=True)
+ cls._conf_dir = None
+ return
+
+ # configure the SASL server:
+ conf = os.path.join(cls._conf_dir, 'openstack.conf')
t = Template("""sasldb_path: ${db}
pwcheck_method: auxprop
auxprop_plugin: sasldb
mech_list: ${mechs}
""")
with open(conf, 'w') as f:
- f.write(t.substitute(db=db, mechs=mechs))
+ f.write(t.substitute(db=db, mechs=cls._mechs))
+
+ @classmethod
+ def tearDownClass(cls):
+ if cls._conf_dir:
+ shutil.rmtree(cls._conf_dir, ignore_errors=True)
+ def setUp(self):
+ # fire up a test broker with the SASL config:
+ super(TestCyrusAuthentication, self).setUp()
+ if TestCyrusAuthentication._conf_dir is None:
+ self.skipTest("Cyrus SASL tools not installed")
+ _mechs = TestCyrusAuthentication._mechs
+ _dir = TestCyrusAuthentication._conf_dir
self._broker = FakeBroker(self.conf.oslo_messaging_amqp,
- sasl_mechanisms=mechs,
+ sasl_mechanisms=_mechs,
user_credentials=["\0joe\0secret"],
- sasl_config_dir=self._conf_dir,
+ sasl_config_dir=_dir,
sasl_config_name="openstack")
self._broker.start()
self.messaging_conf.transport_driver = 'amqp'
self.conf = self.messaging_conf.conf
def tearDown(self):
- super(TestCyrusAuthentication, self).tearDown()
if self._broker:
self._broker.stop()
self._broker = None
- if self._conf_dir:
- shutil.rmtree(self._conf_dir, ignore_errors=True)
+ super(TestCyrusAuthentication, self).tearDown()
def test_authentication_ok(self):
"""Verify that username and password given in TransportHost are
accepted by the broker.
"""
-
addr = "amqp://joe:secret@%s:%d" % (self._broker.host,
self._broker.port)
url = oslo_messaging.TransportURL.parse(self.conf, addr)
@@ -797,11 +809,11 @@ class TestFailover(test_utils.BaseTestCase):
wait_for_reply=True,
timeout=30)
self.assertIsNotNone(rc)
- self.assertEqual(rc.get('correlation-id'), 'echo-1')
+ self.assertEqual('echo-1', rc.get('correlation-id'))
# 1 request msg, 1 response:
- self.assertEqual(self._brokers[self._primary].topic_count, 1)
- self.assertEqual(self._brokers[self._primary].direct_count, 1)
+ self.assertEqual(1, self._brokers[self._primary].topic_count)
+ self.assertEqual(1, self._brokers[self._primary].direct_count)
# invoke failover method
fail_broker(self._brokers[self._primary])
@@ -818,11 +830,11 @@ class TestFailover(test_utils.BaseTestCase):
wait_for_reply=True,
timeout=2)
self.assertIsNotNone(rc)
- self.assertEqual(rc.get('correlation-id'), 'echo-2')
+ self.assertEqual('echo-2', rc.get('correlation-id'))
# 1 request msg, 1 response:
- self.assertEqual(self._brokers[self._backup].topic_count, 1)
- self.assertEqual(self._brokers[self._backup].direct_count, 1)
+ self.assertEqual(1, self._brokers[self._backup].topic_count)
+ self.assertEqual(1, self._brokers[self._backup].direct_count)
listener.join(timeout=30)
self.assertFalse(listener.isAlive())
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index ac8e9cb..f3ddef6 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -189,16 +189,16 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase):
'kombu+memory:////')
self.addCleanup(transport.cleanup)
- transport._driver._get_connection()
+ connection = transport._driver._get_connection()
connection_klass.assert_called_once_with(
'memory:///', transport_options={
'client_properties': {
'capabilities': {
'connection.blocked': True,
'consumer_cancel_notify': True,
- 'authentication_failure_close': True
- }
- },
+ 'authentication_failure_close': True,
+ },
+ 'connection_name': connection.name},
'confirm_publish': True,
'on_blocked': mock.ANY,
'on_unblocked': mock.ANY},
@@ -654,7 +654,7 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
# Verify the _send_reply was not invoked by driver:
with mock.patch.object(msgs[2], '_send_reply') as method:
msgs[2].reply({'rx_id': 2})
- self.assertEqual(method.call_count, 0)
+ self.assertEqual(0, method.call_count)
# Wait for the 3rd thread to finish
senders[2].join()
diff --git a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py b/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py
index 94d64b4..2e369f7 100644
--- a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py
+++ b/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py
@@ -66,8 +66,8 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
def test_register(self):
self.test_matcher.register(self.target, self.host1, "test")
- self.assertEqual(self.test_matcher.get_hosts(self.target, "test"),
- [self.host1])
+ self.assertEqual([self.host1],
+ self.test_matcher.get_hosts(self.target, "test"))
def test_register_two_hosts(self):
self.test_matcher.register(self.target, self.host1, "test")
@@ -89,8 +89,8 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
self.test_matcher.register(self.target, self.host1, "test")
self.test_matcher.register(self.target, self.host1, "test")
- self.assertEqual(self.test_matcher.get_hosts(self.target, "test"),
- [self.host1])
+ self.assertEqual([self.host1],
+ self.test_matcher.get_hosts(self.target, "test"))
def test_get_hosts_wrong_topic(self):
target = oslo_messaging.Target(topic="no_such_topic")
@@ -99,4 +99,4 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
hosts = self.test_matcher.get_hosts(target, "test")
except (timeout.TimeoutException, retrying.RetryError):
pass
- self.assertEqual(hosts, [])
+ self.assertEqual([], hosts)
diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py
index f9f6f52..04d86d9 100644
--- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py
+++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py
@@ -35,7 +35,7 @@ class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase):
# Set config values
kwargs = {'rpc_zmq_min_port': 5555,
'rpc_zmq_max_port': 5560}
- self.config(**kwargs)
+ self.config(group='oslo_messaging_zmq', **kwargs)
def test_ports_range(self):
listeners = []
@@ -45,7 +45,7 @@ class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase):
target = oslo_messaging.Target(topic='testtopic_' + str(i))
new_listener = self.driver.listen(target, None, None)
listeners.append(new_listener)
- except zmq_socket.ZmqPortRangeExceededException:
+ except zmq_socket.ZmqPortBusy:
pass
self.assertLessEqual(len(listeners), 5)
diff --git a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
index 0287ccf..81a708c 100644
--- a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
+++ b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
@@ -12,32 +12,54 @@
# License for the specific language governing permissions and limitations
# under the License.
-import pickle
+import json
import time
+import msgpack
+import six
+import testscenarios
+
+from oslo_config import cfg
+
import oslo_messaging
-from oslo_messaging._drivers.zmq_driver.client.publishers \
- import zmq_pub_publisher
+from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
+from oslo_messaging._drivers.zmq_driver.proxy import zmq_publisher_proxy
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging.tests.drivers.zmq import zmq_common
+load_tests = testscenarios.load_tests_apply_scenarios
zmq = zmq_async.import_zmq()
+opt_group = cfg.OptGroup(name='zmq_proxy_opts',
+ title='ZeroMQ proxy options')
+cfg.CONF.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group)
+
class TestPubSub(zmq_common.ZmqBaseTestCase):
LISTENERS_COUNT = 3
+ scenarios = [
+ ('json', {'serialization': 'json',
+ 'dumps': lambda obj: six.b(json.dumps(obj))}),
+ ('msgpack', {'serialization': 'msgpack',
+ 'dumps': msgpack.dumps})
+ ]
+
def setUp(self):
super(TestPubSub, self).setUp()
- kwargs = {'use_pub_sub': True}
- self.config(**kwargs)
+ kwargs = {'use_pub_sub': True,
+ 'rpc_zmq_serialization': self.serialization}
+ self.config(group='oslo_messaging_zmq', **kwargs)
- self.publisher = zmq_pub_publisher.PubPublisherProxy(
+ self.config(host="127.0.0.1", group="zmq_proxy_opts")
+ self.config(publisher_port="0", group="zmq_proxy_opts")
+
+ self.publisher = zmq_publisher_proxy.PublisherProxy(
self.conf, self.driver.matchmaker)
self.driver.matchmaker.register_publisher(
(self.publisher.host, ""))
@@ -46,6 +68,12 @@ class TestPubSub(zmq_common.ZmqBaseTestCase):
for i in range(self.LISTENERS_COUNT):
self.listeners.append(zmq_common.TestServerListener(self.driver))
+ def tearDown(self):
+ super(TestPubSub, self).tearDown()
+ self.publisher.cleanup()
+ for listener in self.listeners:
+ listener.stop()
+
def _send_request(self, target):
# Needed only in test env to give listener a chance to connect
# before request fires
@@ -58,8 +86,7 @@ class TestPubSub(zmq_common.ZmqBaseTestCase):
zmq_address.target_to_subscribe_filter(target),
b"message",
b"0000-0000",
- pickle.dumps(context),
- pickle.dumps(message)])
+ self.dumps([context, message])])
def _check_listener(self, listener):
listener._received.wait(timeout=5)
diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py
new file mode 100644
index 0000000..05a2301
--- /dev/null
+++ b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py
@@ -0,0 +1,185 @@
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import mock
+import testtools
+
+import oslo_messaging
+from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
+from oslo_messaging._drivers.zmq_driver.client import zmq_senders
+from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
+from oslo_messaging._drivers.zmq_driver.proxy import zmq_queue_proxy
+from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_options
+from oslo_messaging.tests.drivers.zmq import zmq_common
+from oslo_messaging.tests import utils as test_utils
+
+zmq = zmq_async.import_zmq()
+
+
+class TestZmqAckManager(test_utils.BaseTestCase):
+
+ @testtools.skipIf(zmq is None, "zmq not available")
+ def setUp(self):
+ super(TestZmqAckManager, self).setUp()
+ self.messaging_conf.transport_driver = 'zmq'
+ zmq_options.register_opts(self.conf)
+
+ # set config opts
+ kwargs = {'rpc_zmq_matchmaker': 'dummy',
+ 'use_pub_sub': False,
+ 'use_router_proxy': True,
+ 'rpc_thread_pool_size': 1,
+ 'rpc_use_acks': True,
+ 'rpc_ack_timeout_base': 3,
+ 'rpc_ack_timeout_multiplier': 1,
+ 'rpc_retry_attempts': 2}
+ self.config(group='oslo_messaging_zmq', **kwargs)
+ self.conf.register_opts(zmq_proxy.zmq_proxy_opts,
+ group='zmq_proxy_opts')
+
+ # mock set_result method of futures
+ self.set_result_patcher = mock.patch.object(
+ zmq_receivers.futurist.Future, 'set_result',
+ side_effect=zmq_receivers.futurist.Future.set_result, autospec=True
+ )
+ self.set_result = self.set_result_patcher.start()
+
+ # mock send method of senders
+ self.send_patcher = mock.patch.object(
+ zmq_senders.RequestSenderProxy, 'send',
+ side_effect=zmq_senders.RequestSenderProxy.send, autospec=True
+ )
+ self.send = self.send_patcher.start()
+
+ # get driver
+ transport = oslo_messaging.get_transport(self.conf)
+ self.driver = transport._driver
+
+ # get ack manager
+ self.ack_manager = self.driver.client.get().publishers['default']
+
+ # prepare and launch proxy
+ self.proxy = zmq_proxy.ZmqProxy(self.conf,
+ zmq_queue_proxy.UniversalQueueProxy)
+ vars(self.driver.matchmaker).update(vars(self.proxy.matchmaker))
+ self.executor = zmq_async.get_executor(self.proxy.run)
+ self.executor.execute()
+
+ # create listener
+ self.listener = zmq_common.TestServerListener(self.driver)
+
+ # create target and message
+ self.target = oslo_messaging.Target(topic='topic', server='server')
+ self.message = {'method': 'xyz', 'args': {'x': 1, 'y': 2, 'z': 3}}
+
+ self.addCleanup(
+ zmq_common.StopRpc(
+ self, [('listener', 'stop'), ('executor', 'stop'),
+ ('proxy', 'close'), ('driver', 'cleanup'),
+ ('send_patcher', 'stop'),
+ ('set_result_patcher', 'stop')]
+ )
+ )
+
+ @mock.patch.object(
+ zmq_incoming_message.ZmqIncomingMessage, 'acknowledge',
+ side_effect=zmq_incoming_message.ZmqIncomingMessage.acknowledge,
+ autospec=True
+ )
+ def test_cast_success_without_retries(self, received_ack_mock):
+ self.listener.listen(self.target)
+ result = self.driver.send(
+ self.target, {}, self.message, wait_for_reply=False
+ )
+ self.ack_manager._pool.shutdown(wait=True)
+ self.assertIsNone(result)
+ self.assertTrue(self.listener._received.isSet())
+ self.assertEqual(self.message, self.listener.message.message)
+ self.assertEqual(1, self.send.call_count)
+ self.assertEqual(1, received_ack_mock.call_count)
+ self.assertEqual(2, self.set_result.call_count)
+
+ def test_cast_success_with_one_retry(self):
+ self.listener.listen(self.target)
+ with mock.patch.object(zmq_incoming_message.ZmqIncomingMessage,
+ 'acknowledge') as lost_ack_mock:
+ result = self.driver.send(
+ self.target, {}, self.message, wait_for_reply=False
+ )
+ self.listener._received.wait(3)
+ self.assertIsNone(result)
+ self.assertTrue(self.listener._received.isSet())
+ self.assertEqual(self.message, self.listener.message.message)
+ self.assertEqual(1, self.send.call_count)
+ self.assertEqual(1, lost_ack_mock.call_count)
+ self.assertEqual(0, self.set_result.call_count)
+ with mock.patch.object(
+ zmq_incoming_message.ZmqIncomingMessage, 'acknowledge',
+ side_effect=zmq_incoming_message.ZmqIncomingMessage.acknowledge,
+ autospec=True
+ ) as received_ack_mock:
+ self.listener._received.clear()
+ self.ack_manager._pool.shutdown(wait=True)
+ self.listener._received.wait(3)
+ self.assertFalse(self.listener._received.isSet())
+ self.assertEqual(2, self.send.call_count)
+ self.assertEqual(1, received_ack_mock.call_count)
+ self.assertEqual(2, self.set_result.call_count)
+
+ def test_cast_success_with_two_retries(self):
+ self.listener.listen(self.target)
+ with mock.patch.object(zmq_incoming_message.ZmqIncomingMessage,
+ 'acknowledge') as lost_ack_mock:
+ result = self.driver.send(
+ self.target, {}, self.message, wait_for_reply=False
+ )
+ self.listener._received.wait(3)
+ self.assertIsNone(result)
+ self.assertTrue(self.listener._received.isSet())
+ self.assertEqual(self.message, self.listener.message.message)
+ self.assertEqual(1, self.send.call_count)
+ self.assertEqual(1, lost_ack_mock.call_count)
+ self.assertEqual(0, self.set_result.call_count)
+ self.listener._received.clear()
+ self.listener._received.wait(4.5)
+ self.assertFalse(self.listener._received.isSet())
+ self.assertEqual(2, self.send.call_count)
+ self.assertEqual(2, lost_ack_mock.call_count)
+ self.assertEqual(0, self.set_result.call_count)
+ with mock.patch.object(
+ zmq_incoming_message.ZmqIncomingMessage, 'acknowledge',
+ side_effect=zmq_incoming_message.ZmqIncomingMessage.acknowledge,
+ autospec=True
+ ) as received_ack_mock:
+ self.ack_manager._pool.shutdown(wait=True)
+ self.assertFalse(self.listener._received.isSet())
+ self.assertEqual(3, self.send.call_count)
+ self.assertEqual(1, received_ack_mock.call_count)
+ self.assertEqual(2, self.set_result.call_count)
+
+ @mock.patch.object(zmq_incoming_message.ZmqIncomingMessage, 'acknowledge')
+ def test_cast_failure_exhausted_retries(self, lost_ack_mock):
+ self.listener.listen(self.target)
+ result = self.driver.send(
+ self.target, {}, self.message, wait_for_reply=False
+ )
+ self.ack_manager._pool.shutdown(wait=True)
+ self.assertIsNone(result)
+ self.assertTrue(self.listener._received.isSet())
+ self.assertEqual(self.message, self.listener.message.message)
+ self.assertEqual(3, self.send.call_count)
+ self.assertEqual(3, lost_ack_mock.call_count)
+ self.assertEqual(1, self.set_result.call_count)
diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_async.py b/oslo_messaging/tests/drivers/zmq/test_zmq_async.py
index a2caf12..702e39b 100644
--- a/oslo_messaging/tests/drivers/zmq/test_zmq_async.py
+++ b/oslo_messaging/tests/drivers/zmq/test_zmq_async.py
@@ -57,37 +57,16 @@ class TestGetPoller(test_utils.BaseTestCase):
def test_when_eventlet_is_available_then_return_GreenPoller(self):
zmq_async.eventletutils.is_monkey_patched = lambda _: True
- actual = zmq_async.get_poller()
+ poller = zmq_async.get_poller()
- self.assertTrue(isinstance(actual, green_poller.GreenPoller))
+ self.assertTrue(isinstance(poller, green_poller.GreenPoller))
def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self):
zmq_async.eventletutils.is_monkey_patched = lambda _: False
- actual = zmq_async.get_poller()
+ poller = zmq_async.get_poller()
- self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller))
-
-
-class TestGetReplyPoller(test_utils.BaseTestCase):
-
- @testtools.skipIf(zmq is None, "zmq not available")
- def setUp(self):
- super(TestGetReplyPoller, self).setUp()
-
- def test_when_eventlet_is_available_then_return_HoldReplyPoller(self):
- zmq_async.eventletutils.is_monkey_patched = lambda _: True
-
- actual = zmq_async.get_poller()
-
- self.assertTrue(isinstance(actual, green_poller.GreenPoller))
-
- def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self):
- zmq_async.eventletutils.is_monkey_patched = lambda _: False
-
- actual = zmq_async.get_poller()
-
- self.assertTrue(isinstance(actual, threading_poller.ThreadingPoller))
+ self.assertTrue(isinstance(poller, threading_poller.ThreadingPoller))
class TestGetExecutor(test_utils.BaseTestCase):
diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_ttl_cache.py b/oslo_messaging/tests/drivers/zmq/test_zmq_ttl_cache.py
new file mode 100644
index 0000000..fa2e240
--- /dev/null
+++ b/oslo_messaging/tests/drivers/zmq/test_zmq_ttl_cache.py
@@ -0,0 +1,116 @@
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import time
+
+from oslo_messaging._drivers.zmq_driver.server import zmq_ttl_cache
+from oslo_messaging.tests import utils as test_utils
+
+
+class TestZmqTTLCache(test_utils.BaseTestCase):
+
+ def setUp(self):
+ super(TestZmqTTLCache, self).setUp()
+
+ def call_count_decorator(unbound_method):
+ def wrapper(self, *args, **kwargs):
+ wrapper.call_count += 1
+ return unbound_method(self, *args, **kwargs)
+ wrapper.call_count = 0
+ return wrapper
+
+ zmq_ttl_cache.TTLCache._update_cache = \
+ call_count_decorator(zmq_ttl_cache.TTLCache._update_cache)
+
+ self.cache = zmq_ttl_cache.TTLCache(ttl=1)
+
+ def _test_in_operator(self):
+ self.cache.add(1)
+
+ self.assertTrue(1 in self.cache)
+
+ time.sleep(0.5)
+
+ self.cache.add(2)
+
+ self.assertTrue(1 in self.cache)
+ self.assertTrue(2 in self.cache)
+
+ time.sleep(0.75)
+
+ self.cache.add(3)
+
+ self.assertFalse(1 in self.cache)
+ self.assertTrue(2 in self.cache)
+ self.assertTrue(3 in self.cache)
+
+ time.sleep(0.5)
+
+ self.assertFalse(2 in self.cache)
+ self.assertTrue(3 in self.cache)
+
+ def test_in_operator_with_executor(self):
+ self._test_in_operator()
+
+ def test_in_operator_without_executor(self):
+ self.cache._executor.stop()
+ self._test_in_operator()
+
+ def _is_expired(self, item):
+ with self.cache._lock:
+ return self.cache._is_expired(self.cache._expiration_times[item],
+ time.time())
+
+ def test_executor(self):
+ self.cache.add(1)
+
+ self.assertEqual([1], sorted(self.cache._expiration_times.keys()))
+ self.assertFalse(self._is_expired(1))
+
+ time.sleep(0.75)
+
+ self.assertEqual(1, self.cache._update_cache.call_count)
+
+ self.cache.add(2)
+
+ self.assertEqual([1, 2], sorted(self.cache._expiration_times.keys()))
+ self.assertFalse(self._is_expired(1))
+ self.assertFalse(self._is_expired(2))
+
+ time.sleep(0.75)
+
+ self.assertEqual(2, self.cache._update_cache.call_count)
+
+ self.cache.add(3)
+
+ if 1 in self.cache:
+ self.assertEqual([1, 2, 3],
+ sorted(self.cache._expiration_times.keys()))
+ self.assertTrue(self._is_expired(1))
+ else:
+ self.assertEqual([2, 3],
+ sorted(self.cache._expiration_times.keys()))
+ self.assertFalse(self._is_expired(2))
+ self.assertFalse(self._is_expired(3))
+
+ time.sleep(0.75)
+
+ self.assertEqual(3, self.cache._update_cache.call_count)
+
+ self.assertEqual([3], sorted(self.cache._expiration_times.keys()))
+ self.assertFalse(self._is_expired(3))
+
+ def cleanUp(self):
+ self.cache.cleanup()
+ super(TestZmqTTLCache, self).cleanUp()
diff --git a/oslo_messaging/tests/drivers/zmq/zmq_common.py b/oslo_messaging/tests/drivers/zmq/zmq_common.py
index f0ef4a4..86d15d1 100644
--- a/oslo_messaging/tests/drivers/zmq/zmq_common.py
+++ b/oslo_messaging/tests/drivers/zmq/zmq_common.py
@@ -20,6 +20,7 @@ import testtools
import oslo_messaging
from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_options
from oslo_messaging._i18n import _LE
from oslo_messaging.tests import utils as test_utils
@@ -71,17 +72,18 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
def setUp(self):
super(ZmqBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
+ zmq_options.register_opts(self.conf)
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
- 'rpc_response_timeout': 5,
'rpc_zmq_ipc_dir': self.internal_ipc_dir,
'use_pub_sub': False,
'use_router_proxy': False,
'rpc_zmq_matchmaker': 'dummy'}
- self.config(**kwargs)
+ self.config(group='oslo_messaging_zmq', **kwargs)
+ self.config(rpc_response_timeout=5)
# Get driver
transport = oslo_messaging.get_transport(self.conf)
@@ -89,15 +91,20 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
self.listener = TestServerListener(self.driver)
- self.addCleanup(StopRpc(self.__dict__))
+ self.addCleanup(
+ StopRpc(self, [('listener', 'stop'), ('driver', 'cleanup')])
+ )
class StopRpc(object):
- def __init__(self, attrs):
- self.attrs = attrs
+ def __init__(self, obj, attrs_and_stops):
+ self.obj = obj
+ self.attrs_and_stops = attrs_and_stops
def __call__(self):
- if self.attrs['driver']:
- self.attrs['driver'].cleanup()
- if self.attrs['listener']:
- self.attrs['listener'].stop()
+ for attr, stop in self.attrs_and_stops:
+ if hasattr(self.obj, attr):
+ obj_attr = getattr(self.obj, attr)
+ if hasattr(obj_attr, stop):
+ obj_attr_stop = getattr(obj_attr, stop)
+ obj_attr_stop()
diff --git a/oslo_messaging/tests/functional/notify/test_logger.py b/oslo_messaging/tests/functional/notify/test_logger.py
index a7f580b..4716776 100644
--- a/oslo_messaging/tests/functional/notify/test_logger.py
+++ b/oslo_messaging/tests/functional/notify/test_logger.py
@@ -68,12 +68,12 @@ class LoggingNotificationHandlerTestCase(utils.SkipIfNoTransportURL):
log_method('Test logging at priority: %s' % self.priority)
events = listener.get_events(timeout=1)
- self.assertEqual(len(events), 1)
+ self.assertEqual(1, len(events))
info_event = events[0]
- self.assertEqual(info_event[0], self.priority)
- self.assertEqual(info_event[1], 'logrecord')
+ self.assertEqual(self.priority, info_event[0])
+ self.assertEqual('logrecord', info_event[1])
for key in ['name', 'thread', 'extra', 'process', 'funcName',
'levelno', 'processName', 'pathname', 'lineno',
diff --git a/oslo_messaging/tests/functional/test_rabbitmq.py b/oslo_messaging/tests/functional/test_rabbitmq.py
new file mode 100644
index 0000000..9a0427e
--- /dev/null
+++ b/oslo_messaging/tests/functional/test_rabbitmq.py
@@ -0,0 +1,147 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import os
+import signal
+import time
+
+import fixtures
+from pifpaf.drivers import rabbitmq
+
+from oslo_messaging.tests.functional import utils
+from oslo_messaging.tests import utils as test_utils
+
+
+class ConnectedPortMatcher(object):
+ def __init__(self, port):
+ self.port = port
+
+ def __eq__(self, data):
+ return data.get("port") == self.port
+
+ def __repr__(self):
+ return "<ConnectedPortMatcher port=%d>" % self.port
+
+
+class RabbitMQFailoverTests(test_utils.BaseTestCase):
+ DRIVERS = [
+ "rabbit",
+ ]
+
+ def test_failover_scenario(self):
+ # NOTE(sileht): run this test only if functional suite run of a driver
+ # that use rabbitmq as backend
+ self.driver = os.environ.get('TRANSPORT_DRIVER')
+ if self.driver not in self.DRIVERS:
+ self.skipTest("TRANSPORT_DRIVER is not set to a rabbit driver")
+
+ # NOTE(sileht): Allow only one response at a time, to
+ # have only one tcp connection for reply and ensure it will failover
+ # correctly
+ self.config(heartbeat_timeout_threshold=1,
+ rpc_conn_pool_size=1,
+ kombu_reconnect_delay=0,
+ rabbit_retry_interval=0,
+ rabbit_retry_backoff=0,
+ group='oslo_messaging_rabbit')
+
+ self.pifpaf = self.useFixture(rabbitmq.RabbitMQDriver(cluster=True,
+ port=5692))
+
+ self.url = self.pifpaf.env["PIFPAF_URL"]
+ self.n1 = self.pifpaf.env["PIFPAF_RABBITMQ_NODENAME1"]
+ self.n2 = self.pifpaf.env["PIFPAF_RABBITMQ_NODENAME2"]
+ self.n3 = self.pifpaf.env["PIFPAF_RABBITMQ_NODENAME3"]
+
+ # NOTE(gdavoian): additional tweak for pika driver
+ if self.driver == "pika":
+ self.url = self.url.replace("rabbit", "pika")
+
+ # ensure connections will be establish to the first node
+ self.pifpaf.stop_node(self.n2)
+ self.pifpaf.stop_node(self.n3)
+
+ self.servers = self.useFixture(utils.RpcServerGroupFixture(
+ self.conf, self.url, endpoint=self, names=["server"]))
+
+ # Don't randomize rabbit hosts
+ self.useFixture(fixtures.MockPatch(
+ 'oslo_messaging._drivers.impl_rabbit.random',
+ side_effect=lambda x: x))
+
+ # NOTE(sileht): this connects server connections and reply
+ # connection to nodename n1
+ self.client = self.servers.client(0)
+ self.client.ping()
+ self._check_ports(self.pifpaf.port)
+
+ # Switch to node n2
+ self.pifpaf.start_node(self.n2)
+ self.assertEqual("callback done", self.client.kill_and_process())
+ self.assertEqual("callback done", self.client.just_process())
+ self._check_ports(self.pifpaf.get_port(self.n2))
+
+ # Switch to node n3
+ self.pifpaf.start_node(self.n3)
+ time.sleep(0.1)
+ self.pifpaf.kill_node(self.n2, signal=signal.SIGKILL)
+ time.sleep(0.1)
+ self.assertEqual("callback done", self.client.just_process())
+ self._check_ports(self.pifpaf.get_port(self.n3))
+
+ self.pifpaf.start_node(self.n1)
+ time.sleep(0.1)
+ self.pifpaf.kill_node(self.n3, signal=signal.SIGKILL)
+ time.sleep(0.1)
+ self.assertEqual("callback done", self.client.just_process())
+ self._check_ports(self.pifpaf.get_port(self.n1))
+
+ def kill_and_process(self, *args, **kargs):
+ self.pifpaf.kill_node(self.n1, signal=signal.SIGKILL)
+ time.sleep(0.1)
+ return "callback done"
+
+ def just_process(self, *args, **kargs):
+ return "callback done"
+
+ def _get_log_call_startswith(self, filter):
+ return [call for call in self.logger.debug.mock_calls
+ if call[1][0].startswith(filter)]
+
+ def _check_ports(self, port):
+ getattr(self, '_check_ports_%s_driver' % self.driver)(port)
+
+ def _check_ports_pika_driver(self, port):
+ rpc_server = self.servers.servers[0].server
+ # FIXME(sileht): Check other connections
+ connections = [
+ rpc_server.listener._connection
+ ]
+ for conn in connections:
+ self.assertEqual(
+ port, conn._impl.socket.getpeername()[1])
+
+ def _check_ports_rabbit_driver(self, port):
+ rpc_server = self.servers.servers[0].server
+ connection_contexts = [
+ # rpc server
+ rpc_server.listener._poll_style_listener.conn,
+ # rpc client
+ self.client.client.transport._driver._get_connection(),
+ # rpc client replies waiter
+ self.client.client.transport._driver._reply_q_conn,
+ ]
+
+ for cctxt in connection_contexts:
+ socket = cctxt.connection.channel.connection.sock
+ self.assertEqual(port, socket.getpeername()[1])
diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py
index 2f9e7b9..1d215a5 100644
--- a/oslo_messaging/tests/functional/utils.py
+++ b/oslo_messaging/tests/functional/utils.py
@@ -122,7 +122,7 @@ class RpcServerFixture(fixtures.Fixture):
class RpcServerGroupFixture(fixtures.Fixture):
def __init__(self, conf, url, topic=None, names=None, exchange=None,
- use_fanout_ctrl=False):
+ use_fanout_ctrl=False, endpoint=None):
self.conf = conf
self.url = url
# NOTE(sileht): topic and servier_name must be uniq
@@ -133,6 +133,7 @@ class RpcServerGroupFixture(fixtures.Fixture):
self.exchange = exchange
self.targets = [self._target(server=n) for n in self.names]
self.use_fanout_ctrl = use_fanout_ctrl
+ self.endpoint = endpoint
def setUp(self):
super(RpcServerGroupFixture, self).setUp()
@@ -149,6 +150,7 @@ class RpcServerGroupFixture(fixtures.Fixture):
if self.use_fanout_ctrl:
ctrl = self._target(fanout=True)
server = RpcServerFixture(self.conf, self.url, target,
+ endpoint=self.endpoint,
ctrl_target=ctrl)
return server
@@ -277,19 +279,39 @@ class IsValidDistributionOf(object):
class SkipIfNoTransportURL(test_utils.BaseTestCase):
def setUp(self, conf=cfg.CONF):
super(SkipIfNoTransportURL, self).setUp(conf=conf)
- self.url = os.environ.get('TRANSPORT_URL')
+
+ driver = os.environ.get("TRANSPORT_DRIVER")
+ if driver:
+ self.url = os.environ.get('PIFPAF_URL')
+ if driver == "pika" and self.url:
+ self.url = self.url.replace("rabbit://", "pika://")
+ else:
+ self.url = os.environ.get('TRANSPORT_URL')
+
if not self.url:
self.skipTest("No transport url configured")
zmq_matchmaker = os.environ.get('ZMQ_MATCHMAKER')
if zmq_matchmaker:
- self.config(rpc_zmq_matchmaker=zmq_matchmaker)
+ self.config(rpc_zmq_matchmaker=zmq_matchmaker,
+ group="oslo_messaging_zmq")
zmq_ipc_dir = os.environ.get('ZMQ_IPC_DIR')
if zmq_ipc_dir:
- self.config(rpc_zmq_ipc_dir=zmq_ipc_dir)
+ self.config(group="oslo_messaging_zmq",
+ rpc_zmq_ipc_dir=zmq_ipc_dir)
zmq_redis_port = os.environ.get('ZMQ_REDIS_PORT')
if zmq_redis_port:
self.config(port=zmq_redis_port, group="matchmaker_redis")
+ self.config(check_timeout=10000, group="matchmaker_redis")
+ self.config(wait_timeout=1000, group="matchmaker_redis")
+ zmq_use_pub_sub = os.environ.get('ZMQ_USE_PUB_SUB')
+ if zmq_use_pub_sub:
+ self.config(use_pub_sub=zmq_use_pub_sub,
+ group='oslo_messaging_zmq')
+ zmq_use_router_proxy = os.environ.get('ZMQ_USE_ROUTER_PROXY')
+ if zmq_use_router_proxy:
+ self.config(use_router_proxy=zmq_use_router_proxy,
+ group='oslo_messaging_zmq')
class NotificationFixture(fixtures.Fixture):
diff --git a/oslo_messaging/tests/functional/zmq/multiproc_utils.py b/oslo_messaging/tests/functional/zmq/multiproc_utils.py
index ee9f56e..4a1498a 100644
--- a/oslo_messaging/tests/functional/zmq/multiproc_utils.py
+++ b/oslo_messaging/tests/functional/zmq/multiproc_utils.py
@@ -70,7 +70,8 @@ def listener_configurer(conf):
'%(levelname)-8s %(message)s')
h.setFormatter(f)
root.addHandler(h)
- log_path = conf.rpc_zmq_ipc_dir + "/" + "zmq_multiproc.log"
+ log_path = conf.oslo_messaging_zmq.rpc_zmq_ipc_dir + \
+ "/" + "zmq_multiproc.log"
file_handler = logging.StreamHandler(open(log_path, 'w'))
file_handler.setFormatter(f)
root.addHandler(file_handler)
diff --git a/oslo_messaging/tests/functional/zmq/test_startup.py b/oslo_messaging/tests/functional/zmq/test_startup.py
index ea287b3..f1b89b0 100644
--- a/oslo_messaging/tests/functional/zmq/test_startup.py
+++ b/oslo_messaging/tests/functional/zmq/test_startup.py
@@ -30,10 +30,10 @@ class StartupOrderTestCase(multiproc_utils.MutliprocTestCase):
self.conf.prog = "test_prog"
self.conf.project = "test_project"
- kwargs = {'rpc_response_timeout': 30}
- self.config(**kwargs)
+ self.config(rpc_response_timeout=30)
- log_path = self.conf.rpc_zmq_ipc_dir + "/" + str(os.getpid()) + ".log"
+ log_path = os.path.join(self.conf.oslo_messaging_zmq.rpc_zmq_ipc_dir,
+ str(os.getpid()) + ".log")
sys.stdout = open(log_path, "w", buffering=0)
def test_call_server_before_client(self):
diff --git a/oslo_messaging/tests/notify/test_dispatcher.py b/oslo_messaging/tests/notify/test_dispatcher.py
index 3818a7f..18744fa 100644
--- a/oslo_messaging/tests/notify/test_dispatcher.py
+++ b/oslo_messaging/tests/notify/test_dispatcher.py
@@ -132,7 +132,7 @@ class TestDispatcher(test_utils.BaseTestCase):
dispatcher = notify_dispatcher.NotificationDispatcher(
[mock.Mock()], None)
res = dispatcher.dispatch(mock.Mock(ctxt={}, message=msg))
- self.assertEqual(None, res)
+ self.assertIsNone(res)
mylog.warning.assert_called_once_with('Unknown priority "%s"',
'what???')
diff --git a/oslo_messaging/tests/notify/test_middleware.py b/oslo_messaging/tests/notify/test_middleware.py
index 7d297df..f5deef3 100644
--- a/oslo_messaging/tests/notify/test_middleware.py
+++ b/oslo_messaging/tests/notify/test_middleware.py
@@ -49,14 +49,14 @@ class NotifierMiddlewareTest(utils.BaseTestCase):
m(req)
# Check first notification with only 'request'
call_args = notify.call_args_list[0][0]
- self.assertEqual(call_args[1], 'http.request')
- self.assertEqual(call_args[3], 'INFO')
- self.assertEqual(set(call_args[2].keys()),
- set(['request']))
+ self.assertEqual('http.request', call_args[1])
+ self.assertEqual('INFO', call_args[3])
+ self.assertEqual(set(['request']),
+ set(call_args[2].keys()))
request = call_args[2]['request']
- self.assertEqual(request['PATH_INFO'], '/foo/bar')
- self.assertEqual(request['REQUEST_METHOD'], 'GET')
+ self.assertEqual('/foo/bar', request['PATH_INFO'])
+ self.assertEqual('GET', request['REQUEST_METHOD'])
self.assertIn('HTTP_X_SERVICE_NAME', request)
self.assertNotIn('HTTP_X_AUTH_TOKEN', request)
self.assertFalse(any(map(lambda s: s.startswith('wsgi.'),
@@ -65,14 +65,14 @@ class NotifierMiddlewareTest(utils.BaseTestCase):
# Check second notification with request + response
call_args = notify.call_args_list[1][0]
- self.assertEqual(call_args[1], 'http.response')
- self.assertEqual(call_args[3], 'INFO')
- self.assertEqual(set(call_args[2].keys()),
- set(['request', 'response']))
+ self.assertEqual('http.response', call_args[1])
+ self.assertEqual('INFO', call_args[3])
+ self.assertEqual(set(['request', 'response']),
+ set(call_args[2].keys()))
request = call_args[2]['request']
- self.assertEqual(request['PATH_INFO'], '/foo/bar')
- self.assertEqual(request['REQUEST_METHOD'], 'GET')
+ self.assertEqual('/foo/bar', request['PATH_INFO'])
+ self.assertEqual('GET', request['REQUEST_METHOD'])
self.assertIn('HTTP_X_SERVICE_NAME', request)
self.assertNotIn('HTTP_X_AUTH_TOKEN', request)
self.assertFalse(any(map(lambda s: s.startswith('wsgi.'),
@@ -80,8 +80,8 @@ class NotifierMiddlewareTest(utils.BaseTestCase):
"WSGI fields are filtered out")
response = call_args[2]['response']
- self.assertEqual(response['status'], '200 OK')
- self.assertEqual(response['headers']['content-length'], '13')
+ self.assertEqual('200 OK', response['status'])
+ self.assertEqual('13', response['headers']['content-length'])
def test_notification_response_failure(self):
m = middleware.RequestNotifier(FakeFailingApp())
@@ -97,14 +97,14 @@ class NotifierMiddlewareTest(utils.BaseTestCase):
pass
# Check first notification with only 'request'
call_args = notify.call_args_list[0][0]
- self.assertEqual(call_args[1], 'http.request')
- self.assertEqual(call_args[3], 'INFO')
- self.assertEqual(set(call_args[2].keys()),
- set(['request']))
+ self.assertEqual('http.request', call_args[1])
+ self.assertEqual('INFO', call_args[3])
+ self.assertEqual(set(['request']),
+ set(call_args[2].keys()))
request = call_args[2]['request']
- self.assertEqual(request['PATH_INFO'], '/foo/bar')
- self.assertEqual(request['REQUEST_METHOD'], 'GET')
+ self.assertEqual('/foo/bar', request['PATH_INFO'])
+ self.assertEqual('GET', request['REQUEST_METHOD'])
self.assertIn('HTTP_X_SERVICE_NAME', request)
self.assertNotIn('HTTP_X_AUTH_TOKEN', request)
self.assertFalse(any(map(lambda s: s.startswith('wsgi.'),
@@ -113,14 +113,14 @@ class NotifierMiddlewareTest(utils.BaseTestCase):
# Check second notification with 'request' and 'exception'
call_args = notify.call_args_list[1][0]
- self.assertEqual(call_args[1], 'http.response')
- self.assertEqual(call_args[3], 'INFO')
- self.assertEqual(set(call_args[2].keys()),
- set(['request', 'exception']))
+ self.assertEqual('http.response', call_args[1])
+ self.assertEqual('INFO', call_args[3])
+ self.assertEqual(set(['request', 'exception']),
+ set(call_args[2].keys()))
request = call_args[2]['request']
- self.assertEqual(request['PATH_INFO'], '/foo/bar')
- self.assertEqual(request['REQUEST_METHOD'], 'GET')
+ self.assertEqual('/foo/bar', request['PATH_INFO'])
+ self.assertEqual('GET', request['REQUEST_METHOD'])
self.assertIn('HTTP_X_SERVICE_NAME', request)
self.assertNotIn('HTTP_X_AUTH_TOKEN', request)
self.assertFalse(any(map(lambda s: s.startswith('wsgi.'),
@@ -130,7 +130,7 @@ class NotifierMiddlewareTest(utils.BaseTestCase):
exception = call_args[2]['exception']
self.assertIn('middleware.py', exception['traceback'][0])
self.assertIn('It happens!', exception['traceback'][-1])
- self.assertEqual(exception['value'], "Exception('It happens!',)")
+ self.assertEqual("Exception('It happens!',)", exception['value'])
def test_process_request_fail(self):
def notify_error(context, publisher_id, event_type,
@@ -168,23 +168,23 @@ class NotifierMiddlewareTest(utils.BaseTestCase):
# Check GET request does not send notification
m(req)
m(req1)
- self.assertEqual(len(notify.call_args_list), 0)
+ self.assertEqual(0, len(notify.call_args_list))
# Check non-GET request does send notification
m(req2)
- self.assertEqual(len(notify.call_args_list), 2)
+ self.assertEqual(2, len(notify.call_args_list))
call_args = notify.call_args_list[0][0]
- self.assertEqual(call_args[1], 'http.request')
- self.assertEqual(call_args[3], 'INFO')
- self.assertEqual(set(call_args[2].keys()),
- set(['request']))
+ self.assertEqual('http.request', call_args[1])
+ self.assertEqual('INFO', call_args[3])
+ self.assertEqual(set(['request']),
+ set(call_args[2].keys()))
request = call_args[2]['request']
- self.assertEqual(request['PATH_INFO'], '/accept/foo')
- self.assertEqual(request['REQUEST_METHOD'], 'POST')
+ self.assertEqual('/accept/foo', request['PATH_INFO'])
+ self.assertEqual('POST', request['REQUEST_METHOD'])
call_args = notify.call_args_list[1][0]
- self.assertEqual(call_args[1], 'http.response')
- self.assertEqual(call_args[3], 'INFO')
- self.assertEqual(set(call_args[2].keys()),
- set(['request', 'response']))
+ self.assertEqual('http.response', call_args[1])
+ self.assertEqual('INFO', call_args[3])
+ self.assertEqual(set(['request', 'response']),
+ set(call_args[2].keys()))
diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py
index 62a547f..03e46c8 100644
--- a/oslo_messaging/tests/rpc/test_server.py
+++ b/oslo_messaging/tests/rpc/test_server.py
@@ -604,8 +604,8 @@ class TestServerLocking(test_utils.BaseTestCase):
self.server.stop()
self.server.wait()
- self.assertEqual(len(self.executors), 1)
- self.assertEqual(self.executors[0]._calls, ['shutdown'])
+ self.assertEqual(1, len(self.executors))
+ self.assertEqual(['shutdown'], self.executors[0]._calls)
self.assertTrue(self.server.listener.cleanup.called)
def test_reversed_order(self):
@@ -624,8 +624,8 @@ class TestServerLocking(test_utils.BaseTestCase):
self.server.wait()
- self.assertEqual(len(self.executors), 1)
- self.assertEqual(self.executors[0]._calls, ['shutdown'])
+ self.assertEqual(1, len(self.executors))
+ self.assertEqual(['shutdown'], self.executors[0]._calls)
def test_wait_for_running_task(self):
# Test that if 2 threads call a method simultaneously, both will wait,
@@ -675,7 +675,7 @@ class TestServerLocking(test_utils.BaseTestCase):
# We haven't signalled completion yet, so submit shouldn't have run
self.assertEqual(1, len(self.executors))
- self.assertEqual(self.executors[0]._calls, [])
+ self.assertEqual([], self.executors[0]._calls)
self.assertFalse(waiter_finished.is_set())
# Let the runner complete
@@ -687,7 +687,7 @@ class TestServerLocking(test_utils.BaseTestCase):
# and execute ran
self.assertTrue(waiter_finished.is_set())
self.assertEqual(1, len(self.executors))
- self.assertEqual(self.executors[0]._calls, [])
+ self.assertEqual([], self.executors[0]._calls)
def test_start_stop_wait_stop_wait(self):
# Test that we behave correctly when calling stop/wait more than once.
@@ -700,7 +700,7 @@ class TestServerLocking(test_utils.BaseTestCase):
self.server.wait()
self.assertEqual(len(self.executors), 1)
- self.assertEqual(self.executors[0]._calls, ['shutdown'])
+ self.assertEqual(['shutdown'], self.executors[0]._calls)
self.assertTrue(self.server.listener.cleanup.called)
def test_state_wrapping(self):
@@ -736,7 +736,7 @@ class TestServerLocking(test_utils.BaseTestCase):
# The server should have started, but stop should not have been called
self.assertEqual(1, len(self.executors))
- self.assertEqual(self.executors[0]._calls, [])
+ self.assertEqual([], self.executors[0]._calls)
self.assertFalse(thread1_finished.is_set())
self.server.stop()
@@ -745,7 +745,7 @@ class TestServerLocking(test_utils.BaseTestCase):
# We should have gone through all the states, and thread1 should still
# be waiting
self.assertEqual(1, len(self.executors))
- self.assertEqual(self.executors[0]._calls, ['shutdown'])
+ self.assertEqual(['shutdown'], self.executors[0]._calls)
self.assertFalse(thread1_finished.is_set())
# Start again
@@ -753,8 +753,8 @@ class TestServerLocking(test_utils.BaseTestCase):
# We should now record 4 executors (2 for each server)
self.assertEqual(2, len(self.executors))
- self.assertEqual(self.executors[0]._calls, ['shutdown'])
- self.assertEqual(self.executors[1]._calls, [])
+ self.assertEqual(['shutdown'], self.executors[0]._calls)
+ self.assertEqual([], self.executors[1]._calls)
self.assertFalse(thread1_finished.is_set())
# Allow thread1 to complete
@@ -764,8 +764,8 @@ class TestServerLocking(test_utils.BaseTestCase):
# thread1 should now have finished, and stop should not have been
# called again on either the first or second executor
self.assertEqual(2, len(self.executors))
- self.assertEqual(self.executors[0]._calls, ['shutdown'])
- self.assertEqual(self.executors[1]._calls, [])
+ self.assertEqual(['shutdown'], self.executors[0]._calls)
+ self.assertEqual([], self.executors[1]._calls)
self.assertTrue(thread1_finished.is_set())
@mock.patch.object(server_module, 'DEFAULT_LOG_AFTER', 1)
diff --git a/oslo_messaging/tests/test_config_opts_proxy.py b/oslo_messaging/tests/test_config_opts_proxy.py
index 6d51716..e51794c 100644
--- a/oslo_messaging/tests/test_config_opts_proxy.py
+++ b/oslo_messaging/tests/test_config_opts_proxy.py
@@ -53,25 +53,19 @@ class TestConfigOptsProxy(test_utils.BaseTestCase):
'unknown_group')
self.assertTrue(isinstance(getattr(conf, group),
conf.GroupAttrProxy))
- self.assertEqual(conf.oslo_messaging_rabbit.rabbit_retry_interval,
- 1)
- self.assertEqual(conf.oslo_messaging_rabbit.rabbit_qos_prefetch_count,
- 2)
- self.assertEqual(conf.oslo_messaging_rabbit.rabbit_max_retries,
- 3)
+ self.assertEqual(1, conf.oslo_messaging_rabbit.rabbit_retry_interval)
+ self.assertEqual(2,
+ conf.oslo_messaging_rabbit.rabbit_qos_prefetch_count)
+ self.assertEqual(3, conf.oslo_messaging_rabbit.rabbit_max_retries)
self.assertRaises(cfg.NoSuchOptError,
conf.oslo_messaging_rabbit.__getattr__,
'unknown_opt')
self.assertRaises(ValueError,
conf.oslo_messaging_rabbit.__getattr__,
'kombu_reconnect_delay')
- self.assertEqual(conf.oslo_messaging_rabbit.list_str,
- ['1', '2', '3'])
- self.assertEqual(conf.oslo_messaging_rabbit.list_int,
- [1, 2, 3])
- self.assertEqual(conf.oslo_messaging_rabbit.dict,
- {'x': '1', 'y': '2', 'z': '3'})
- self.assertEqual(conf.oslo_messaging_rabbit.bool,
- True)
- self.assertEqual(conf.oslo_messaging_rabbit.str,
- 'default')
+ self.assertEqual(['1', '2', '3'], conf.oslo_messaging_rabbit.list_str)
+ self.assertEqual([1, 2, 3], conf.oslo_messaging_rabbit.list_int)
+ self.assertEqual({'x': '1', 'y': '2', 'z': '3'},
+ conf.oslo_messaging_rabbit.dict)
+ self.assertEqual(True, conf.oslo_messaging_rabbit.bool)
+ self.assertEqual('default', conf.oslo_messaging_rabbit.str)
diff --git a/oslo_messaging/tests/test_opts.py b/oslo_messaging/tests/test_opts.py
index 2ca8f8a..0e4b1f8 100644
--- a/oslo_messaging/tests/test_opts.py
+++ b/oslo_messaging/tests/test_opts.py
@@ -32,11 +32,12 @@ class OptsTestCase(test_utils.BaseTestCase):
super(OptsTestCase, self).setUp()
def _test_list_opts(self, result):
- self.assertEqual(5, len(result))
+ self.assertEqual(6, len(result))
groups = [g for (g, l) in result]
self.assertIn(None, groups)
self.assertIn('matchmaker_redis', groups)
+ self.assertIn('oslo_messaging_zmq', groups)
self.assertIn('oslo_messaging_amqp', groups)
self.assertIn('oslo_messaging_notifications', groups)
self.assertIn('oslo_messaging_rabbit', groups)
diff --git a/oslo_messaging/tests/test_serializer.py b/oslo_messaging/tests/test_serializer.py
index 329d9de..858da45 100644
--- a/oslo_messaging/tests/test_serializer.py
+++ b/oslo_messaging/tests/test_serializer.py
@@ -41,7 +41,7 @@ class TestRequestContextSerializer(test_utils.BaseTestCase):
entity = self.serializer.serialize_entity(self.context, self.entity)
self.assertFalse(self.serializer._base.serialize_entity.called)
- self.assertEqual(entity, self.entity)
+ self.assertEqual(self.entity, entity)
def test_deserialize_entity(self):
self.serializer.deserialize_entity(self.context, self.entity)
@@ -56,12 +56,12 @@ class TestRequestContextSerializer(test_utils.BaseTestCase):
entity = self.serializer.deserialize_entity(self.context, self.entity)
self.assertFalse(self.serializer._base.serialize_entity.called)
- self.assertEqual(entity, self.entity)
+ self.assertEqual(self.entity, entity)
def test_serialize_context(self):
new_context = self.serializer.serialize_context(self.context)
- self.assertEqual(new_context, self.context.to_dict())
+ self.assertEqual(self.context.to_dict(), new_context)
@mock.patch.object(common_context.RequestContext, 'from_dict',
return_value='foobar')
@@ -70,6 +70,6 @@ class TestRequestContextSerializer(test_utils.BaseTestCase):
mock_to_dict.assert_called_with(self.context)
self.assertEqual(
- new_context,
- common_context.RequestContext.from_dict(self.context)
+ common_context.RequestContext.from_dict(self.context),
+ new_context
)
diff --git a/oslo_messaging/tests/test_transport.py b/oslo_messaging/tests/test_transport.py
index 89619cf..01ead7e 100644
--- a/oslo_messaging/tests/test_transport.py
+++ b/oslo_messaging/tests/test_transport.py
@@ -145,10 +145,12 @@ class GetTransportTestCase(test_utils.BaseTestCase):
transport_ = oslo_messaging.get_transport(self.conf, **kwargs)
if self.aliases is not None:
- self.assertEqual(fake_logger.warning.mock_calls,
- [mock.call('legacy "rpc_backend" is deprecated, '
- '"testfoo" must be replaced by '
- '"%s"' % self.aliases.get('testfoo'))])
+ self.assertEqual(
+ [mock.call('legacy "rpc_backend" is deprecated, '
+ '"testfoo" must be replaced by '
+ '"%s"' % self.aliases.get('testfoo'))],
+ fake_logger.warning.mock_calls
+ )
self.assertIsNotNone(transport_)
self.assertIs(transport_.conf, self.conf)
@@ -354,10 +356,10 @@ class TestTransportUrlCustomisation(test_utils.BaseTestCase):
self.assertNotEqual(self.url1, self.url4)
def test_query(self):
- self.assertEqual(self.url1.query, {'x': '1', 'y': '2', 'z': '3'})
- self.assertEqual(self.url2.query, {'foo': 'bar'})
- self.assertEqual(self.url3.query, {'l': '1,2,3'})
- self.assertEqual(self.url4.query, {'d': 'x:1,y:2,z:3'})
+ self.assertEqual({'x': '1', 'y': '2', 'z': '3'}, self.url1.query)
+ self.assertEqual({'foo': 'bar'}, self.url2.query)
+ self.assertEqual({'l': '1,2,3'}, self.url3.query)
+ self.assertEqual({'d': 'x:1,y:2,z:3'}, self.url4.query)
class TestTransportHostCustomisation(test_utils.BaseTestCase):
diff --git a/oslo_messaging/tests/utils.py b/oslo_messaging/tests/utils.py
index aa2a563..c5fca87 100644
--- a/oslo_messaging/tests/utils.py
+++ b/oslo_messaging/tests/utils.py
@@ -39,6 +39,9 @@ class BaseTestCase(base.BaseTestCase):
self.messaging_conf.transport_driver = 'fake'
self.conf = self.messaging_conf.conf
+ self.conf.project = 'project'
+ self.conf.prog = 'prog'
+
moxfixture = self.useFixture(moxstubout.MoxStubout())
self.mox = moxfixture.mox
self.stubs = moxfixture.stubs
diff --git a/releasenotes/notes/connection_ttl-2cf0fe6e1ab8c73c.yaml b/releasenotes/notes/connection_ttl-2cf0fe6e1ab8c73c.yaml
new file mode 100644
index 0000000..fafa33d
--- /dev/null
+++ b/releasenotes/notes/connection_ttl-2cf0fe6e1ab8c73c.yaml
@@ -0,0 +1,8 @@
+---
+features:
+ - |
+ | Idle connections in the pool will be expired and closed.
+ | Default ttl is 1200s. Next configuration params was added
+
+ * *conn_pool_ttl* (defaul 1200)
+ * *conn_pool_min_size* (default 2)
diff --git a/releasenotes/notes/option-rabbitmq-max_retries-has-been-deprecated-471f66a9e6d672a2.yaml b/releasenotes/notes/option-rabbitmq-max_retries-has-been-deprecated-471f66a9e6d672a2.yaml
new file mode 100644
index 0000000..dc8ac5d
--- /dev/null
+++ b/releasenotes/notes/option-rabbitmq-max_retries-has-been-deprecated-471f66a9e6d672a2.yaml
@@ -0,0 +1,5 @@
+---
+deprecations:
+ - The rabbitmq driver option ``DEFAULT/max_retries`` has been deprecated
+ for removal (at a later point in the future) as it did not make logical
+ sense for notifications and for RPC.
diff --git a/releasenotes/source/locale/en_GB/LC_MESSAGES/releasenotes.po b/releasenotes/source/locale/en_GB/LC_MESSAGES/releasenotes.po
new file mode 100644
index 0000000..e3a6f81
--- /dev/null
+++ b/releasenotes/source/locale/en_GB/LC_MESSAGES/releasenotes.po
@@ -0,0 +1,30 @@
+# Andi Chandler <andi@gowling.com>, 2016. #zanata
+msgid ""
+msgstr ""
+"Project-Id-Version: oslo.messaging Release Notes 5.5.1\n"
+"Report-Msgid-Bugs-To: \n"
+"POT-Creation-Date: 2016-07-01 03:41+0000\n"
+"MIME-Version: 1.0\n"
+"Content-Type: text/plain; charset=UTF-8\n"
+"Content-Transfer-Encoding: 8bit\n"
+"PO-Revision-Date: 2016-06-28 05:52+0000\n"
+"Last-Translator: Andi Chandler <andi@gowling.com>\n"
+"Language-Team: English (United Kingdom)\n"
+"Language: en-GB\n"
+"X-Generator: Zanata 3.7.3\n"
+"Plural-Forms: nplurals=2; plural=(n != 1)\n"
+
+msgid "5.2.0"
+msgstr "5.2.0"
+
+msgid "Other Notes"
+msgstr "Other Notes"
+
+msgid "Switch to reno for managing release notes."
+msgstr "Switch to reno for managing release notes."
+
+msgid "Unreleased Release Notes"
+msgstr "Unreleased Release Notes"
+
+msgid "oslo.messaging Release Notes"
+msgstr "oslo.messaging Release Notes"
diff --git a/requirements.txt b/requirements.txt
index f215326..7397aaf 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,21 +4,21 @@
pbr>=1.6 # Apache-2.0
-futurist>=0.11.0 # Apache-2.0
-oslo.config>=3.9.0 # Apache-2.0
-oslo.context>=2.2.0 # Apache-2.0
+futurist!=0.15.0,>=0.11.0 # Apache-2.0
+oslo.config>=3.14.0 # Apache-2.0
+oslo.context>=2.6.0 # Apache-2.0
oslo.log>=1.14.0 # Apache-2.0
-oslo.utils>=3.11.0 # Apache-2.0
+oslo.utils>=3.16.0 # Apache-2.0
oslo.serialization>=1.10.0 # Apache-2.0
oslo.service>=1.10.0 # Apache-2.0
oslo.i18n>=2.1.0 # Apache-2.0
-stevedore>=1.10.0 # Apache-2.0
+stevedore>=1.16.0 # Apache-2.0
debtcollector>=1.2.0 # Apache-2.0
monotonic>=0.6 # Apache-2.0
# for jsonutils
six>=1.9.0 # MIT
-cachetools>=1.0.0 # MIT License
+cachetools>=1.1.0 # MIT License
# FIXME(markmc): remove this when the drivers no longer
diff --git a/setup-test-env-pika.sh b/setup-test-env-pika.sh
deleted file mode 100755
index 5fe1895..0000000
--- a/setup-test-env-pika.sh
+++ /dev/null
@@ -1,32 +0,0 @@
-#!/bin/bash
-set -e
-
-. tools/functions.sh
-
-DATADIR=$(mktemp -d /tmp/OSLOMSG-RABBIT.XXXXX)
-trap "clean_exit $DATADIR" EXIT
-
-export RABBITMQ_NODE_IP_ADDRESS=127.0.0.1
-export RABBITMQ_NODE_PORT=65123
-export RABBITMQ_NODENAME=oslomsg-test@localhost
-export RABBITMQ_LOG_BASE=$DATADIR
-export RABBITMQ_MNESIA_BASE=$DATADIR
-export RABBITMQ_PID_FILE=$DATADIR/pid
-export HOME=$DATADIR
-
-# NOTE(sileht): We directly use the rabbitmq scripts
-# to avoid distribution check, like running as root/rabbitmq
-# enforcing.
-export PATH=/usr/lib/rabbitmq/bin/:$PATH
-
-
-mkfifo ${DATADIR}/out
-rabbitmq-server &> ${DATADIR}/out &
-wait_for_line "Starting broker... completed" "ERROR:" ${DATADIR}/out
-
-rabbitmqctl add_user oslomsg oslosecret
-rabbitmqctl set_permissions "oslomsg" ".*" ".*" ".*"
-
-
-export TRANSPORT_URL=pika://oslomsg:oslosecret@127.0.0.1:65123//
-$*
diff --git a/setup-test-env-rabbit.sh b/setup-test-env-rabbit.sh
deleted file mode 100755
index aaf6603..0000000
--- a/setup-test-env-rabbit.sh
+++ /dev/null
@@ -1,32 +0,0 @@
-#!/bin/bash
-set -e
-
-. tools/functions.sh
-
-DATADIR=$(mktemp -d /tmp/OSLOMSG-RABBIT.XXXXX)
-trap "clean_exit $DATADIR" EXIT
-
-export RABBITMQ_NODE_IP_ADDRESS=127.0.0.1
-export RABBITMQ_NODE_PORT=65123
-export RABBITMQ_NODENAME=oslomsg-test@localhost
-export RABBITMQ_LOG_BASE=$DATADIR
-export RABBITMQ_MNESIA_BASE=$DATADIR
-export RABBITMQ_PID_FILE=$DATADIR/pid
-export HOME=$DATADIR
-
-# NOTE(sileht): We directly use the rabbitmq scripts
-# to avoid distribution check, like running as root/rabbitmq
-# enforcing.
-export PATH=/usr/lib/rabbitmq/bin/:$PATH
-
-
-mkfifo ${DATADIR}/out
-rabbitmq-server &> ${DATADIR}/out &
-wait_for_line "Starting broker... completed" "ERROR:" ${DATADIR}/out
-
-rabbitmqctl add_user oslomsg oslosecret
-rabbitmqctl set_permissions "oslomsg" ".*" ".*" ".*"
-
-
-export TRANSPORT_URL=rabbit://oslomsg:oslosecret@127.0.0.1:65123//
-$*
diff --git a/setup-test-env-zmq-proxy.sh b/setup-test-env-zmq-proxy.sh
new file mode 100755
index 0000000..12649c8
--- /dev/null
+++ b/setup-test-env-zmq-proxy.sh
@@ -0,0 +1,37 @@
+#!/bin/bash
+set -e
+
+. tools/functions.sh
+
+DATADIR=$(mktemp -d /tmp/OSLOMSG-ZEROMQ.XXXXX)
+trap "clean_exit $DATADIR" EXIT
+
+export TRANSPORT_URL=zmq://
+export ZMQ_MATCHMAKER=redis
+export ZMQ_REDIS_PORT=65123
+export ZMQ_IPC_DIR=${DATADIR}
+export ZMQ_USE_PUB_SUB=false
+export ZMQ_USE_ROUTER_PROXY=true
+
+export ZMQ_PROXY_HOST=127.0.0.1
+
+cat > ${DATADIR}/zmq.conf <<EOF
+[DEFAULT]
+transport_url=${TRANSPORT_URL}
+[oslo_messaging_zmq]
+rpc_zmq_matchmaker=${ZMQ_MATCHMAKER}
+rpc_zmq_ipc_dir=${ZMQ_IPC_DIR}
+use_pub_sub=${ZMQ_USE_PUB_SUB}
+use_router_proxy=${ZMQ_USE_ROUTER_PROXY}
+[matchmaker_redis]
+port=${ZMQ_REDIS_PORT}
+
+[zmq_proxy_opts]
+host=${ZMQ_PROXY_HOST}
+EOF
+
+redis-server --port $ZMQ_REDIS_PORT &
+
+oslo-messaging-zmq-proxy --debug True --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-proxy.log 2>&1 &
+
+$*
diff --git a/setup-test-env-zmq-pub-sub.sh b/setup-test-env-zmq-pub-sub.sh
new file mode 100755
index 0000000..5551be5
--- /dev/null
+++ b/setup-test-env-zmq-pub-sub.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+set -e
+
+. tools/functions.sh
+
+DATADIR=$(mktemp -d /tmp/OSLOMSG-ZEROMQ.XXXXX)
+trap "clean_exit $DATADIR" EXIT
+
+export TRANSPORT_URL=zmq://
+export ZMQ_MATCHMAKER=redis
+export ZMQ_REDIS_PORT=65123
+export ZMQ_IPC_DIR=${DATADIR}
+export ZMQ_USE_PUB_SUB=true
+export ZMQ_USE_ROUTER_PROXY=true
+
+cat > ${DATADIR}/zmq.conf <<EOF
+[DEFAULT]
+transport_url=${TRANSPORT_URL}
+[oslo_messaging_zmq]
+rpc_zmq_matchmaker=${ZMQ_MATCHMAKER}
+rpc_zmq_ipc_dir=${ZMQ_IPC_DIR}
+use_pub_sub=${ZMQ_USE_PUB_SUB}
+use_router_proxy=${ZMQ_USE_ROUTER_PROXY}
+[matchmaker_redis]
+port=${ZMQ_REDIS_PORT}
+EOF
+
+redis-server --port $ZMQ_REDIS_PORT &
+
+oslo-messaging-zmq-proxy --debug True --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-proxy.log 2>&1 &
+
+$*
diff --git a/setup-test-env-zmq.sh b/setup-test-env-zmq.sh
index ba767ca..8780872 100755
--- a/setup-test-env-zmq.sh
+++ b/setup-test-env-zmq.sh
@@ -10,20 +10,23 @@ export TRANSPORT_URL=zmq://
export ZMQ_MATCHMAKER=redis
export ZMQ_REDIS_PORT=65123
export ZMQ_IPC_DIR=${DATADIR}
+export ZMQ_USE_PUB_SUB=false
+export ZMQ_USE_ROUTER_PROXY=false
cat > ${DATADIR}/zmq.conf <<EOF
[DEFAULT]
transport_url=${TRANSPORT_URL}
+[oslo_messaging_zmq]
rpc_zmq_matchmaker=${ZMQ_MATCHMAKER}
rpc_zmq_ipc_dir=${ZMQ_IPC_DIR}
-use_pub_sub=true
-use_router_proxy=true
+use_pub_sub=${ZMQ_USE_PUB_SUB}
+use_router_proxy=${ZMQ_USE_ROUTER_PROXY}
[matchmaker_redis]
port=${ZMQ_REDIS_PORT}
EOF
redis-server --port $ZMQ_REDIS_PORT &
-oslo-messaging-zmq-proxy --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-publisher.log 2>&1 &
+oslo-messaging-zmq-proxy --debug True --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-proxy.log 2>&1 &
$*
diff --git a/setup.cfg b/setup.cfg
index aa6f2d5..9a3665e 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -17,6 +17,7 @@ classifier =
Programming Language :: Python :: 2.7
Programming Language :: Python :: 3
Programming Language :: Python :: 3.4
+ Programming Language :: Python :: 3.5
[files]
packages =
@@ -26,6 +27,7 @@ packages =
console_scripts =
oslo-messaging-zmq-proxy = oslo_messaging._cmd.zmq_proxy:main
oslo-messaging-zmq-broker = oslo_messaging._cmd.zmq_proxy:main
+ oslo-messaging-send-notification = oslo_messaging.notify.notifier:_send_notification
oslo.messaging.drivers =
rabbit = oslo_messaging._drivers.impl_rabbit:RabbitDriver
diff --git a/test-requirements.txt b/test-requirements.txt
index 6f5b25c..bc197fa 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -5,8 +5,7 @@
# Hacking already pins down pep8, pyflakes and flake8
hacking<0.11,>=0.10.0
-discover # BSD
-fixtures<2.0,>=1.3.1 # Apache-2.0/BSD
+fixtures>=3.0.0 # Apache-2.0/BSD
mock>=2.0 # BSD
mox3>=0.7.0 # Apache-2.0
python-subunit>=0.0.18 # Apache-2.0/BSD
@@ -14,7 +13,7 @@ testrepository>=0.0.18 # Apache-2.0/BSD
testscenarios>=0.4 # Apache-2.0/BSD
testtools>=1.4.0 # MIT
oslotest>=1.10.0 # Apache-2.0
-
+pifpaf>=0.10.0 # Apache-2.0
# for test_matchmaker_redis
redis>=2.10.0 # MIT
@@ -30,9 +29,9 @@ kafka-python<1.0.0,>=0.9.5 # Apache-2.0
coverage>=3.6 # Apache-2.0
# this is required for the docs build jobs
-sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2 # BSD
+sphinx!=1.3b1,<1.3,>=1.2.1 # BSD
oslosphinx!=3.4.0,>=2.5.0 # Apache-2.0
-reno>=1.6.2 # Apache2
+reno>=1.8.0 # Apache2
# AMQP 1.0 support depends on the Qpid Proton AMQP 1.0
# development libraries.
diff --git a/tools/simulator.py b/tools/simulator.py
index cf03555..2f3161b 100755
--- a/tools/simulator.py
+++ b/tools/simulator.py
@@ -43,6 +43,7 @@ CLIENTS = []
MESSAGES = []
IS_RUNNING = True
SERVERS = []
+TRANSPORT = None
USAGE = """ Usage: ./simulator.py [-h] [--url URL] [-d DEBUG]\
{notify-server,notify-client,rpc-server,rpc-client} ...
@@ -134,7 +135,7 @@ class MessageStatsCollector(object):
max_latency = 0
sum_latencies = 0
- for i in range(count):
+ for i in six.moves.range(count):
p = self.buffer[i]
size += len(p.cargo)
@@ -380,10 +381,10 @@ def generate_messages(messages_count):
messages_count = MESSAGES_LIMIT
LOG.info("Generating %d random messages", messages_count)
- for i in range(messages_count):
+ for i in six.moves.range(messages_count):
length = RANDOM_GENERATOR()
msg = ''.join(random.choice(
- string.ascii_lowercase) for x in range(length))
+ string.ascii_lowercase) for x in six.moves.range(length))
MESSAGES.append(msg)
LOG.info("Messages has been prepared")
@@ -398,6 +399,10 @@ def wrap_sigexit(f):
e.signo)
for server in SERVERS:
server.stop()
+ server.wait()
+ finally:
+ if TRANSPORT:
+ TRANSPORT.cleanup()
return inner
@@ -433,7 +438,7 @@ def spawn_rpc_clients(threads, transport, targets, wait_after_msg, timeout,
is_cast, messages_count, duration):
p = eventlet.GreenPool(size=threads)
targets = itertools.cycle(targets)
- for i in range(0, threads):
+ for i in six.moves.range(threads):
target = next(targets)
LOG.debug("starting RPC client for target %s", target)
client_builder = functools.partial(RPCClient, i, transport, target,
@@ -446,7 +451,7 @@ def spawn_rpc_clients(threads, transport, targets, wait_after_msg, timeout,
def spawn_notify_clients(threads, topic, transport, message_count,
wait_after_msg, timeout, duration):
p = eventlet.GreenPool(size=threads)
- for i in range(0, threads):
+ for i in six.moves.range(threads):
client_builder = functools.partial(NotifyClient, i, transport, topic,
wait_after_msg)
p.spawn_n(send_messages, i, client_builder, message_count, duration)
@@ -472,7 +477,7 @@ def send_messages(client_id, client_builder, messages_count, duration):
else:
LOG.debug("Sending %d messages using client %d",
messages_count, client_id)
- for _ in six.moves.range(0, messages_count):
+ for _ in six.moves.range(messages_count):
client.send_msg()
eventlet.sleep()
if not IS_RUNNING:
@@ -657,10 +662,11 @@ def main():
if args.config_file:
cfg.CONF(["--config-file", args.config_file])
+ global TRANSPORT
if args.mode in ['rpc-server', 'rpc-client']:
- transport = messaging.get_transport(cfg.CONF, url=args.url)
+ TRANSPORT = messaging.get_transport(cfg.CONF, url=args.url)
else:
- transport = messaging.get_notification_transport(cfg.CONF,
+ TRANSPORT = messaging.get_notification_transport(cfg.CONF,
url=args.url)
if args.mode in ['rpc-client', 'notify-client']:
@@ -678,26 +684,26 @@ def main():
if args.mode == 'rpc-server':
target = messaging.Target(topic=args.topic, server=args.server)
if args.url.startswith('zmq'):
- cfg.CONF.rpc_zmq_matchmaker = "redis"
+ cfg.CONF.oslo_messaging_zmq.rpc_zmq_matchmaker = "redis"
- endpoint = rpc_server(transport, target, args.wait_before_answer,
+ endpoint = rpc_server(TRANSPORT, target, args.wait_before_answer,
args.executor, args.duration)
show_server_stats(endpoint, args.json_filename)
elif args.mode == 'notify-server':
- endpoint = notify_server(transport, args.topic,
+ endpoint = notify_server(TRANSPORT, args.topic,
args.wait_before_answer, args.duration,
args.requeue)
show_server_stats(endpoint, args.json_filename)
elif args.mode == 'batch-notify-server':
- endpoint = batch_notify_server(transport, args.topic,
+ endpoint = batch_notify_server(TRANSPORT, args.topic,
args.wait_before_answer,
args.duration, args.requeue)
show_server_stats(endpoint, args.json_filename)
elif args.mode == 'notify-client':
- spawn_notify_clients(args.threads, args.topic, transport,
+ spawn_notify_clients(args.threads, args.topic, TRANSPORT,
args.messages, args.wait_after_msg,
args.timeout, args.duration)
show_client_stats(CLIENTS, args.json_filename)
@@ -707,7 +713,7 @@ def main():
targets = [messaging.Target(
topic=topic, server=server_name, fanout=args.is_fanout) for
topic, server_name in targets]
- spawn_rpc_clients(args.threads, transport, targets,
+ spawn_rpc_clients(args.threads, TRANSPORT, targets,
args.wait_after_msg, args.timeout, args.is_cast,
args.messages, args.duration)
diff --git a/tox.ini b/tox.ini
index e39daba..c023130 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,5 +1,5 @@
[tox]
-envlist = py34,py27,pep8,bandit
+envlist = py35,py34,py27,pep8,bandit
[testenv]
setenv =
@@ -25,22 +25,52 @@ commands = {posargs}
commands = python setup.py build_sphinx
[testenv:py27-func-rabbit]
-commands = {toxinidir}/setup-test-env-rabbit.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
+setenv = TRANSPORT_DRIVER=rabbit
+commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
[testenv:py34-func-rabbit]
+setenv = TRANSPORT_DRIVER=rabbit
basepython = python3.4
-commands = {toxinidir}/setup-test-env-rabbit.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
+commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
+
+[testenv:py35-func-rabbit]
+setenv = TRANSPORT_DRIVER=rabbit
+basepython = python3.5
+commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
[testenv:py27-func-pika]
-commands = {toxinidir}/setup-test-env-pika.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
+setenv = TRANSPORT_DRIVER=pika
+commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
[testenv:py27-func-amqp1]
setenv = TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1:65123//
-# NOTE(flaper87): This gate job run on fedora21 for now.
-commands = {toxinidir}/setup-test-env-qpid.sh 1.0 python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
+# NOTE(kgiusti): This gate job runs on Centos 7 for now.
+commands = {toxinidir}/setup-test-env-qpid.sh 1.0 python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
+
+[testenv:py34-func-amqp1]
+basepython = python3.4
+setenv = TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1:65123//
+# NOTE(kgiusti): This gate job runs on Centos 7 for now.
+commands = {toxinidir}/setup-test-env-qpid.sh 1.0 python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
+
+[testenv:py35-func-amqp1]
+basepython = python3.5
+setenv = TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1:65123//
+# NOTE(kgiusti): This gate job runs on Centos 7 for now.
+commands = {toxinidir}/setup-test-env-qpid.sh 1.0 python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
[testenv:py27-func-zeromq]
-commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
+commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
+
+[testenv:py34-func-zeromq]
+basepython = python3.4
+commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
+
+[testenv:py27-func-zeromq-proxy]
+commands = {toxinidir}/setup-test-env-zmq-proxy.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
+
+[testenv:py27-func-zeromq-pub-sub]
+commands = {toxinidir}/setup-test-env-zmq-pub-sub.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
[testenv:bandit]
deps = -r{toxinidir}/test-requirements.txt