summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-08-05 19:28:50 +0000
committerGerrit Code Review <review@openstack.org>2016-08-05 19:28:50 +0000
commita0336c8aa19e027190623462e9190dec23531895 (patch)
treeb902930c0b0e2210861cca167a31e69f731c7717
parent51652c57d2b6fa040a0b88d20bafc0026253a516 (diff)
parent7c5d039fd355e60e099a0a36408c85a08bfcc2ad (diff)
downloadoslo-messaging-a0336c8aa19e027190623462e9190dec23531895.tar.gz
Merge "Move zmq driver options into its own group"5.7.0
-rw-r--r--doc/source/zmq_driver.rst43
-rw-r--r--oslo_messaging/_cmd/zmq_proxy.py5
-rw-r--r--[-rwxr-xr-x]oslo_messaging/_drivers/impl_pika.py0
-rw-r--r--oslo_messaging/_drivers/impl_zmq.py96
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py2
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_client.py8
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py3
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py3
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py3
-rw-r--r--oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py2
-rw-r--r--oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py12
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py6
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/zmq_server.py13
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_address.py4
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_options.py122
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_socket.py26
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_updater.py2
-rw-r--r--oslo_messaging/conffixture.py3
-rw-r--r--oslo_messaging/opts.py5
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_impl_zmq.py2
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_pub_sub.py2
-rw-r--r--oslo_messaging/tests/drivers/zmq/zmq_common.py6
-rw-r--r--oslo_messaging/tests/functional/utils.py12
-rw-r--r--oslo_messaging/tests/functional/zmq/multiproc_utils.py3
-rw-r--r--oslo_messaging/tests/functional/zmq/test_startup.py6
-rw-r--r--oslo_messaging/tests/test_opts.py3
-rwxr-xr-xsetup-test-env-zmq-proxy.sh1
-rwxr-xr-xsetup-test-env-zmq-pub-sub.sh1
-rwxr-xr-xsetup-test-env-zmq.sh1
-rwxr-xr-xtools/simulator.py2
30 files changed, 230 insertions, 167 deletions
diff --git a/doc/source/zmq_driver.rst b/doc/source/zmq_driver.rst
index e73fdf9..bcc3d66 100644
--- a/doc/source/zmq_driver.rst
+++ b/doc/source/zmq_driver.rst
@@ -85,12 +85,14 @@ Configuration
Enabling (mandatory)
--------------------
-To enable the driver, in the section [DEFAULT] of the conf file,
-the 'rpc_backend' flag must be set to 'zmq' and the 'rpc_zmq_host' flag
+To enable the driver the 'transport_url' option must be set to 'zmq://'
+in the section [DEFAULT] of the conf file, the 'rpc_zmq_host' flag
must be set to the hostname of the current node. ::
[DEFAULT]
- rpc_backend = zmq
+ transport_url = "zmq://"
+
+ [oslo_messaging_zmq]
rpc_zmq_host = {hostname}
@@ -110,27 +112,17 @@ RedisMatchMaker: loads the hash table from a remote Redis server, supports
dynamic host/topic registrations, host expiration, and hooks for consuming
applications to acknowledge or neg-acknowledge topic.host service availability.
-To set the MatchMaker class, use option 'rpc_zmq_matchmaker' in [DEFAULT]. ::
-
- rpc_zmq_matchmaker = dummy
-
-or::
-
- rpc_zmq_matchmaker = redis
+For ZeroMQ driver Redis is configured in transport_url also. For using Redis
+specify the URL as follows::
-To specify the Redis server for RedisMatchMaker, use options in
-[matchmaker_redis] of each project. ::
-
- [matchmaker_redis]
- host = 127.0.0.1
- port = 6379
+ [DEFAULT]
+ transport_url = "zmq+redis://127.0.0.1:6379"
In order to cleanup redis storage from expired records (e.g. target listener
goes down) TTL may be applied for keys. Configure 'zmq_target_expire' option
which is 120 (seconds) by default. The option is related not specifically to
-redis so it is also defined in [DEFAULT] section. If option value is <= 0
-then keys don't expire and live forever in the storage.
-
+redis so it is also defined in [oslo_messaging_zmq] section. If option value
+is <= 0 then keys don't expire and live forever in the storage.
MatchMaker Data Source (mandatory)
----------------------------------
@@ -159,11 +151,10 @@ we use Sentinel solution and redis master-slave-slave configuration (if we have
3 controllers and run Redis on each of them).
To deploy redis with HA follow the `sentinel-install`_ instructions. From the
-messaging driver's side you will need to setup following configuration which
-is different from a single-node redis deployment ::
+messaging driver's side you will need to setup following configuration ::
- [matchmaker_redis]
- sentinel_hosts=host1:26379, host2:26379, host3:26379
+ [DEFAULT]
+ transport_url = "zmq+redis://host1:26379,host2:26379,host3:26379"
Restrict the number of TCP sockets on controller
@@ -174,7 +165,7 @@ controller node in directly connected configuration. To solve the issue
ROUTER proxy may be used.
In order to configure driver to use ROUTER proxy set up the 'use_router_proxy'
-option to true in [DEFAULT] section (false is set by default).
+option to true in [oslo_messaging_zmq] section (false is set by default).
For example::
@@ -198,7 +189,7 @@ direct DEALER/ROUTER unicast which is possible but less efficient and therefore
is not recommended. In a case of direct DEALER/ROUTER unicast proxy is not
needed.
-This option can be set in [DEFAULT] section.
+This option can be set in [oslo_messaging_zmq] section.
For example::
@@ -218,7 +209,7 @@ All services bind to an IP address or Ethernet adapter. By default, all services
bind to '*', effectively binding to 0.0.0.0. This may be changed with the option
'rpc_zmq_bind_address' which accepts a wildcard, IP address, or Ethernet adapter.
-This configuration can be set in [DEFAULT] section.
+This configuration can be set in [oslo_messaging_zmq] section.
For example::
diff --git a/oslo_messaging/_cmd/zmq_proxy.py b/oslo_messaging/_cmd/zmq_proxy.py
index a76b0b5..3126a41 100644
--- a/oslo_messaging/_cmd/zmq_proxy.py
+++ b/oslo_messaging/_cmd/zmq_proxy.py
@@ -17,12 +17,13 @@ import logging
from oslo_config import cfg
-from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
from oslo_messaging._drivers.zmq_driver.proxy import zmq_queue_proxy
+from oslo_messaging._drivers.zmq_driver import zmq_options
CONF = cfg.CONF
-CONF.register_opts(impl_zmq.zmq_opts)
+
+zmq_options.register_opts(CONF)
opt_group = cfg.OptGroup(name='zmq_proxy_opts',
title='ZeroMQ proxy options')
diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py
index 7ad0744..7ad0744 100755..100644
--- a/oslo_messaging/_drivers/impl_pika.py
+++ b/oslo_messaging/_drivers/impl_pika.py
diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py
index 5636d01..90c2c20 100644
--- a/oslo_messaging/_drivers/impl_zmq.py
+++ b/oslo_messaging/_drivers/impl_zmq.py
@@ -14,10 +14,8 @@
import logging
import os
-import socket
import threading
-from oslo_config import cfg
from stevedore import driver
from oslo_messaging._drivers import base
@@ -25,90 +23,14 @@ from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client import zmq_client
from oslo_messaging._drivers.zmq_driver.server import zmq_server
from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_options
from oslo_messaging._i18n import _LE
-from oslo_messaging import server
RPCException = rpc_common.RPCException
-_MATCHMAKER_BACKENDS = ('redis', 'dummy')
-_MATCHMAKER_DEFAULT = 'redis'
LOG = logging.getLogger(__name__)
-zmq_opts = [
- cfg.StrOpt('rpc_zmq_bind_address', default='*',
- help='ZeroMQ bind address. Should be a wildcard (*), '
- 'an ethernet interface, or IP. '
- 'The "host" option should point or resolve to this '
- 'address.'),
-
- cfg.StrOpt('rpc_zmq_matchmaker', default=_MATCHMAKER_DEFAULT,
- choices=_MATCHMAKER_BACKENDS,
- help='MatchMaker driver.'),
-
- cfg.IntOpt('rpc_zmq_contexts', default=1,
- help='Number of ZeroMQ contexts, defaults to 1.'),
-
- cfg.IntOpt('rpc_zmq_topic_backlog',
- help='Maximum number of ingress messages to locally buffer '
- 'per topic. Default is unlimited.'),
-
- cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
- help='Directory for holding IPC sockets.'),
-
- cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
- sample_default='localhost',
- help='Name of this node. Must be a valid hostname, FQDN, or '
- 'IP address. Must match "host" option, if running Nova.'),
-
- cfg.IntOpt('rpc_cast_timeout', default=-1,
- help='Seconds to wait before a cast expires (TTL). '
- 'The default value of -1 specifies an infinite linger '
- 'period. The value of 0 specifies no linger period. '
- 'Pending messages shall be discarded immediately '
- 'when the socket is closed. Only supported by impl_zmq.'),
-
- cfg.IntOpt('rpc_poll_timeout', default=1,
- help='The default number of seconds that poll should wait. '
- 'Poll raises timeout exception when timeout expired.'),
-
- cfg.IntOpt('zmq_target_expire', default=300,
- help='Expiration timeout in seconds of a name service record '
- 'about existing target ( < 0 means no timeout).'),
-
- cfg.IntOpt('zmq_target_update', default=180,
- help='Update period in seconds of a name service record '
- 'about existing target.'),
-
- cfg.BoolOpt('use_pub_sub', default=True,
- help='Use PUB/SUB pattern for fanout methods. '
- 'PUB/SUB always uses proxy.'),
-
- cfg.BoolOpt('use_router_proxy', default=True,
- help='Use ROUTER remote proxy.'),
-
- cfg.PortOpt('rpc_zmq_min_port',
- default=49153,
- help='Minimal port number for random ports range.'),
-
- cfg.IntOpt('rpc_zmq_max_port',
- min=1,
- max=65536,
- default=65536,
- help='Maximal port number for random ports range.'),
-
- cfg.IntOpt('rpc_zmq_bind_port_retries',
- default=100,
- help='Number of retries to find free port number before '
- 'fail with ZMQBindError.'),
-
- cfg.StrOpt('rpc_zmq_serialization', default='json',
- choices=('json', 'msgpack'),
- help='Default serialization mechanism for '
- 'serializing/deserializing outgoing/incoming messages')
-]
-
-
class LazyDriverItem(object):
def __init__(self, item_cls, *args, **kwargs):
@@ -174,9 +96,7 @@ class ZmqDriver(base.BaseDriver):
if zmq is None:
raise ImportError(_LE("ZeroMQ is not available!"))
- conf.register_opts(zmq_opts)
- conf.register_opts(server._pool_opts)
- conf.register_opts(base.base_opts)
+ zmq_options.register_opts(conf)
self.conf = conf
self.allowed_remote_exmods = allowed_remote_exmods
@@ -186,9 +106,11 @@ class ZmqDriver(base.BaseDriver):
).driver(self.conf, url=url)
client_cls = zmq_client.ZmqClientProxy
- if conf.use_pub_sub and not conf.use_router_proxy:
+ if conf.oslo_messaging_zmq.use_pub_sub and not \
+ conf.oslo_messaging_zmq.use_router_proxy:
client_cls = zmq_client.ZmqClientMixDirectPubSub
- elif not conf.use_pub_sub and not conf.use_router_proxy:
+ elif not conf.oslo_messaging_zmq.use_pub_sub and not \
+ conf.oslo_messaging_zmq.use_router_proxy:
client_cls = zmq_client.ZmqClientDirect
self.client = LazyDriverItem(
@@ -206,13 +128,13 @@ class ZmqDriver(base.BaseDriver):
zmq_transport, p, matchmaker_backend = url.transport.partition('+')
assert zmq_transport == 'zmq', "Needs to be zmq for this transport!"
if not matchmaker_backend:
- return self.conf.rpc_zmq_matchmaker
- elif matchmaker_backend not in _MATCHMAKER_BACKENDS:
+ return self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker
+ elif matchmaker_backend not in zmq_options.MATCHMAKER_BACKENDS:
raise rpc_common.RPCException(
_LE("Incorrect matchmaker backend name %(backend_name)s!"
"Available names are: %(available_names)s") %
{"backend_name": matchmaker_backend,
- "available_names": _MATCHMAKER_BACKENDS})
+ "available_names": zmq_options.MATCHMAKER_BACKENDS})
return matchmaker_backend
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
index 29dd3fc..fb10ce7 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
@@ -63,7 +63,7 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase):
else:
return \
[zmq_address.target_to_subscribe_filter(request.target)] \
- if self.conf.use_pub_sub else \
+ if self.conf.oslo_messaging_zmq.use_pub_sub else \
self.routing_table.get_all_hosts(request.target)
except retrying.RetryError:
return []
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
index e7362e2..0ec27e9 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
@@ -39,7 +39,8 @@ class ZmqClientMixDirectPubSub(zmq_client_base.ZmqClientBase):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
- if conf.use_router_proxy or not conf.use_pub_sub:
+ if conf.oslo_messaging_zmq.use_router_proxy or not \
+ conf.oslo_messaging_zmq.use_pub_sub:
raise WrongClientException()
publisher_direct = \
@@ -68,7 +69,8 @@ class ZmqClientDirect(zmq_client_base.ZmqClientBase):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
- if conf.use_pub_sub or conf.use_router_proxy:
+ if conf.oslo_messaging_zmq.use_pub_sub or \
+ conf.oslo_messaging_zmq.use_router_proxy:
raise WrongClientException()
publisher = \
@@ -92,7 +94,7 @@ class ZmqClientProxy(zmq_client_base.ZmqClientBase):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
- if not conf.use_router_proxy:
+ if not conf.oslo_messaging_zmq.use_router_proxy:
raise WrongClientException()
publisher = \
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py
index 63c683f..96ebead 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py
@@ -87,7 +87,8 @@ class ReceiverBase(object):
return self._requests.pop((message_id, message_type), None)
def _run_loop(self):
- data, socket = self._poller.poll(timeout=self.conf.rpc_poll_timeout)
+ data, socket = self._poller.poll(
+ timeout=self.conf.oslo_messaging_zmq.rpc_poll_timeout)
if data is None:
return
reply_id, message_type, message_id, response = data
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py
index 2abb21b..16de0bc 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py
@@ -46,7 +46,8 @@ class RoutingTable(object):
return host
def _is_tm_expired(self, tm):
- return 0 <= self.conf.zmq_target_expire <= time.time() - tm
+ return 0 <= self.conf.oslo_messaging_zmq.zmq_target_expire \
+ <= time.time() - tm
def _update_routing_table(self, target):
routing_record = self.routing_table.get(str(target))
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py
index 890d5a1..aa82b84 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py
@@ -57,7 +57,8 @@ class SocketsManager(object):
def _check_for_new_hosts(self, target):
key = self._key_from_target(target)
socket, tm = self.outbound_sockets[key]
- if 0 <= self.conf.zmq_target_expire <= time.time() - tm:
+ if 0 <= self.conf.oslo_messaging_zmq.zmq_target_expire \
+ <= time.time() - tm:
self._get_hosts_and_connect(socket, target)
return socket
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py
index b35a7f9..15c7774 100644
--- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py
@@ -85,7 +85,7 @@ class ZmqProxy(object):
self.conf = conf
self.matchmaker = driver.DriverManager(
'oslo.messaging.zmq.matchmaker',
- self.conf.rpc_zmq_matchmaker,
+ self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker,
).driver(self.conf)
self.context = zmq.Context()
self.proxy = proxy_cls(conf, self.context, self.matchmaker)
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py
index 39de566..4c747ab 100644
--- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py
@@ -68,8 +68,9 @@ class UniversalQueueProxy(object):
return
msg_type = message[0]
- if self.conf.use_pub_sub and msg_type in (zmq_names.CAST_FANOUT_TYPE,
- zmq_names.NOTIFY_TYPE):
+ if self.conf.oslo_messaging_zmq.use_pub_sub and \
+ msg_type in (zmq_names.CAST_FANOUT_TYPE,
+ zmq_names.NOTIFY_TYPE):
self.pub_publisher.send_request(message)
else:
self._redirect_message(self.be_router_socket.handle
@@ -133,12 +134,13 @@ class RouterUpdater(zmq_updater.UpdaterBase):
def _update_records(self):
self.matchmaker.register_publisher(
(self.publisher_address, self.fe_router_address),
- expire=self.conf.zmq_target_expire)
+ expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Update PUB publisher"),
{"pub": self.publisher_address,
"router": self.fe_router_address})
- self.matchmaker.register_router(self.be_router_address,
- expire=self.conf.zmq_target_expire)
+ self.matchmaker.register_router(
+ self.be_router_address,
+ expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
LOG.info(_LI("[Backend ROUTER:%(router)s] Update ROUTER"),
{"router": self.be_router_address})
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py
index d413a98..69a7077 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py
@@ -79,8 +79,8 @@ class SingleSocketConsumer(ConsumerBase):
{"stype": zmq_names.socket_type_str(socket_type),
"addr": socket.bind_address,
"port": socket.port})
- self.host = zmq_address.combine_address(self.conf.rpc_zmq_host,
- socket.port)
+ self.host = zmq_address.combine_address(
+ self.conf.oslo_messaging_zmq.rpc_zmq_host, socket.port)
self.poller.register(socket, self.receive_message)
return socket
except zmq.ZMQError as e:
@@ -119,7 +119,7 @@ class TargetUpdater(zmq_updater.UpdaterBase):
self.matchmaker.register(
self.target, self.host,
zmq_names.socket_type_str(self.socket_type),
- expire=self.conf.zmq_target_expire)
+ expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
def stop(self):
super(TargetUpdater, self).stop()
diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py
index fa7b0bc..b40bdc0 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py
@@ -41,11 +41,14 @@ class ZmqServer(base.PollStyleListener):
self.poller = poller or zmq_async.get_poller()
self.router_consumer = zmq_router_consumer.RouterConsumer(
- conf, self.poller, self) if not conf.use_router_proxy else None
+ conf, self.poller, self) \
+ if not conf.oslo_messaging_zmq.use_router_proxy else None
self.dealer_consumer = zmq_dealer_consumer.DealerConsumer(
- conf, self.poller, self) if conf.use_router_proxy else None
+ conf, self.poller, self) \
+ if conf.oslo_messaging_zmq.use_router_proxy else None
self.sub_consumer = zmq_sub_consumer.SubConsumer(
- conf, self.poller, self) if conf.use_pub_sub else None
+ conf, self.poller, self) \
+ if conf.oslo_messaging_zmq.use_pub_sub else None
self.consumers = []
if self.router_consumer is not None:
@@ -58,7 +61,7 @@ class ZmqServer(base.PollStyleListener):
@base.batch_poll_helper
def poll(self, timeout=None):
message, socket = self.poller.poll(
- timeout or self.conf.rpc_poll_timeout)
+ timeout or self.conf.oslo_messaging_zmq.rpc_poll_timeout)
return message
def stop(self):
@@ -94,7 +97,7 @@ class ZmqNotificationServer(base.PollStyleListener):
@base.batch_poll_helper
def poll(self, timeout=None):
message, socket = self.poller.poll(
- timeout or self.conf.rpc_poll_timeout)
+ timeout or self.conf.oslo_messaging_zmq.rpc_poll_timeout)
return message
def stop(self):
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_address.py b/oslo_messaging/_drivers/zmq_driver/zmq_address.py
index b33c288..0175e7e 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_address.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_address.py
@@ -24,11 +24,11 @@ def get_tcp_direct_address(host):
def get_tcp_random_address(conf):
- return "tcp://%s" % conf.rpc_zmq_bind_address
+ return "tcp://%s" % conf.oslo_messaging_zmq.rpc_zmq_bind_address
def get_broker_address(conf):
- return "ipc://%s/zmq-broker" % conf.rpc_zmq_ipc_dir
+ return "ipc://%s/zmq-broker" % conf.oslo_messaging_zmq.rpc_zmq_ipc_dir
def prefix_str(key, listener_type):
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_options.py b/oslo_messaging/_drivers/zmq_driver/zmq_options.py
new file mode 100644
index 0000000..2ac76f9
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_options.py
@@ -0,0 +1,122 @@
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import socket
+
+from oslo_config import cfg
+
+from oslo_messaging._drivers import base
+from oslo_messaging import server
+
+
+MATCHMAKER_BACKENDS = ('redis', 'dummy')
+MATCHMAKER_DEFAULT = 'redis'
+
+
+zmq_opts = [
+ cfg.StrOpt('rpc_zmq_bind_address', default='*',
+ deprecated_group='DEFAULT',
+ help='ZeroMQ bind address. Should be a wildcard (*), '
+ 'an ethernet interface, or IP. '
+ 'The "host" option should point or resolve to this '
+ 'address.'),
+
+ cfg.StrOpt('rpc_zmq_matchmaker', default=MATCHMAKER_DEFAULT,
+ choices=MATCHMAKER_BACKENDS,
+ deprecated_group='DEFAULT',
+ help='MatchMaker driver.'),
+
+ cfg.IntOpt('rpc_zmq_contexts', default=1,
+ deprecated_group='DEFAULT',
+ help='Number of ZeroMQ contexts, defaults to 1.'),
+
+ cfg.IntOpt('rpc_zmq_topic_backlog',
+ deprecated_group='DEFAULT',
+ help='Maximum number of ingress messages to locally buffer '
+ 'per topic. Default is unlimited.'),
+
+ cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
+ deprecated_group='DEFAULT',
+ help='Directory for holding IPC sockets.'),
+
+ cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
+ sample_default='localhost',
+ deprecated_group='DEFAULT',
+ help='Name of this node. Must be a valid hostname, FQDN, or '
+ 'IP address. Must match "host" option, if running Nova.'),
+
+ cfg.IntOpt('rpc_cast_timeout', default=-1,
+ deprecated_group='DEFAULT',
+ help='Seconds to wait before a cast expires (TTL). '
+ 'The default value of -1 specifies an infinite linger '
+ 'period. The value of 0 specifies no linger period. '
+ 'Pending messages shall be discarded immediately '
+ 'when the socket is closed. Only supported by impl_zmq.'),
+
+ cfg.IntOpt('rpc_poll_timeout', default=1,
+ deprecated_group='DEFAULT',
+ help='The default number of seconds that poll should wait. '
+ 'Poll raises timeout exception when timeout expired.'),
+
+ cfg.IntOpt('zmq_target_expire', default=300,
+ deprecated_group='DEFAULT',
+ help='Expiration timeout in seconds of a name service record '
+ 'about existing target ( < 0 means no timeout).'),
+
+ cfg.IntOpt('zmq_target_update', default=180,
+ deprecated_group='DEFAULT',
+ help='Update period in seconds of a name service record '
+ 'about existing target.'),
+
+ cfg.BoolOpt('use_pub_sub', default=True,
+ deprecated_group='DEFAULT',
+ help='Use PUB/SUB pattern for fanout methods. '
+ 'PUB/SUB always uses proxy.'),
+
+ cfg.BoolOpt('use_router_proxy', default=True,
+ deprecated_group='DEFAULT',
+ help='Use ROUTER remote proxy.'),
+
+ cfg.PortOpt('rpc_zmq_min_port',
+ default=49153,
+ deprecated_group='DEFAULT',
+ help='Minimal port number for random ports range.'),
+
+ cfg.IntOpt('rpc_zmq_max_port',
+ min=1,
+ max=65536,
+ default=65536,
+ deprecated_group='DEFAULT',
+ help='Maximal port number for random ports range.'),
+
+ cfg.IntOpt('rpc_zmq_bind_port_retries',
+ default=100,
+ deprecated_group='DEFAULT',
+ help='Number of retries to find free port number before '
+ 'fail with ZMQBindError.'),
+
+ cfg.StrOpt('rpc_zmq_serialization', default='json',
+ choices=('json', 'msgpack'),
+ deprecated_group='DEFAULT',
+ help='Default serialization mechanism for '
+ 'serializing/deserializing outgoing/incoming messages')
+]
+
+
+def register_opts(conf):
+ opt_group = cfg.OptGroup(name='oslo_messaging_zmq',
+ title='ZeroMQ driver options')
+ conf.register_opts(zmq_opts, group=opt_group)
+ conf.register_opts(server._pool_opts)
+ conf.register_opts(base.base_opts)
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
index 14061b2..285eafa 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py
@@ -47,8 +47,9 @@ class ZmqSocket(object):
self.handle.set_hwm(high_watermark)
self.close_linger = -1
- if self.conf.rpc_cast_timeout > 0:
- self.close_linger = self.conf.rpc_cast_timeout * 1000
+ if self.conf.oslo_messaging_zmq.rpc_cast_timeout > 0:
+ self.close_linger = \
+ self.conf.oslo_messaging_zmq.rpc_cast_timeout * 1000
self.handle.setsockopt(zmq.LINGER, self.close_linger)
# Put messages to only connected queues
self.handle.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0)
@@ -96,8 +97,9 @@ class ZmqSocket(object):
self.handle.send_multipart(*args, **kwargs)
def send_dumped(self, obj, *args, **kwargs):
- serialization = kwargs.pop('serialization',
- self.conf.rpc_zmq_serialization)
+ serialization = kwargs.pop(
+ 'serialization',
+ self.conf.oslo_messaging_zmq.rpc_zmq_serialization)
serializer = self._get_serializer(serialization)
s = serializer.dump_as_bytes(obj)
self.handle.send(s, *args, **kwargs)
@@ -118,8 +120,9 @@ class ZmqSocket(object):
return self.handle.recv_multipart(*args, **kwargs)
def recv_loaded(self, *args, **kwargs):
- serialization = kwargs.pop('serialization',
- self.conf.rpc_zmq_serialization)
+ serialization = kwargs.pop(
+ 'serialization',
+ self.conf.oslo_messaging_zmq.rpc_zmq_serialization)
serializer = self._get_serializer(serialization)
s = self.handle.recv(*args, **kwargs)
obj = serializer.load_from_bytes(s)
@@ -170,13 +173,13 @@ class ZmqRandomPortSocket(ZmqSocket):
high_watermark=high_watermark)
self.bind_address = zmq_address.get_tcp_random_address(self.conf)
if host is None:
- host = conf.rpc_zmq_host
+ host = conf.oslo_messaging_zmq.rpc_zmq_host
try:
self.port = self.handle.bind_to_random_port(
self.bind_address,
- min_port=conf.rpc_zmq_min_port,
- max_port=conf.rpc_zmq_max_port,
- max_tries=conf.rpc_zmq_bind_port_retries)
+ min_port=conf.oslo_messaging_zmq.rpc_zmq_min_port,
+ max_port=conf.oslo_messaging_zmq.rpc_zmq_max_port,
+ max_tries=conf.oslo_messaging_zmq.rpc_zmq_bind_port_retries)
self.connect_address = zmq_address.combine_address(host, self.port)
except zmq.ZMQBindError:
LOG.error(_LE("Random ports range exceeded!"))
@@ -192,7 +195,8 @@ class ZmqFixedPortSocket(ZmqSocket):
high_watermark=high_watermark)
self.connect_address = zmq_address.combine_address(host, port)
self.bind_address = zmq_address.get_tcp_direct_address(
- zmq_address.combine_address(conf.rpc_zmq_bind_address, port))
+ zmq_address.combine_address(
+ conf.oslo_messaging_zmq.rpc_zmq_bind_address, port))
self.host = host
self.port = port
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py
index 302915d..2d4f9e0 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py
@@ -41,7 +41,7 @@ class UpdaterBase(object):
def _update_loop(self):
self.update_method()
- time.sleep(self.conf.zmq_target_update)
+ time.sleep(self.conf.oslo_messaging_zmq.zmq_target_update)
def cleanup(self):
self.executor.stop()
diff --git a/oslo_messaging/conffixture.py b/oslo_messaging/conffixture.py
index 4e6c9d5..5eb4e5e 100644
--- a/oslo_messaging/conffixture.py
+++ b/oslo_messaging/conffixture.py
@@ -58,7 +58,8 @@ class ConfFixture(fixtures.Fixture):
'oslo_messaging._drivers.amqp1_driver.opts',
'amqp1_opts', 'oslo_messaging_amqp')
_import_opts(self.conf,
- 'oslo_messaging._drivers.impl_zmq', 'zmq_opts')
+ 'oslo_messaging._drivers.zmq_driver.zmq_options',
+ 'zmq_opts', 'oslo_messaging_zmq')
_import_opts(self.conf,
'oslo_messaging._drivers.zmq_driver.'
'matchmaker.matchmaker_redis',
diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py
index b04768a..c252496 100644
--- a/oslo_messaging/opts.py
+++ b/oslo_messaging/opts.py
@@ -25,7 +25,7 @@ from oslo_messaging._drivers.amqp1_driver import opts as amqp_opts
from oslo_messaging._drivers import base as drivers_base
from oslo_messaging._drivers import impl_pika
from oslo_messaging._drivers import impl_rabbit
-from oslo_messaging._drivers import impl_zmq
+from oslo_messaging._drivers.impl_zmq import zmq_options
from oslo_messaging._drivers.pika_driver import pika_connection_factory
from oslo_messaging._drivers.zmq_driver.matchmaker import matchmaker_redis
from oslo_messaging.notify import notifier
@@ -36,7 +36,7 @@ from oslo_messaging import transport
_global_opt_lists = [
drivers_base.base_opts,
- impl_zmq.zmq_opts,
+ zmq_options.zmq_opts,
server._pool_opts,
client._client_opts,
transport._transport_opts,
@@ -45,6 +45,7 @@ _global_opt_lists = [
_opts = [
(None, list(itertools.chain(*_global_opt_lists))),
('matchmaker_redis', matchmaker_redis.matchmaker_redis_opts),
+ ('oslo_messaging_zmq', zmq_options.zmq_opts),
('oslo_messaging_amqp', amqp_opts.amqp1_opts),
('oslo_messaging_notifications', notifier._notifier_opts),
('oslo_messaging_rabbit', list(
diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py
index 76b61cf..04d86d9 100644
--- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py
+++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py
@@ -35,7 +35,7 @@ class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase):
# Set config values
kwargs = {'rpc_zmq_min_port': 5555,
'rpc_zmq_max_port': 5560}
- self.config(**kwargs)
+ self.config(group='oslo_messaging_zmq', **kwargs)
def test_ports_range(self):
listeners = []
diff --git a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
index 02519de..2973521 100644
--- a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
+++ b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py
@@ -54,7 +54,7 @@ class TestPubSub(zmq_common.ZmqBaseTestCase):
kwargs = {'use_pub_sub': True,
'rpc_zmq_serialization': self.serialization}
- self.config(**kwargs)
+ self.config(group='oslo_messaging_zmq', **kwargs)
self.config(host="127.0.0.1", group="zmq_proxy_opts")
self.config(publisher_port="0", group="zmq_proxy_opts")
diff --git a/oslo_messaging/tests/drivers/zmq/zmq_common.py b/oslo_messaging/tests/drivers/zmq/zmq_common.py
index f0ef4a4..ff48bfb 100644
--- a/oslo_messaging/tests/drivers/zmq/zmq_common.py
+++ b/oslo_messaging/tests/drivers/zmq/zmq_common.py
@@ -20,6 +20,7 @@ import testtools
import oslo_messaging
from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_options
from oslo_messaging._i18n import _LE
from oslo_messaging.tests import utils as test_utils
@@ -71,17 +72,18 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
def setUp(self):
super(ZmqBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
+ zmq_options.register_opts(self.conf)
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
- 'rpc_response_timeout': 5,
'rpc_zmq_ipc_dir': self.internal_ipc_dir,
'use_pub_sub': False,
'use_router_proxy': False,
'rpc_zmq_matchmaker': 'dummy'}
- self.config(**kwargs)
+ self.config(group='oslo_messaging_zmq', **kwargs)
+ self.config(rpc_response_timeout=5)
# Get driver
transport = oslo_messaging.get_transport(self.conf)
diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py
index 0dcc047..1d215a5 100644
--- a/oslo_messaging/tests/functional/utils.py
+++ b/oslo_messaging/tests/functional/utils.py
@@ -293,10 +293,12 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
zmq_matchmaker = os.environ.get('ZMQ_MATCHMAKER')
if zmq_matchmaker:
- self.config(rpc_zmq_matchmaker=zmq_matchmaker)
+ self.config(rpc_zmq_matchmaker=zmq_matchmaker,
+ group="oslo_messaging_zmq")
zmq_ipc_dir = os.environ.get('ZMQ_IPC_DIR')
if zmq_ipc_dir:
- self.config(rpc_zmq_ipc_dir=zmq_ipc_dir)
+ self.config(group="oslo_messaging_zmq",
+ rpc_zmq_ipc_dir=zmq_ipc_dir)
zmq_redis_port = os.environ.get('ZMQ_REDIS_PORT')
if zmq_redis_port:
self.config(port=zmq_redis_port, group="matchmaker_redis")
@@ -304,10 +306,12 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
self.config(wait_timeout=1000, group="matchmaker_redis")
zmq_use_pub_sub = os.environ.get('ZMQ_USE_PUB_SUB')
if zmq_use_pub_sub:
- self.config(use_pub_sub=zmq_use_pub_sub)
+ self.config(use_pub_sub=zmq_use_pub_sub,
+ group='oslo_messaging_zmq')
zmq_use_router_proxy = os.environ.get('ZMQ_USE_ROUTER_PROXY')
if zmq_use_router_proxy:
- self.config(use_router_proxy=zmq_use_router_proxy)
+ self.config(use_router_proxy=zmq_use_router_proxy,
+ group='oslo_messaging_zmq')
class NotificationFixture(fixtures.Fixture):
diff --git a/oslo_messaging/tests/functional/zmq/multiproc_utils.py b/oslo_messaging/tests/functional/zmq/multiproc_utils.py
index ee9f56e..4a1498a 100644
--- a/oslo_messaging/tests/functional/zmq/multiproc_utils.py
+++ b/oslo_messaging/tests/functional/zmq/multiproc_utils.py
@@ -70,7 +70,8 @@ def listener_configurer(conf):
'%(levelname)-8s %(message)s')
h.setFormatter(f)
root.addHandler(h)
- log_path = conf.rpc_zmq_ipc_dir + "/" + "zmq_multiproc.log"
+ log_path = conf.oslo_messaging_zmq.rpc_zmq_ipc_dir + \
+ "/" + "zmq_multiproc.log"
file_handler = logging.StreamHandler(open(log_path, 'w'))
file_handler.setFormatter(f)
root.addHandler(file_handler)
diff --git a/oslo_messaging/tests/functional/zmq/test_startup.py b/oslo_messaging/tests/functional/zmq/test_startup.py
index ea287b3..f1b89b0 100644
--- a/oslo_messaging/tests/functional/zmq/test_startup.py
+++ b/oslo_messaging/tests/functional/zmq/test_startup.py
@@ -30,10 +30,10 @@ class StartupOrderTestCase(multiproc_utils.MutliprocTestCase):
self.conf.prog = "test_prog"
self.conf.project = "test_project"
- kwargs = {'rpc_response_timeout': 30}
- self.config(**kwargs)
+ self.config(rpc_response_timeout=30)
- log_path = self.conf.rpc_zmq_ipc_dir + "/" + str(os.getpid()) + ".log"
+ log_path = os.path.join(self.conf.oslo_messaging_zmq.rpc_zmq_ipc_dir,
+ str(os.getpid()) + ".log")
sys.stdout = open(log_path, "w", buffering=0)
def test_call_server_before_client(self):
diff --git a/oslo_messaging/tests/test_opts.py b/oslo_messaging/tests/test_opts.py
index 2ca8f8a..0e4b1f8 100644
--- a/oslo_messaging/tests/test_opts.py
+++ b/oslo_messaging/tests/test_opts.py
@@ -32,11 +32,12 @@ class OptsTestCase(test_utils.BaseTestCase):
super(OptsTestCase, self).setUp()
def _test_list_opts(self, result):
- self.assertEqual(5, len(result))
+ self.assertEqual(6, len(result))
groups = [g for (g, l) in result]
self.assertIn(None, groups)
self.assertIn('matchmaker_redis', groups)
+ self.assertIn('oslo_messaging_zmq', groups)
self.assertIn('oslo_messaging_amqp', groups)
self.assertIn('oslo_messaging_notifications', groups)
self.assertIn('oslo_messaging_rabbit', groups)
diff --git a/setup-test-env-zmq-proxy.sh b/setup-test-env-zmq-proxy.sh
index ebce12c..12649c8 100755
--- a/setup-test-env-zmq-proxy.sh
+++ b/setup-test-env-zmq-proxy.sh
@@ -18,6 +18,7 @@ export ZMQ_PROXY_HOST=127.0.0.1
cat > ${DATADIR}/zmq.conf <<EOF
[DEFAULT]
transport_url=${TRANSPORT_URL}
+[oslo_messaging_zmq]
rpc_zmq_matchmaker=${ZMQ_MATCHMAKER}
rpc_zmq_ipc_dir=${ZMQ_IPC_DIR}
use_pub_sub=${ZMQ_USE_PUB_SUB}
diff --git a/setup-test-env-zmq-pub-sub.sh b/setup-test-env-zmq-pub-sub.sh
index 4a937ba..5551be5 100755
--- a/setup-test-env-zmq-pub-sub.sh
+++ b/setup-test-env-zmq-pub-sub.sh
@@ -16,6 +16,7 @@ export ZMQ_USE_ROUTER_PROXY=true
cat > ${DATADIR}/zmq.conf <<EOF
[DEFAULT]
transport_url=${TRANSPORT_URL}
+[oslo_messaging_zmq]
rpc_zmq_matchmaker=${ZMQ_MATCHMAKER}
rpc_zmq_ipc_dir=${ZMQ_IPC_DIR}
use_pub_sub=${ZMQ_USE_PUB_SUB}
diff --git a/setup-test-env-zmq.sh b/setup-test-env-zmq.sh
index 0fa3fcd..8780872 100755
--- a/setup-test-env-zmq.sh
+++ b/setup-test-env-zmq.sh
@@ -16,6 +16,7 @@ export ZMQ_USE_ROUTER_PROXY=false
cat > ${DATADIR}/zmq.conf <<EOF
[DEFAULT]
transport_url=${TRANSPORT_URL}
+[oslo_messaging_zmq]
rpc_zmq_matchmaker=${ZMQ_MATCHMAKER}
rpc_zmq_ipc_dir=${ZMQ_IPC_DIR}
use_pub_sub=${ZMQ_USE_PUB_SUB}
diff --git a/tools/simulator.py b/tools/simulator.py
index deaae92..2f3161b 100755
--- a/tools/simulator.py
+++ b/tools/simulator.py
@@ -684,7 +684,7 @@ def main():
if args.mode == 'rpc-server':
target = messaging.Target(topic=args.topic, server=args.server)
if args.url.startswith('zmq'):
- cfg.CONF.rpc_zmq_matchmaker = "redis"
+ cfg.CONF.oslo_messaging_zmq.rpc_zmq_matchmaker = "redis"
endpoint = rpc_server(TRANSPORT, target, args.wait_before_answer,
args.executor, args.duration)