summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOleksii Zamiatin <ozamiatin@mirantis.com>2016-08-04 15:31:45 +0300
committerozamiatin <ozamiatin@mirantis.com>2016-08-05 11:36:50 +0300
commit7c5d039fd355e60e099a0a36408c85a08bfcc2ad (patch)
tree76e4c69470840f0b6f2f82188ecb6bcb5834f189
parentb259f88b092ebed47ce2b2138f15e70d590d3535 (diff)
downloadoslo-messaging-7c5d039fd355e60e099a0a36408c85a08bfcc2ad.tar.gz
Move zmq driver options into its own group
ZeroMQ driver options are current stored into the DEFAULT group. This change makes the zmq configuration clearer by putting its options into oslo_messaging_zmq group. Change-Id: Ia00fda005b1664750d2646f8c82ebdf295b156fb Closes-bug: #1417040 Co-Authored-By: Oleksii Zamiatin <ozamiatin@mirantis.com>
-rw-r--r--doc/source/zmq_driver.rst43
-rw-r--r--oslo_messaging/_cmd/zmq_proxy.py5
-rw-r--r--[-rwxr-xr-x]oslo_messaging/_drivers/impl_pika.py0
-rw-r--r--oslo_messaging/_drivers/impl_zmq.py96
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py2
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_client.py8
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py3
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py3
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py3
-rw-r--r--oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py2
-rw-r--r--oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py12
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py6
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/zmq_server.py13
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_address.py4
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_options.py122
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_socket.py26
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_updater.py2
-rw-r--r--oslo_messaging/conffixture.py3
-rw-r--r--oslo_messaging/opts.py5
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_impl_zmq.py2
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_pub_sub.py2
-rw-r--r--oslo_messaging/tests/drivers/zmq/zmq_common.py6
-rw-r--r--oslo_messaging/tests/functional/utils.py12
-rw-r--r--oslo_messaging/tests/functional/zmq/multiproc_utils.py3
-rw-r--r--oslo_messaging/tests/functional/zmq/test_startup.py6
-rw-r--r--oslo_messaging/tests/test_opts.py3
-rwxr-xr-xsetup-test-env-zmq-proxy.sh1
-rwxr-xr-xsetup-test-env-zmq-pub-sub.sh1
-rwxr-xr-xsetup-test-env-zmq.sh1
-rwxr-xr-xtools/simulator.py2
30 files changed, 230 insertions, 167 deletions
diff --git a/doc/source/zmq_driver.rst b/doc/source/zmq_driver.rst
index e73fdf9..bcc3d66 100644
--- a/doc/source/zmq_driver.rst
+++ b/doc/source/zmq_driver.rst
@@ -85,12 +85,14 @@ Configuration
Enabling (mandatory)
--------------------
-To enable the driver, in the section [DEFAULT] of the conf file,
-the 'rpc_backend' flag must be set to 'zmq' and the 'rpc_zmq_host' flag
+To enable the driver the 'transport_url' option must be set to 'zmq://'
+in the section [DEFAULT] of the conf file, the 'rpc_zmq_host' flag
must be set to the hostname of the current node. ::
[DEFAULT]
- rpc_backend = zmq
+ transport_url = "zmq://"
+
+ [oslo_messaging_zmq]
rpc_zmq_host = {hostname}
@@ -110,27 +112,17 @@ RedisMatchMaker: loads the hash table from a remote Redis server, supports
dynamic host/topic registrations, host expiration, and hooks for consuming
applications to acknowledge or neg-acknowledge topic.host service availability.
-To set the MatchMaker class, use option 'rpc_zmq_matchmaker' in [DEFAULT]. ::
-
- rpc_zmq_matchmaker = dummy
-
-or::
-
- rpc_zmq_matchmaker = redis
+For ZeroMQ driver Redis is configured in transport_url also. For using Redis
+specify the URL as follows::
-To specify the Redis server for RedisMatchMaker, use options in
-[matchmaker_redis] of each project. ::
-
- [matchmaker_redis]
- host = 127.0.0.1
- port = 6379
+ [DEFAULT]
+ transport_url = "zmq+redis://127.0.0.1:6379"
In order to cleanup redis storage from expired records (e.g. target listener
goes down) TTL may be applied for keys. Configure 'zmq_target_expire' option
which is 120 (seconds) by default. The option is related not specifically to
-redis so it is also defined in [DEFAULT] section. If option value is <= 0
-then keys don't expire and live forever in the storage.
-
+redis so it is also defined in [oslo_messaging_zmq] section. If option value
+is <= 0 then keys don't expire and live forever in the storage.
MatchMaker Data Source (mandatory)
----------------------------------
@@ -159,11 +151,10 @@ we use Sentinel solution and redis master-slave-slave configuration (if we have
3 controllers and run Redis on each of them).
To deploy redis with HA follow the `sentinel-install`_ instructions. From the
-messaging driver's side you will need to setup following configuration which
-is different from a single-node redis deployment ::
+messaging driver's side you will need to setup following configuration ::
- [matchmaker_redis]
- sentinel_hosts=host1:26379, host2:26379, host3:26379
+ [DEFAULT]
+ transport_url = "zmq+redis://host1:26379,host2:26379,host3:26379"
Restrict the number of TCP sockets on controller
@@ -174,7 +165,7 @@ controller node in directly connected configuration. To solve the issue
ROUTER proxy may be used.
In order to configure driver to use ROUTER proxy set up the 'use_router_proxy'
-option to true in [DEFAULT] section (false is set by default).
+option to true in [oslo_messaging_zmq] section (false is set by default).
For example::
@@ -198,7 +189,7 @@ direct DEALER/ROUTER unicast which is possible but less efficient and therefore
is not recommended. In a case of direct DEALER/ROUTER unicast proxy is not
needed.
-This option can be set in [DEFAULT] section.
+This option can be set in [oslo_messaging_zmq] section.
For example::
@@ -218,7 +209,7 @@ All services bind to an IP address or Ethernet adapter. By default, all services
bind to '*', effectively binding to 0.0.0.0. This may be changed with the option
'rpc_zmq_bind_address' which accepts a wildcard, IP address, or Ethernet adapter.
-This configuration can be set in [DEFAULT] section.
+This configuration can be set in [oslo_messaging_zmq] section.
For example::
diff --git a/oslo_messaging/_cmd/zmq_proxy.py b/oslo_messaging/_cmd/zmq_proxy.py
index a76b0b5..3126a41 100644
--- a/oslo_messaging/_cmd/zmq_proxy.py
+++ b/oslo_messaging/_cmd/zmq_proxy.py
@@ -17,12 +17,13 @@ import logging
from oslo_config import cfg
-from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
from oslo_messaging._drivers.zmq_driver.proxy import zmq_queue_proxy
+from oslo_messaging._drivers.zmq_driver import zmq_options
CONF = cfg.CONF
-CONF.register_opts(impl_zmq.zmq_opts)
+
+zmq_options.register_opts(CONF)
opt_group = cfg.OptGroup(name='zmq_proxy_opts',
title='ZeroMQ proxy options')
diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py
index 7ad0744..7ad0744 100755..100644
--- a/oslo_messaging/_drivers/impl_pika.py
+++ b/oslo_messaging/_drivers/impl_pika.py
diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py
index 5636d01..90c2c20 100644
--- a/oslo_messaging/_drivers/impl_zmq.py
+++ b/oslo_messaging/_drivers/impl_zmq.py
@@ -14,10 +14,8 @@
import logging
import os
-import socket
import threading
-from oslo_config import cfg
from stevedore import driver
from oslo_messaging._drivers import base
@@ -25,90 +23,14 @@ from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client import zmq_client
from oslo_messaging._drivers.zmq_driver.server import zmq_server
from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_options
from oslo_messaging._i18n import _LE
-from oslo_messaging import server
RPCException = rpc_common.RPCException
-_MATCHMAKER_BACKENDS = ('redis', 'dummy')
-_MATCHMAKER_DEFAULT = 'redis'
LOG = logging.getLogger(__name__)
-zmq_opts = [
- cfg.StrOpt('rpc_zmq_bind_address', default='*',
- help='ZeroMQ bind address. Should be a wildcard (*), '
- 'an ethernet interface, or IP. '
- 'The "host" option should point or resolve to this '
- 'address.'),
-
- cfg.StrOpt('rpc_zmq_matchmaker', default=_MATCHMAKER_DEFAULT,
- choices=_MATCHMAKER_BACKENDS,
- help='MatchMaker driver.'),
-
- cfg.IntOpt('rpc_zmq_contexts', default=1,
- help='Number of ZeroMQ contexts, defaults to 1.'),
-
- cfg.IntOpt('rpc_zmq_topic_backlog',
- help='Maximum number of ingress messages to locally buffer '
- 'per topic. Default is unlimited.'),
-
- cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
- help='Directory for holding IPC sockets.'),
-
- cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
- sample_default='localhost',
- help='Name of this node. Must be a valid hostname, FQDN, or '
- 'IP address. Must match "host" option, if running Nova.'),
-
- cfg.IntOpt('rpc_cast_timeout', default=-1,
- help='Seconds to wait before a cast expires (TTL). '
- 'The default value of -1 specifies an infinite linger '
- 'period. The value of 0 specifies no linger period. '
- 'Pending messages shall be discarded immediately '
- 'when the socket is closed. Only supported by impl_zmq.'),
-
- cfg.IntOpt('rpc_poll_timeout', default=1,
- help='The default number of seconds that poll should wait. '
- 'Poll raises timeout exception when timeout expired.'),
-
- cfg.IntOpt('zmq_target_expire', default=300,
- help='Expiration timeout in seconds of a name service record '
- 'about existing target ( < 0 means no timeout).'),
-
- cfg.IntOpt('zmq_target_update', default=180,
- help='Update period in seconds of a name service record '
- 'about existing target.'),
-
- cfg.BoolOpt('use_pub_sub', default=True,
- help='Use PUB/SUB pattern for fanout methods. '
- 'PUB/SUB always uses proxy.'),
-
- cfg.BoolOpt('use_router_proxy', default=True,
- help='Use ROUTER remote proxy.'),
-
- cfg.PortOpt('rpc_zmq_min_port',
- default=49153,
- help='Minimal port number for random ports range.'),
-
- cfg.IntOpt('rpc_zmq_max_port',
- min=1,
- max=65536,
- default=65536,
- help='Maximal port number for random ports range.'),
-
- cfg.IntOpt('rpc_zmq_bind_port_retries',
- default=100,
- help='Number of retries to find free port number before '
- 'fail with ZMQBindError.'),
-
- cfg.StrOpt('rpc_zmq_serialization', default='json',
- choices=('json', 'msgpack'),
- help='Default serialization mechanism for '
- 'serializing/deserializing outgoing/incoming messages')
-]
-
-
class LazyDriverItem(object):
def __init__(self, item_cls, *args, **kwargs):
@@ -174,9 +96,7 @@ class ZmqDriver(base.BaseDriver):
if zmq is None:
raise ImportError(_LE("ZeroMQ is not available!"))
- conf.register_opts(zmq_opts)
- conf.register_opts(server._pool_opts)
- conf.register_opts(base.base_opts)
+ zmq_options.register_opts(conf)
self.conf = conf
self.allowed_remote_exmods = allowed_remote_exmods
@@ -186,9 +106,11 @@ class ZmqDriver(base.BaseDriver):
).driver(self.conf, url=url)
client_cls = zmq_client.ZmqClientProxy
- if conf.use_pub_sub and not conf.use_router_proxy:
+ if conf.oslo_messaging_zmq.use_pub_sub and not \
+ conf.oslo_messaging_zmq.use_router_proxy:
client_cls = zmq_client.ZmqClientMixDirectPubSub
- elif not conf.use_pub_sub and not conf.use_router_proxy:
+ elif not conf.oslo_messaging_zmq.use_pub_sub and not \
+ conf.oslo_messaging_zmq.use_router_proxy:
client_cls = zmq_client.ZmqClientDirect
self.client = LazyDriverItem(
@@ -206,13 +128,13 @@ class ZmqDriver(base.BaseDriver):
zmq_transport, p, matchmaker_backend = url.transport.partition('+')
assert zmq_transport == 'zmq', "Needs to be zmq for this transport!"
if not matchmaker_backend:
- return self.conf.rpc_zmq_matchmaker
- elif matchmaker_backend not in _MATCHMAKER_BACKENDS:
+ return self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker
+ elif matchmaker_backend not in zmq_options.MATCHMAKER_BACKENDS:
raise rpc_common.RPCException(
_LE("Incorrect matchmaker backend name %(backend_name)s!"
"Available names are: %(available_names)s") %
{"backend_name": matchmaker_backend,
- "available_names": _MATCHMAKER_BACKENDS})
+ "available_names": zmq_options.MATCHMAKER_BACKENDS})
return matchmaker_backend
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
index 29dd3fc..fb10ce7 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
@@ -63,7 +63,7 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase):
else:
return \
[zmq_address.target_to_subscribe_filter(request.target)] \
- if self.conf.use_pub_sub else \
+ if self.conf.oslo_messaging_zmq.use_pub_sub else \
self.routing_table.get_all_hosts(request.target)
except retrying.RetryError:
return []
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
index e7362e2..0ec27e9 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
@@ -39,7 +39,8 @@ class ZmqClientMixDirectPubSub(zmq_client_base.ZmqClientBase):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
- if conf.use_router_proxy or not conf.use_pub_sub:
+ if conf.oslo_messaging_zmq.use_router_proxy or not \
+ conf.oslo_messaging_zmq.use_pub_sub:
raise WrongClientException()
publisher_direct = \
@@ -68,7 +69,8 @@ class ZmqClientDirect(zmq_client_base.ZmqClientBase):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
- if conf.use_pub_sub or conf.use_router_proxy:
+ if conf.oslo_messaging_zmq.use_pub_sub or \
+ conf.oslo_messaging_zmq.use_router_proxy:
raise WrongClientException()
publisher = \
@@ -92,7 +94,7 @@ class ZmqClientProxy(zmq_client_base.ZmqClientBase):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
- if not conf.use_router_proxy:
+ if not conf.oslo_messaging_zmq.use_router_proxy:
raise WrongClientException()
publisher = \
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py
index 63c683f..96ebead 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py
@@ -87,7 +87,8 @@ class ReceiverBase(object):
return self._requests.pop((message_id, message_type), None)
def _run_loop(self):
- data, socket = self._poller.poll(timeout=self.conf.rpc_poll_timeout)
+ data, socket = self._poller.poll(
+ timeout=self.conf.oslo_messaging_zmq.rpc_poll_timeout)
if data is None:
return
reply_id, message_type, message_id, response = data
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py
index 2abb21b..16de0bc 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py
@@ -46,7 +46,8 @@ class RoutingTable(object):
return host
def _is_tm_expired(self, tm):
- return 0 <= self.conf.zmq_target_expire <= time.time() - tm
+ return 0 <= self.conf.oslo_messaging_zmq.zmq_target_expire \
+ <= time.time() - tm
def _update_routing_table(self, target):
routing_record = self.routing_table.get(str(target))
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py
index 890d5a1..aa82b84 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py
@@ -57,7 +57,8 @@ class SocketsManager(object):
def _check_for_new_hosts(self, target):
key = self._key_from_target(target)
socket, tm = self.outbound_sockets[key]
- if 0 <= self.conf.zmq_target_expire <= time.time() - tm:
+ if 0 <= self.conf.oslo_messaging_zmq.zmq_target_expire \
+ <= time.time() - tm:
self._get_hosts_and_connect(socket, target)
return socket
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py
index b35a7f9..15c7774 100644
--- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py
@@ -85,7 +85,7 @@ class ZmqProxy(object):
self.conf = conf
self.matchmaker = driver.DriverManager(
'oslo.messaging.zmq.matchmaker',
- self.conf.rpc_zmq_matchmaker,
+ self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker,
).driver(self.conf)
self.context = zmq.Context()
self.proxy = proxy_cls(conf, self.context, self.matchmaker)
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py
index 39de566..4c747ab 100644
--- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py
@@ -68,8 +68,9 @@ class UniversalQueueProxy(object):
return
msg_type = message[0]
- if self.conf.use_pub_sub and msg_type in (zmq_names.CAST_FANOUT_TYPE,
- zmq_names.NOTIFY_TYPE):
+ if self.conf.oslo_messaging_zmq.use_pub_sub and \
+ msg_type in (zmq_names.CAST_FANOUT_TYPE,
+ zmq_names.NOTIFY_TYPE):
self.pub_publisher.send_request(message)
else:
self._redirect_message(self.be_router_socket.handle
@@ -133,12 +134,13 @@ class RouterUpdater(zmq_updater.UpdaterBase):
def _update_records(self):
self.matchmaker.register_publisher(
(self.publisher_address, self.fe_router_address),
- expire=self.conf.zmq_target_expire)
+ expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Update PUB publisher"),
{"pub": self.publisher_address,
"router": self.fe_router_address})
- self.matchmaker.register_router(self.be_router_address,
- expire=self.conf.zmq_target_expire)
+ self.matchmaker.register_router(
+ self.be_router_address,
+ expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
LOG.info(_LI("[Backend ROUTER:%(router)s] Update ROUTER"),
{"router": self.be_router_address})
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py
index d413a98..69a7077 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py
@@ -79,8 +79,8 @@ class SingleSocketConsumer(ConsumerBase):
{"stype": zmq_names.socket_type_str(socket_type),
"addr": socket.bind_address,
"port": socket.port})
- self.host = zmq_address.combine_address(self.conf.rpc_zmq_host,
- socket.port)
+ self.host = zmq_address.combine_address(
+ self.conf.oslo_messaging_zmq.rpc_zmq_host, socket.port)
self.poller.register(socket, self.receive_message)
return socket
except zmq.ZMQError as e:
@@ -119,7 +119,7 @@ class TargetUpdater(zmq_updater.UpdaterBase):
self.matchmaker.register(
self.target, self.host,
zmq_names.socket_type_str(self.socket_type),
- expire=self.conf.zmq_target_expire)
+ expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
def stop(self):
super(TargetUpdater, self).stop()
diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py
index fa7b0bc..b40bdc0 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py
@@ -41,11 +41,14 @@ class ZmqServer(base.PollStyleListener):
self.poller = poller or zmq_async.get_poller()
self.router_consumer = zmq_router_consumer.RouterConsumer(
- conf, self.poller, self) if not conf.use_router_proxy else None
+ conf, self.poller, self) \
+ if not conf.oslo_messaging_zmq.use_router_proxy else None
self.dealer_consumer = zmq_dealer_consumer.DealerConsumer(
- conf, self.poller, self) if conf.use_router_proxy else None
+ conf, self.poller, self) \
+ if conf.oslo_messaging_zmq.use_router_proxy else None
self.sub_consumer = zmq_sub_consumer.SubConsumer(
- conf, self.poller, self) if conf.use_pub_sub else None
+ conf, self.poller, self) \
+ if conf.oslo_messaging_zmq.use_pub_sub else None
self.consumers = []
if self.router_consumer is not None:
@@ -58,7 +61,7 @@ class ZmqServer(base.PollStyleListener):
@base.batch_poll_helper
def poll(self, timeout=None):
message, socket = self.poller.poll(
- timeout or self.conf.rpc_poll_timeout)
+ timeout or self.conf.oslo_messaging_zmq.rpc_poll_timeout)
return message
def stop(self):
@@ -94,7 +97,7 @@ class ZmqNotificationServer(base.PollStyleListener):
@base.batch_poll_helper
def poll(self, timeout=None):
message, socket = self.poller.poll(
- timeout or self.conf.rpc_poll_timeout)
+ timeout or self.conf.oslo_messaging_zmq.rpc_poll_timeout)
return message
def stop(self):
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_address.py b/oslo_messaging/_drivers/zmq_driver/zmq_address.py
index b33c288..0175e7e 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_address.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_address.py
@@ -24,11 +24,11 @@ def get_tcp_direct_address(host):
def get_tcp_random_address(conf):
- return "tcp://%s" % conf.rpc_zmq_bind_address
+ return "tcp://%s" % conf.oslo_messaging_zmq.rpc_zmq_bind_address
def get_broker_address(conf):
- return "ipc://%s/zmq-broker" % conf.rpc_zmq_ipc_dir
+ return "ipc://%s/zmq-broker" % conf.oslo_messaging_zmq.rpc_zmq_ipc_dir
def prefix_str(key, listener_type):
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_options.py b/oslo_messaging/_drivers/zmq_driver/zmq_options.py
new file mode 100644
index 0000000..2ac76f9
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_options.py
@@ -0,0 +1,122 @@
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import socket
+
+from oslo_config import cfg
+
+from oslo_messaging._drivers import base
+from oslo_messaging import server
+
+
+MATCHMAKER_BACKENDS = ('redis', 'dummy')
+MATCHMAKER_DEFAULT = 'redis'
+
+
+zmq_opts = [
+ cfg.StrOpt('rpc_zmq_bind_address', default='*',
+ deprecated_group='DEFAULT',
+ help='ZeroMQ bind address. Should be a wildcard (*), '
+ 'an ethernet interface, or IP. '
+ 'The "host" option should point or resolve to this '
+ 'address.'),
+
+ cfg.StrOpt('rpc_zmq_matchmaker', default=MATCHMAKER_DEFAULT,
+ choices=MATCHMAKER_BACKENDS,
+ deprecated_group='DEFAULT',
+ help='MatchMaker driver.'),
+
+ cfg.IntOpt('rpc_zmq_contexts', default=1,
+ deprecated_group='DEFAULT',
+ help='Number of ZeroMQ contexts, defaults to 1.'),
+
+ cfg.IntOpt('rpc_zmq_topic_backlog',
+ deprecated_group='DEFAULT',
+ help='Maximum number of ingress messages to locally buffer '
+ 'per topic. Default is unlimited.'),
+
+ cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
+ deprecated_group='DEFAULT',
+ help='Directory for holding IPC sockets.'),
+
+ cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
+ sample_default='localhost',
+ deprecated_group='DEFAULT',
+ help='Name of this node. Must be a valid hostname, FQDN, or '
+ 'IP address. Must match "host" option, if running Nova.'),
+
+ cfg.IntOpt('rpc_cast_timeout', default=-1,
+ deprecated_group='DEFAULT',
+ help='Seconds to wait before a cast expires (TTL). '
+ 'The default value of -1 specifies an infinite linger '
+ 'period. The value of 0 specifies no linger period. '
+ 'Pending messages shall be discarded immediately '
+ 'when the socket is closed. Only supported by impl_zmq.'),
+
+ cfg.IntOpt('rpc_poll_timeout', default=1,
+ deprecated_group='DEFAULT',
+ help='The default number of seconds that poll should wait. '
+ 'Poll raises timeout exception when timeout expired.'),
+
+ cfg.IntOpt('zmq_target_expire', default=300,
+ deprecated_group='DEFAULT',
+ help='Expiration timeout in seconds of a name service record '
+ 'about existing target ( < 0 means no timeout).'),
+
+ cfg.IntOpt('zmq_target_update', default=180,
+ deprecated_group='DEFAULT',
+ help='Update period in seconds of a name service record '
+ 'about existing target.'),
+
+ cfg.BoolOpt('use_pub_sub', default=True,
+ deprecated_group='DEFAULT',
+ help='Use PUB/SUB pattern for fanout methods. '
+ 'PUB/SUB always uses proxy.'),
+
+ cfg.BoolOpt('use_router_proxy', default=True,
+ deprecated_group='DEFAULT',
+ help='Use ROUTER remote proxy.'),
+
+ cfg.PortOpt('rpc_zmq_min_port',
+ default=49153,
+ deprecated_group='DEFAULT',
+ help='Minimal port number for random ports range.'),
+
+ cfg.IntOpt('rpc_zmq_max_port',
+ min=1,
+ max=65536,
+ default=65536,
+ deprecated_group='DEFAULT',
+ help='Maximal port number for random ports range.'),
+
+ cfg.IntOpt('rpc_zmq_bind_port_retries',
+ default=100,
+ deprecated_group='DEFAULT',
+ help='Number of retries to find free port number before '
+ 'fail with ZMQBindError.'),
+
+ cfg.StrOpt('rpc_zmq_serialization', default='json',
+ choices=('json', 'msgpack'),
+ deprecated_group='DEFAULT',
+ help='Default serialization mechanism for '
+ 'serializing/deserializing outgoing/incoming messages')
+]
+
+
+def register_opts(conf):
+ opt_group = cfg.OptGroup(name='oslo_messaging_zmq',
+ title='ZeroMQ driver options')
+ conf.register_opts(zmq_opts, group=opt_group)
+ conf.register_opts(server._pool_opts)
+ conf.register_opts(base.base_opts)
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
index 14061b2..285eafa 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
@@ -47,8 +47,9 @@ class ZmqSocket(object):
self.handle.set_hwm(high_watermark)
self.close_linger = -1
- if self.conf.rpc_cast_timeout > 0:
- self.close_linger = self.conf.rpc_cast_timeout * 1000
+ if self.conf.oslo_messaging_zmq.rpc_cast_timeout > 0:
+ self.close_linger = \
+ self.conf.oslo_messaging_zmq.rpc_cast_timeout * 1000
self.handle.setsockopt(zmq.LINGER, self.close_linger)
# Put messages to only connected queues
self.handle.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0)
@@ -96,8 +97,9 @@ class ZmqSocket(object):
self.handle.send_multipart(*args, **kwargs)
def send_dumped(self, obj, *args, **kwargs):
- serialization = kwargs.pop('serialization',
- self.conf.rpc_zmq_serialization)
+ serialization = kwargs.pop(
+ 'serialization',
+ self.conf.oslo_messaging_zmq.rpc_zmq_serialization)
serializer = self._get_serializer(serialization)
s = serializer.dump_as_bytes(obj)
self.handle.send(s, *args, **kwargs)
@@ -118,8 +120,9 @@ class ZmqSocket(object):
return self.handle.recv_multipart(*args, **kwargs)
def recv_loaded(self, *args, **kwargs):
- serialization = kwargs.pop('serialization',
- self.conf.rpc_zmq_serialization)
+ serialization = kwargs.pop(
+ 'serialization',
+ self.conf.oslo_messaging_zmq.rpc_zmq_serialization)
serializer = self._get_serializer(serialization)
s = self.handle.recv(*args, **kwargs)
obj = serializer.load_from_bytes(s)
@@ -170,13 +173,13 @@ class ZmqRandomPortSocket(ZmqSocket):
high_watermark=high_watermark)
self.bind_address = zmq_address.get_tcp_random_address(self.conf)
if host is None:
- host = conf.rpc_zmq_host
+ host = conf.oslo_messaging_zmq.rpc_zmq_host
try:
self.port = self.handle.bind_to_random_port(
self.bind_address,
- min_port=conf.rpc_zmq_min_port,
- max_port=conf.rpc_zmq_max_port,
- max_tries=conf.rpc_zmq_bind_port_retries)
+ min_port=conf.oslo_messaging_zmq.rpc_zmq_min_port,
+ max_port=conf.oslo_messaging_zmq.rpc_zmq_max_port,
+ max_tries=conf.oslo_messaging_zmq.rpc_zmq_bind_port_retries)
self.connect_address = zmq_address.combine_address(host, self.port)
except zmq.ZMQBindError:
LOG.error(_LE("Random ports range exceeded!"))
@@ -192,7 +195,8 @@ class ZmqFixedPortSocket(ZmqSocket):
high_watermark=high_watermark)
self.connect_address = zmq_address.combine_address(host, port)
self.bind_address = zmq_address.get_tcp_direct_address(
- zmq_address.combine_address(conf.rpc_zmq_bind_address, port))
+ zmq_address.combine_address(
+ conf.oslo_messaging_zmq.rpc_zmq_bind_address, port))
self.host = host
self.port = port
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py
index 302915d..2d4f9e0 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py
@@ -41,7 +41,7 @@ class UpdaterBase(object):
def _update_loop(self):
self.update_method()
- time.sleep(self.conf.zmq_target_update)
+ time.sleep(self.conf.oslo_messaging_zmq.zmq_target_update)
def cleanup(self):
self.executor.stop()
diff --git a/oslo_messaging/conffixture.py b/oslo_messaging/conffixture.py
index 4e6c9d5..5eb4e5e 100644
--- a/oslo_messaging/conffixture.py
+++ b/oslo_messaging/conffixture.py
@@ -58,7 +58,8 @@ class ConfFixture(fixtures.Fixture):
'oslo_messaging._drivers.amqp1_driver.opts',
'amqp1_opts', 'oslo_messaging_amqp')
_import_opts(self.conf,
- 'oslo_messaging._drivers.impl_zmq', 'zmq_opts')
+ 'oslo_messaging._drivers.zmq_driver.zmq_options',
+ 'zmq_opts', 'oslo_messaging_zmq')
_import_opts(self.conf,
'oslo_messaging._drivers.zmq_driver.'
'matchmaker.matchmaker_redis',
diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py
index b04768a..c252496 100644
--- a/oslo_messaging/opts.py
+++ b/oslo_messaging/opts.py
@@ -25,7 +25,7 @@ from oslo_messaging._drivers.amqp1_driver import opts as amqp_opts
from oslo_messaging._drivers import base as drivers_base
from oslo_messaging._drivers import impl_pika
from oslo_messaging._drivers import impl_rabbit
-from oslo_messaging._drivers import impl_zmq
+from oslo_messaging._drivers.impl_zmq import zmq_options
from oslo_messaging._drivers.pika_driver import pika_connection_factory
from oslo_messaging._drivers.zmq_driver.matchmaker import matchmaker_redis
from oslo_messaging.notify import notifier
@@ -36,7 +36,7 @@ from oslo_messaging import transport
_global_opt_lists = [
drivers_base.base_opts,
- impl_zmq.zmq_opts,
+ zmq_options.zmq_opts,
server._pool_opts,
client._client_opts,
transport._transport_opts,
@@ -45,6 +45,7 @@ _global_opt_lists = [
_opts = [
(None, list(itertools.chain(*_global_opt_lists))),
('matchmaker_redis', matchmaker_redis.matchmaker_redis_opts),
+ ('oslo_messaging_zmq', zmq_options.zmq_opts),
('oslo_messaging_amqp', amqp_opts.amqp1_opts),
('oslo_messaging_notifications', notifier._notifier_opts),
('oslo_messaging_rabbit', list(
diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py
index 76b61cf..04d86d9 100644
--- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py
+++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py
@@ -35,7 +35,7 @@ class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase):
# Set config values
kwargs = {'rpc_zmq_min_port': 5555,
'rpc_zmq_max_port': 5560}
- self.config(**kwargs)
+ self.config(group='oslo_messaging_zmq', **kwargs)
def test_ports_range(self):
listeners = []
diff --git a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
index 02519de..2973521 100644
--- a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
+++ b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
@@ -54,7 +54,7 @@ class TestPubSub(zmq_common.ZmqBaseTestCase):
kwargs = {'use_pub_sub': True,
'rpc_zmq_serialization': self.serialization}
- self.config(**kwargs)
+ self.config(group='oslo_messaging_zmq', **kwargs)
self.config(host="127.0.0.1", group="zmq_proxy_opts")
self.config(publisher_port="0", group="zmq_proxy_opts")
diff --git a/oslo_messaging/tests/drivers/zmq/zmq_common.py b/oslo_messaging/tests/drivers/zmq/zmq_common.py
index f0ef4a4..ff48bfb 100644
--- a/oslo_messaging/tests/drivers/zmq/zmq_common.py
+++ b/oslo_messaging/tests/drivers/zmq/zmq_common.py
@@ -20,6 +20,7 @@ import testtools
import oslo_messaging
from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_options
from oslo_messaging._i18n import _LE
from oslo_messaging.tests import utils as test_utils
@@ -71,17 +72,18 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
def setUp(self):
super(ZmqBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
+ zmq_options.register_opts(self.conf)
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
- 'rpc_response_timeout': 5,
'rpc_zmq_ipc_dir': self.internal_ipc_dir,
'use_pub_sub': False,
'use_router_proxy': False,
'rpc_zmq_matchmaker': 'dummy'}
- self.config(**kwargs)
+ self.config(group='oslo_messaging_zmq', **kwargs)
+ self.config(rpc_response_timeout=5)
# Get driver
transport = oslo_messaging.get_transport(self.conf)
diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py
index 0dcc047..1d215a5 100644
--- a/oslo_messaging/tests/functional/utils.py
+++ b/oslo_messaging/tests/functional/utils.py
@@ -293,10 +293,12 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
zmq_matchmaker = os.environ.get('ZMQ_MATCHMAKER')
if zmq_matchmaker:
- self.config(rpc_zmq_matchmaker=zmq_matchmaker)
+ self.config(rpc_zmq_matchmaker=zmq_matchmaker,
+ group="oslo_messaging_zmq")
zmq_ipc_dir = os.environ.get('ZMQ_IPC_DIR')
if zmq_ipc_dir:
- self.config(rpc_zmq_ipc_dir=zmq_ipc_dir)
+ self.config(group="oslo_messaging_zmq",
+ rpc_zmq_ipc_dir=zmq_ipc_dir)
zmq_redis_port = os.environ.get('ZMQ_REDIS_PORT')
if zmq_redis_port:
self.config(port=zmq_redis_port, group="matchmaker_redis")
@@ -304,10 +306,12 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
self.config(wait_timeout=1000, group="matchmaker_redis")
zmq_use_pub_sub = os.environ.get('ZMQ_USE_PUB_SUB')
if zmq_use_pub_sub:
- self.config(use_pub_sub=zmq_use_pub_sub)
+ self.config(use_pub_sub=zmq_use_pub_sub,
+ group='oslo_messaging_zmq')
zmq_use_router_proxy = os.environ.get('ZMQ_USE_ROUTER_PROXY')
if zmq_use_router_proxy:
- self.config(use_router_proxy=zmq_use_router_proxy)
+ self.config(use_router_proxy=zmq_use_router_proxy,
+ group='oslo_messaging_zmq')
class NotificationFixture(fixtures.Fixture):
diff --git a/oslo_messaging/tests/functional/zmq/multiproc_utils.py b/oslo_messaging/tests/functional/zmq/multiproc_utils.py
index ee9f56e..4a1498a 100644
--- a/oslo_messaging/tests/functional/zmq/multiproc_utils.py
+++ b/oslo_messaging/tests/functional/zmq/multiproc_utils.py
@@ -70,7 +70,8 @@ def listener_configurer(conf):
'%(levelname)-8s %(message)s')
h.setFormatter(f)
root.addHandler(h)
- log_path = conf.rpc_zmq_ipc_dir + "/" + "zmq_multiproc.log"
+ log_path = conf.oslo_messaging_zmq.rpc_zmq_ipc_dir + \
+ "/" + "zmq_multiproc.log"
file_handler = logging.StreamHandler(open(log_path, 'w'))
file_handler.setFormatter(f)
root.addHandler(file_handler)
diff --git a/oslo_messaging/tests/functional/zmq/test_startup.py b/oslo_messaging/tests/functional/zmq/test_startup.py
index ea287b3..f1b89b0 100644
--- a/oslo_messaging/tests/functional/zmq/test_startup.py
+++ b/oslo_messaging/tests/functional/zmq/test_startup.py
@@ -30,10 +30,10 @@ class StartupOrderTestCase(multiproc_utils.MutliprocTestCase):
self.conf.prog = "test_prog"
self.conf.project = "test_project"
- kwargs = {'rpc_response_timeout': 30}
- self.config(**kwargs)
+ self.config(rpc_response_timeout=30)
- log_path = self.conf.rpc_zmq_ipc_dir + "/" + str(os.getpid()) + ".log"
+ log_path = os.path.join(self.conf.oslo_messaging_zmq.rpc_zmq_ipc_dir,
+ str(os.getpid()) + ".log")
sys.stdout = open(log_path, "w", buffering=0)
def test_call_server_before_client(self):
diff --git a/oslo_messaging/tests/test_opts.py b/oslo_messaging/tests/test_opts.py
index 2ca8f8a..0e4b1f8 100644
--- a/oslo_messaging/tests/test_opts.py
+++ b/oslo_messaging/tests/test_opts.py
@@ -32,11 +32,12 @@ class OptsTestCase(test_utils.BaseTestCase):
super(OptsTestCase, self).setUp()
def _test_list_opts(self, result):
- self.assertEqual(5, len(result))
+ self.assertEqual(6, len(result))
groups = [g for (g, l) in result]
self.assertIn(None, groups)
self.assertIn('matchmaker_redis', groups)
+ self.assertIn('oslo_messaging_zmq', groups)
self.assertIn('oslo_messaging_amqp', groups)
self.assertIn('oslo_messaging_notifications', groups)
self.assertIn('oslo_messaging_rabbit', groups)
diff --git a/setup-test-env-zmq-proxy.sh b/setup-test-env-zmq-proxy.sh
index ebce12c..12649c8 100755
--- a/setup-test-env-zmq-proxy.sh
+++ b/setup-test-env-zmq-proxy.sh
@@ -18,6 +18,7 @@ export ZMQ_PROXY_HOST=127.0.0.1
cat > ${DATADIR}/zmq.conf <<EOF
[DEFAULT]
transport_url=${TRANSPORT_URL}
+[oslo_messaging_zmq]
rpc_zmq_matchmaker=${ZMQ_MATCHMAKER}
rpc_zmq_ipc_dir=${ZMQ_IPC_DIR}
use_pub_sub=${ZMQ_USE_PUB_SUB}
diff --git a/setup-test-env-zmq-pub-sub.sh b/setup-test-env-zmq-pub-sub.sh
index 4a937ba..5551be5 100755
--- a/setup-test-env-zmq-pub-sub.sh
+++ b/setup-test-env-zmq-pub-sub.sh
@@ -16,6 +16,7 @@ export ZMQ_USE_ROUTER_PROXY=true
cat > ${DATADIR}/zmq.conf <<EOF
[DEFAULT]
transport_url=${TRANSPORT_URL}
+[oslo_messaging_zmq]
rpc_zmq_matchmaker=${ZMQ_MATCHMAKER}
rpc_zmq_ipc_dir=${ZMQ_IPC_DIR}
use_pub_sub=${ZMQ_USE_PUB_SUB}
diff --git a/setup-test-env-zmq.sh b/setup-test-env-zmq.sh
index 0fa3fcd..8780872 100755
--- a/setup-test-env-zmq.sh
+++ b/setup-test-env-zmq.sh
@@ -16,6 +16,7 @@ export ZMQ_USE_ROUTER_PROXY=false
cat > ${DATADIR}/zmq.conf <<EOF
[DEFAULT]
transport_url=${TRANSPORT_URL}
+[oslo_messaging_zmq]
rpc_zmq_matchmaker=${ZMQ_MATCHMAKER}
rpc_zmq_ipc_dir=${ZMQ_IPC_DIR}
use_pub_sub=${ZMQ_USE_PUB_SUB}
diff --git a/tools/simulator.py b/tools/simulator.py
index deaae92..2f3161b 100755
--- a/tools/simulator.py
+++ b/tools/simulator.py
@@ -684,7 +684,7 @@ def main():
if args.mode == 'rpc-server':
target = messaging.Target(topic=args.topic, server=args.server)
if args.url.startswith('zmq'):
- cfg.CONF.rpc_zmq_matchmaker = "redis"
+ cfg.CONF.oslo_messaging_zmq.rpc_zmq_matchmaker = "redis"
endpoint = rpc_server(TRANSPORT, target, args.wait_before_answer,
args.executor, args.duration)