summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-09-30 16:11:29 +0000
committerGerrit Code Review <review@openstack.org>2015-09-30 16:11:29 +0000
commit3a5db723aa0613a99df6d95eecf6d8cecf2ca79d (patch)
tree7bc5b2538fafbd62211e262e5b37c9aa34709c30
parent8b52367c53897aafba71e47cde9d2e4c717801d6 (diff)
parent3067dbd1984c984108707ab27d2694f80db9f1d4 (diff)
downloadoslo-messaging-3a5db723aa0613a99df6d95eecf6d8cecf2ca79d.tar.gz
Merge "Non-blocking outgoing queue was implemented"
-rw-r--r--oslo_messaging/_cmd/__init__.py1
-rw-r--r--oslo_messaging/_cmd/zmq_broker.py42
-rw-r--r--oslo_messaging/_drivers/impl_zmq.py4
-rw-r--r--oslo_messaging/_drivers/zmq_driver/broker/__init__.py0
-rw-r--r--oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py53
-rw-r--r--oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py82
-rw-r--r--oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py78
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py28
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py23
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py4
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_client.py11
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_request.py2
-rw-r--r--oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py2
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py27
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_address.py6
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_names.py1
-rw-r--r--oslo_messaging/tests/drivers/zmq/test_impl_zmq.py1
-rwxr-xr-xsetup-test-env-zmq.sh2
-rw-r--r--setup.cfg2
19 files changed, 332 insertions, 37 deletions
diff --git a/oslo_messaging/_cmd/__init__.py b/oslo_messaging/_cmd/__init__.py
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/oslo_messaging/_cmd/__init__.py
@@ -0,0 +1 @@
+
diff --git a/oslo_messaging/_cmd/zmq_broker.py b/oslo_messaging/_cmd/zmq_broker.py
new file mode 100644
index 0000000..8b42205
--- /dev/null
+++ b/oslo_messaging/_cmd/zmq_broker.py
@@ -0,0 +1,42 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import logging
+import sys
+
+from oslo_config import cfg
+
+from oslo_messaging._drivers import impl_zmq
+from oslo_messaging._drivers.zmq_driver.broker import zmq_broker
+from oslo_messaging._executors import impl_pooledexecutor
+
+CONF = cfg.CONF
+CONF.register_opts(impl_zmq.zmq_opts)
+CONF.register_opts(impl_pooledexecutor._pool_opts)
+# TODO(ozamiatin): Move this option assignment to an external config file
+# Use efficient zmq poller in real-world deployment
+CONF.rpc_zmq_native = True
+
+
+def main():
+ CONF(sys.argv[1:], project='oslo')
+ logging.basicConfig(level=logging.DEBUG)
+
+ with contextlib.closing(zmq_broker.ZmqBroker(CONF)) as reactor:
+ reactor.start()
+ reactor.wait()
+
+if __name__ == "__main__":
+ main()
diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py
index d30dc28..0213bda 100644
--- a/oslo_messaging/_drivers/impl_zmq.py
+++ b/oslo_messaging/_drivers/impl_zmq.py
@@ -80,6 +80,10 @@ zmq_opts = [
default=1,
help='The default number of seconds that poll should wait. '
'Poll raises timeout exception when timeout expired.'),
+
+ cfg.BoolOpt('zmq_use_broker',
+ default=True,
+ help='Shows whether zmq-messaging uses broker or not.')
]
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/__init__.py b/oslo_messaging/_drivers/zmq_driver/broker/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/broker/__init__.py
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py
new file mode 100644
index 0000000..c309474
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_base_proxy.py
@@ -0,0 +1,53 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import logging
+
+import six
+
+from oslo_messaging._drivers.zmq_driver import zmq_async
+
+LOG = logging.getLogger(__name__)
+
+
+@six.add_metaclass(abc.ABCMeta)
+class BaseProxy(object):
+
+ """Base TCP-proxy.
+
+ TCP-proxy redirects messages received by TCP from clients to servers
+ over IPC. Consists of TCP-frontend and IPC-backend objects. Runs
+ in async executor.
+ """
+
+ def __init__(self, conf, context):
+ super(BaseProxy, self).__init__()
+ self.conf = conf
+ self.context = context
+ self.executor = zmq_async.get_executor(self.run,
+ zmq_concurrency='native')
+
+ @abc.abstractmethod
+ def run(self):
+ """Main execution point of the proxy"""
+
+ def start(self):
+ self.executor.execute()
+
+ def stop(self):
+ self.executor.stop()
+
+ def wait(self):
+ self.executor.wait()
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py
new file mode 100644
index 0000000..163d3c1
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_broker.py
@@ -0,0 +1,82 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+
+from oslo_utils import excutils
+import six
+from stevedore import driver
+import zmq
+
+from oslo_messaging._drivers.zmq_driver.broker import zmq_queue_proxy
+from oslo_messaging._i18n import _LE, _LI
+
+
+LOG = logging.getLogger(__name__)
+
+
+class ZmqBroker(object):
+ """Local messaging IPC broker (nodes are still peers).
+ The main purpose is to have native zeromq application.
+ Benefits of such approach are following:
+
+ 1. No risk to block the main thread of the process by unpatched
+ native parts of the libzmq (c-library is completely monkey-patch
+ unfriendly)
+ 2. Making use of standard zmq approaches as async pollers,
+ devices, queues etc.
+ 3. Possibility to implement queue persistence not touching existing
+ clients (staying in a separate process).
+ """
+
+ def __init__(self, conf):
+ super(ZmqBroker, self).__init__()
+ self.conf = conf
+ self._create_ipc_dirs()
+ self.matchmaker = driver.DriverManager(
+ 'oslo.messaging.zmq.matchmaker',
+ self.conf.rpc_zmq_matchmaker,
+ ).driver(self.conf)
+
+ self.context = zmq.Context()
+ self.queue = six.moves.queue.Queue()
+ self.proxies = [zmq_queue_proxy.OutgoingQueueProxy(
+ conf, self.context, self.queue, self.matchmaker),
+ zmq_queue_proxy.IncomingQueueProxy(
+ conf, self.context, self.queue)
+ ]
+
+ def _create_ipc_dirs(self):
+ ipc_dir = self.conf.rpc_zmq_ipc_dir
+ try:
+ os.makedirs("%s/fanout" % ipc_dir)
+ except os.error:
+ if not os.path.isdir(ipc_dir):
+ with excutils.save_and_reraise_exception():
+ LOG.error(_LE("Required IPC directory does not exist at"
+ " %s"), ipc_dir)
+
+ def start(self):
+ for proxy in self.proxies:
+ proxy.start()
+
+ def wait(self):
+ for proxy in self.proxies:
+ proxy.wait()
+
+ def close(self):
+ LOG.info(_LI("Broker shutting down ..."))
+ for proxy in self.proxies:
+ proxy.stop()
diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py
new file mode 100644
index 0000000..c3c547f
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py
@@ -0,0 +1,78 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+import six
+import zmq
+
+from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy
+from oslo_messaging._drivers.zmq_driver.client.publishers\
+ import zmq_dealer_publisher
+from oslo_messaging._drivers.zmq_driver import zmq_address
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._i18n import _LI
+
+LOG = logging.getLogger(__name__)
+
+
+class OutgoingQueueProxy(zmq_base_proxy.BaseProxy):
+
+ def __init__(self, conf, context, queue, matchmaker):
+ super(OutgoingQueueProxy, self).__init__(conf, context)
+ self.queue = queue
+ self.matchmaker = matchmaker
+ self.publisher = zmq_dealer_publisher.DealerPublisher(
+ conf, matchmaker)
+ LOG.info(_LI("Polling at outgoing proxy ..."))
+
+ def run(self):
+ try:
+ request = self.queue.get(timeout=self.conf.rpc_poll_timeout)
+ LOG.info(_LI("Redirecting request %s to TCP publisher ...")
+ % request)
+ self.publisher.send_request(request)
+ except six.moves.queue.Empty:
+ return
+
+
+class IncomingQueueProxy(zmq_base_proxy.BaseProxy):
+
+ def __init__(self, conf, context, queue):
+ super(IncomingQueueProxy, self).__init__(conf, context)
+ self.poller = zmq_async.get_poller(
+ zmq_concurrency='native')
+
+ self.queue = queue
+
+ self.socket = context.socket(zmq.ROUTER)
+ self.socket.bind(zmq_address.get_broker_address(conf))
+ self.poller.register(self.socket, self.receive_request)
+ LOG.info(_LI("Polling at incoming proxy ..."))
+
+ def run(self):
+ request, socket = self.poller.poll(self.conf.rpc_poll_timeout)
+ if request is None:
+ return
+
+ LOG.info(_LI("Received request and queue it: %s") % str(request))
+
+ self.queue.put(request)
+
+ def receive_request(self, socket):
+ reply_id = socket.recv()
+ assert reply_id is not None, "Valid id expected"
+ empty = socket.recv()
+ assert empty == b'', "Empty delimiter expected"
+ return socket.recv_pyobj()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py
index a5c3f0f..2c8fc5e 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py
@@ -13,7 +13,6 @@
# under the License.
import logging
-import uuid
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
@@ -58,13 +57,8 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
def _send_request(self, socket, request):
- message_id = str(uuid.uuid1())
-
socket.send(b'', zmq.SNDMORE)
- socket.send_string(request.msg_type, zmq.SNDMORE)
- socket.send_string(message_id, zmq.SNDMORE)
- socket.send_pyobj(request.context, zmq.SNDMORE)
- socket.send_pyobj(request.message)
+ socket.send_pyobj(request)
LOG.info(_LI("Sending message %(message)s to a target %(target)s")
% {"message": request.message,
@@ -75,6 +69,26 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
super(DealerPublisher, self).cleanup()
+class DealerPublisherLight(zmq_publisher_base.PublisherBase):
+
+ def __init__(self, conf, address):
+ super(DealerPublisherLight, self).__init__(conf)
+ self.socket = self.zmq_context.socket(zmq.DEALER)
+ self.socket.connect(address)
+
+ def send_request(self, request):
+
+ if request.msg_type == zmq_names.CALL_TYPE:
+ raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
+
+ self.socket.send(b'', zmq.SNDMORE)
+ self.socket.send_pyobj(request)
+
+ def cleanup(self):
+ self.socket.setsockopt(zmq.LINGER, 0)
+ self.socket.close()
+
+
class AcknowledgementReceiver(object):
def __init__(self):
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py
index 5ff1a41..faee64d 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py
@@ -56,7 +56,7 @@ class PublisherBase(object):
Publisher can send request objects from zmq_request.
"""
- def __init__(self, conf, matchmaker):
+ def __init__(self, conf):
"""Construct publisher
@@ -65,13 +65,10 @@ class PublisherBase(object):
:param conf: configuration object
:type conf: oslo_config.CONF
- :param matchmaker: Name Service interface object
- :type matchmaker: matchmaker.MatchMakerBase
"""
self.conf = conf
self.zmq_context = zmq.Context()
- self.matchmaker = matchmaker
self.outbound_sockets = {}
super(PublisherBase, self).__init__()
@@ -92,9 +89,7 @@ class PublisherBase(object):
:param request: Message data and destination container object
:type request: zmq_request.Request
"""
- socket.send_string(request.msg_type, zmq.SNDMORE)
- socket.send_pyobj(request.context, zmq.SNDMORE)
- socket.send_pyobj(request.message)
+ socket.send_pyobj(request)
def cleanup(self):
"""Cleanup publisher. Close allocated connections."""
@@ -106,8 +101,19 @@ class PublisherBase(object):
class PublisherMultisend(PublisherBase):
def __init__(self, conf, matchmaker, socket_type):
+
+ """Construct publisher multi-send
+
+ Base class for fanout-sending publishers.
+
+ :param conf: configuration object
+ :type conf: oslo_config.CONF
+ :param matchmaker: Name Service interface object
+ :type matchmaker: matchmaker.MatchMakerBase
+ """
+ super(PublisherMultisend, self).__init__(conf)
self.socket_type = socket_type
- super(PublisherMultisend, self).__init__(conf, matchmaker)
+ self.matchmaker = matchmaker
def _check_hosts_connections(self, target):
# TODO(ozamiatin): Place for significant optimization
@@ -126,6 +132,7 @@ class PublisherMultisend(PublisherBase):
def _connect_to_host(self, socket, host, target):
address = zmq_address.get_tcp_direct_address(host)
+ LOG.info(address)
stype = zmq_names.socket_type_str(self.socket_type)
try:
LOG.info(_LI("Connecting %(stype)s to %(address)s for %(target)s")
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py
index 001fe02..ab17193 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py
@@ -31,6 +31,10 @@ zmq = zmq_async.import_zmq()
class ReqPublisher(zmq_publisher_base.PublisherBase):
+ def __init__(self, conf, matchmaker):
+ super(ReqPublisher, self).__init__(conf)
+ self.matchmaker = matchmaker
+
def send_request(self, request):
if request.msg_type != zmq_names.CALL_TYPE:
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
index 26a358f..3e7888d 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
@@ -19,6 +19,7 @@ from oslo_messaging._drivers.zmq_driver.client.publishers\
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_req_publisher
from oslo_messaging._drivers.zmq_driver.client import zmq_request
+from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
zmq = zmq_async.import_zmq()
@@ -31,8 +32,14 @@ class ZmqClient(object):
self.context = zmq.Context()
self.matchmaker = matchmaker
self.allowed_remote_exmods = allowed_remote_exmods or []
- self.dealer_publisher = zmq_dealer_publisher.DealerPublisher(
- conf, matchmaker)
+
+ self.dealer_publisher = None
+ if self.conf.zmq_use_broker:
+ self.dealer_publisher = zmq_dealer_publisher.DealerPublisherLight(
+ conf, zmq_address.get_broker_address(self.conf))
+ else:
+ self.dealer_publisher = zmq_dealer_publisher.DealerPublisher(
+ conf, matchmaker)
def send_call(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.CallRequest(
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py
index e692a3a..92d444a 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py
@@ -14,6 +14,7 @@
import abc
import logging
+import uuid
import six
@@ -61,6 +62,7 @@ class Request(object):
self.context = context
self.message = message
self.retry = retry
+ self.message_id = str(uuid.uuid1())
@abc.abstractproperty
def msg_type(self):
diff --git a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py
index 7719310..0c2beea 100644
--- a/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py
+++ b/oslo_messaging/_drivers/zmq_driver/poller/threading_poller.py
@@ -41,7 +41,7 @@ class ThreadingPoller(zmq_poller.ZmqPoller):
self.poller.register(socket, zmq.POLLIN)
def poll(self, timeout=None):
- timeout = timeout * 1000 # zmq poller waits milliseconds
+ timeout *= 1000 # zmq poller waits milliseconds
sockets = None
try:
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py
index bdac859..f601660 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py
@@ -81,29 +81,22 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
reply_id = socket.recv()
empty = socket.recv()
assert empty == b'', 'Bad format: empty delimiter expected'
- msg_type = socket.recv_string()
- assert msg_type is not None, 'Bad format: msg type expected'
+ request = socket.recv_pyobj()
- msg_id = None
- if msg_type != zmq_names.CALL_TYPE:
- msg_id = socket.recv_string()
-
- context = socket.recv_pyobj()
- message = socket.recv_pyobj()
LOG.info(_LI("Received %(msg_type)s message %(msg)s")
- % {"msg_type": msg_type,
- "msg": str(message)})
+ % {"msg_type": request.msg_type,
+ "msg": str(request.message)})
- if msg_type == zmq_names.CALL_TYPE:
+ if request.msg_type == zmq_names.CALL_TYPE:
return zmq_incoming_message.ZmqIncomingRequest(
- self.server, context, message, socket, reply_id,
- self.poller)
- elif msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES):
+ self.server, request.context, request.message, socket,
+ reply_id, self.poller)
+ elif request.msg_type in zmq_names.NON_BLOCKING_TYPES:
return RouterIncomingMessage(
- self.server, context, message, socket, reply_id,
- msg_id, self.poller)
+ self.server, request.context, request.message, socket,
+ reply_id, request.message_id, self.poller)
else:
- LOG.error(_LE("Unknown message type: %s") % msg_type)
+ LOG.error(_LE("Unknown message type: %s") % request.msg_type)
except zmq.ZMQError as e:
LOG.error(_LE("Receiving message failed: %s") % str(e))
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_address.py b/oslo_messaging/_drivers/zmq_driver/zmq_address.py
index 7feb05d..e8c4829 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_address.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_address.py
@@ -18,8 +18,12 @@ def combine_address(host, port):
def get_tcp_direct_address(host):
- return "tcp://%s" % (host)
+ return "tcp://%s" % str(host)
def get_tcp_random_address(conf):
return "tcp://*"
+
+
+def get_broker_address(conf):
+ return "ipc://%s/zmq-broker" % conf.rpc_zmq_ipc_dir
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_names.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py
index 1c3c334..0f3112e 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_names.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py
@@ -47,6 +47,7 @@ MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_FANOUT_TYPE)
DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, NOTIFY_TYPE)
CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE)
NOTIFY_TYPES = (NOTIFY_TYPE, NOTIFY_FANOUT_TYPE)
+NON_BLOCKING_TYPES = CAST_TYPES + NOTIFY_TYPES
def socket_type_str(socket_type):
diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py
index 21641dd..c400075 100644
--- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py
+++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py
@@ -77,6 +77,7 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_ipc_dir': self.internal_ipc_dir,
+ 'zmq_use_broker': False,
'rpc_zmq_matchmaker': 'dummy'}
self.config(**kwargs)
diff --git a/setup-test-env-zmq.sh b/setup-test-env-zmq.sh
index 353c260..effe009 100755
--- a/setup-test-env-zmq.sh
+++ b/setup-test-env-zmq.sh
@@ -22,4 +22,6 @@ EOF
redis-server --port $ZMQ_REDIS_PORT &
+oslo-messaging-zmq-broker --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-broker.log 2>&1 &
+
$*
diff --git a/setup.cfg b/setup.cfg
index 8932415..ee63dc5 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -22,7 +22,7 @@ packages =
[entry_points]
console_scripts =
- oslo-messaging-zmq-receiver = oslo_messaging._cmd.zmq_receiver:main
+ oslo-messaging-zmq-broker = oslo_messaging._cmd.zmq_broker:main
oslo.messaging.drivers =
rabbit = oslo_messaging._drivers.impl_rabbit:RabbitDriver