diff options
76 files changed, 15 insertions, 6480 deletions
diff --git a/bindep.txt b/bindep.txt
index d9728bb..36f4ccd 100644
--- a/bindep.txt
+++ b/bindep.txt
@@ -14,17 +14,6 @@ libffi-devel [platform:rpm]
rabbitmq-server [platform:dpkg rabbit]
rabbitmq-server [platform:rpm rabbit]
-# zmq
-redis [platform:rpm zmq]
-redis-sentinel [platform:ubuntu !platform:ubuntu-trusty zmq]
-redis-server [platform:dpkg zmq]
-dev-db/redis [platform:gentoo zmq]
-python-redis [platform:dpkg zmq]
-zookeeperd [platform:dpkg zmq]
-python-zmq [!platform:gentoo !platform:fedora !platform:suse zmq]
-python2-zmq [platform:fedora zmq]
-dev-python/pyzmq [platform:gentoo zmq]
# AMQP1 dpkg
qpidd [platform:dpkg amqp1]
sasl2-bin [platform:dpkg amqp1]
diff --git a/doc/source/admin/AMQP1.0.rst b/doc/source/admin/AMQP1.0.rst
index 9ef3adf..e22d960 100644
--- a/doc/source/admin/AMQP1.0.rst
+++ b/doc/source/admin/AMQP1.0.rst
@@ -311,7 +311,7 @@ backends for RPC and Notify. The url is of the form:
Where the transport value specifies the rpc or notification backend as
-one of **amqp**, rabbit, zmq, etc.
+one of **amqp**, rabbit, kafka, etc.
To specify and enable the AMQP 1.0 driver for RPC, in the section
[DEFAULT] of the service configuration file, specify the
diff --git a/doc/source/admin/index.rst b/doc/source/admin/index.rst
index 63104bb..af5a87b 100644
--- a/doc/source/admin/index.rst
+++ b/doc/source/admin/index.rst
@@ -7,4 +7,3 @@ Deployment Guide
- zmq_driver
diff --git a/doc/source/admin/zmq_driver.rst b/doc/source/admin/zmq_driver.rst
deleted file mode 100644
index 641efc3..0000000
--- a/doc/source/admin/zmq_driver.rst
+++ /dev/null
@@ -1,608 +0,0 @@
-ZeroMQ Driver Deployment Guide
-.. currentmodule:: oslo_messaging
-**Note:** The ZeroMQ driver has been **deprecated** and is no longer
-maintained. Refer to the mailing list announcement for more
-.. _details:
-0MQ (also known as ZeroMQ or zmq) is embeddable networking library
-but acts like a concurrency framework. It gives you sockets
-that carry atomic messages across various transports
-like in-process, inter-process, TCP, and multicast. You can connect
-sockets N-to-N with patterns like fan-out, pub-sub, task distribution,
-and request-reply. It's fast enough to be the fabric for clustered
-products. Its asynchronous I/O model gives you scalable multi-core
-applications, built as asynchronous message-processing tasks. It has
-a score of language APIs and runs on most operating systems.
-Originally the zero in 0MQ was meant as "zero broker" and (as close to)
-"zero latency" (as possible). Since then, it has come to encompass
-different goals: zero administration, zero cost, and zero waste.
-More generally, "zero" refers to the culture of minimalism that permeates
-the project.
-More detail regarding ZeroMQ library is available from the `specification`_.
-.. _specification:
-Currently, ZeroMQ is one of the RPC backend drivers in oslo.messaging. ZeroMQ
-can be the only RPC driver across the OpenStack cluster.
-This document provides deployment information for this driver in oslo_messaging.
-Other than AMQP-based drivers, like RabbitMQ, default ZeroMQ doesn't have
-any central brokers in oslo.messaging, instead, each host (running OpenStack
-services) is both ZeroMQ client and server. As a result, each host needs to
-listen to a certain TCP port for incoming connections and directly connect
-to other hosts simultaneously.
-Another option is to use a router proxy. It is not a broker because it
-doesn't assume any message ownership or persistence or replication etc. It
-performs only a redirection of messages to endpoints taking routing info from
-message envelope.
-Topics are used to identify the destination for a ZeroMQ RPC call. There are
-two types of topics, bare topics and directed topics. Bare topics look like
-'compute', while directed topics look like 'compute.machine1'.
-Assuming the following systems as a goal.
- +--------+
- | Client |
- +----+---+
- |
- -----+---------+-----------------------+---------------------
- | |
- +--------+------------+ +-------+----------------+
- | Controller Node | | Compute Node |
- | Nova | | Neutron |
- | Keystone | | Nova |
- | Glance | | nova-compute |
- | Neutron | | Ceilometer |
- | Cinder | | |
- | Ceilometer | +------------------------+
- | zmq-proxy |
- | Redis |
- | Horizon |
- +---------------------+
-Basic Configuration
-Enabling (mandatory)
-To enable the driver the 'transport_url' option must be set to 'zmq://'
-in the section [DEFAULT] of the conf file, the 'rpc_zmq_host' option
-must be set to the hostname of the current node. ::
- transport_url = "zmq://"
- [oslo_messaging_zmq]
- rpc_zmq_host = {hostname}
-Default configuration of zmq driver is called 'Static Direct Connections' (To
-learn more about zmq driver configurations please proceed to the corresponding
-section 'Existing Configurations'). That means that all services connect
-directly to each other and all connections are static so we open them at the
-beginning of service's lifecycle and close them only when service quits. This
-configuration is the simplest one since it doesn't require any helper services
-(proxies) other than matchmaker to be running.
-Matchmaking (mandatory)
-The ZeroMQ driver implements a matching capability to discover hosts available
-for communication when sending to a bare topic. This allows broker-less
-The Matchmaker is pluggable and it provides two different Matchmaker classes.
-MatchmakerDummy: default matchmaker driver for all-in-one scenario (messages
-are sent to itself; used mainly for testing).
-MatchmakerRedis: loads the hash table from a remote Redis server, supports
-dynamic host/topic registrations, host expiration, and hooks for consuming
-applications to acknowledge or neg-acknowledge service availability.
-For ZeroMQ driver Redis is configured in transport_url also. For using Redis
-specify the URL as follows::
- transport_url = "zmq+redis://"
-In order to cleanup redis storage from expired records (e.g. target listener
-goes down) TTL may be applied for keys. Configure 'zmq_target_expire' option
-which is 300 (seconds) by default. The option is related not specifically to
-redis so it is also defined in [oslo_messaging_zmq] section. If option value
-is <= 0 then keys don't expire and live forever in the storage.
-The other option is 'zmq_target_update' (180 seconds by default) which
-specifies how often each RPC-Server should update the matchmaker. This option's
-optimal value generally is zmq_target_expire / 2 (or 1.5). It is recommended to
-calculate it based on 'zmq_target_expire' so services records wouldn't expire
-earlier than being updated from alive services.
-Generally matchmaker can be considered as an alternate approach to services
-Matchmaker Data Source (mandatory)
-Matchmaker data source is stored in files or Redis server discussed in the
-previous section. How to make up the database is the key issue for making ZeroMQ
-driver work.
-If deploying the MatchmakerRedis, a Redis server is required. Each (K, V) pair
-stored in Redis is that the key is a base topic and the corresponding values are
-hostname arrays to be sent to.
-HA for Redis database
-Single node Redis works fine for testing, but for production there is some
-availability guarantees wanted. Without Redis database zmq deployment should
-continue working anyway, because there is no need in Redis for services when
-connections established already. But if you would like to restart some services
-or run more workers or add more hardware nodes to the deployment you will need
-nodes discovery mechanism to work and it requires Redis.
-To provide database recovery in situations when redis node goes down for example,
-we use Sentinel solution and redis master-slave-slave configuration (if we have
-3 controllers and run Redis on each of them).
-To deploy redis with HA follow the `sentinel-install`_ instructions. From the
-messaging driver's side you will need to setup following configuration ::
- transport_url = "zmq+sentinel://host1:26379,host2:26379,host3:26379"
-Listening Address (optional)
-All services bind to an IP address or Ethernet adapter. By default, all services
-bind to '*', effectively binding to This may be changed with the option
-'rpc_zmq_bind_address' which accepts a wildcard, IP address, or Ethernet adapter.
-This configuration can be set in [oslo_messaging_zmq] section.
-For example::
- rpc_zmq_bind_address = *
-Currently zmq driver uses dynamic port binding mechanism, which means that
-each listener will allocate port of a random number (static, i.e. fixed, ports
-may only be used for sockets inside proxies now). Ports range is controlled
-by two options 'rpc_zmq_min_port' and 'rpc_zmq_max_port'. Change them to
-restrict current service's port binding range. 'rpc_zmq_bind_port_retries'
-controls number of retries before 'ports range exceeded' failure.
-For example::
- rpc_zmq_min_port = 49153
- rpc_zmq_max_port = 65536
- rpc_zmq_bind_port_retries = 100
-Existing Configurations
-Static Direct Connections
-The example of service config file::
- transport_url = "zmq+redis://host-1:6379"
- [oslo_messaging_zmq]
- use_pub_sub = false
- use_router_proxy = false
- use_dynamic_connections = false
- zmq_target_expire = 60
- zmq_target_update = 30
- rpc_zmq_min_port = 49153
- rpc_zmq_max_port = 65536
-In both static and dynamic direct connections configuration it is necessary to
-configure firewall to open binding port range on each node::
- iptables -A INPUT -p tcp --match multiport --dports 49152:65535 -j ACCEPT
-The sequrity recommendation here (it is general for any RPC backend) is to
-setup private network for message bus and another open network for public APIs.
-ZeroMQ driver doesn't support authentication and encryption on its level.
-As stated above this configuration is the simplest one since it requires only a
-Matchmaker service to be running. That is why driver's options configured by
-default in a way to use this type of topology.
-The biggest advantage of static direct connections (other than simplicity) is
-it's huge performance. On small deployments (20 - 50 nodes) it can outperform
-brokered solutions (or solutions with proxies) 3x - 5x times. It becomes possible
-because this configuration doesn't have a central node bottleneck so it's
-throughput is limited by only a TCP and network bandwidth.
-Unfortunately this approach can not be applied as is on a big scale (over 500 nodes).
-The main problem is the number of connections between services and particularly
-the number of connections on each controller node grows (in a worst case) as
-a square function of number of the whole running services. That's not
-However this approach can be successfully used and is recommended to be used
-when services on controllers doesn't talk to agent services on resource nodes
-using oslo.messaging RPC, but RPC is used only to communicate controller
-services between each other.
-Examples here may be Cinder+Ceph backend and Ironic how it utilises
-For all the other cases like Nova and Neutron on a big scale using proxy-based
-configurations or dynamic connections configuration is more appropriate.
-The exception here may be the case when using OpenStack services inside Docker
-containers with Kubernetes. Since Kubernetes already solves similar problems by
-using KubeProxy and virtual IP addresses for each container. So it manages all
-the traffic using iptables which is more than appropriate to solve the problem
-described above.
-Summing up it is recommended to use this type of zmq configuration for
-1. Small clouds (up to 100 nodes)
-2. Cinder+Ceph deployment
-3. Ironic deployment
-4. OpenStack + Kubernetes (OpenStack in containers) deployment
-Dynamic Direct Connections
-The example of service config file::
- transport_url = "zmq+redis://host-1:6379"
- [oslo_messaging_zmq]
- use_pub_sub = false
- use_router_proxy = false
- use_dynamic_connections = true
- zmq_failover_connections = 2
- zmq_linger = 60
- zmq_target_expire = 60
- zmq_target_update = 30
- rpc_zmq_min_port = 49153
- rpc_zmq_max_port = 65536
-The 'use_dynamic_connections = true' obviously states that connections are dynamic.
-'zmq_linger' become crucial with dynamic connections in order to avoid socket
-leaks. If socket being connected to a wrong (dead) host which somehow still
-present in the Matchmaker and message was sent, then the socket can not be closed
-until message stays in the queue (the default linger is infinite waiting). So
-need to specify linger explicitly.
-Services often run more than one worker on the same topic. Workers are equal, so
-any can handle the message. In order to connect to more than one available worker
-need to setup 'zmq_failover_connections' option to some value (2 by default which
-means 2 additional connections). Take care because it may also result in slow-down.
-All recommendations regarding port ranges described in previous section are also
-valid here.
-Most things are similar to what we had with static connections the only
-difference is that each message causes connection setup and disconnect afterwards
-immediately after message was sent.
-The advantage of this deployment is that average number of connections on
-controller node at any moment is not high even for quite large deployments.
-The disadvantage is overhead caused by need to connect/disconnect per message.
-So this configuration can with no doubt be considered as the slowest one. The
-good news is the RPC of OpenStack doesn't require "thousands message per second"
-bandwidth per each particular service (do not confuse with central broker/proxy
-bandwidth which is needed as high as possible for a big scale and can be a
-serious bottleneck).
-One more bad thing about this particular configuration is fanout. Here it is
-completely linear complexity operation and it suffers the most from
-connect/disconnect overhead per message. So for fanout it is fair to say that
-services can have significant slow-down with dynamic connections.
-The recommended way to solve this problem is to use combined solution with
-proxied PUB/SUB infrastructure for fanout and dynamic direct connections for
-direct message types (plain CAST and CALL messages). This combined approach
-will be described later in the text.
-Router Proxy
-The example of service config file::
- transport_url = "zmq+redis://host-1:6379"
- [oslo_messaging_zmq]
- use_pub_sub = false
- use_router_proxy = true
- use_dynamic_connections = false
-The example of proxy config file::
- transport_url = "zmq+redis://host-1:6379"
- [oslo_messaging_zmq]
- use_pub_sub = false
- [zmq_proxy_opt]
- host = host-1
-RPC may consume too many TCP sockets on controller node in directly connected
-configuration. To solve the issue ROUTER proxy may be used.
-In order to configure driver to use ROUTER proxy set up the 'use_router_proxy'
-option to true in [oslo_messaging_zmq] section (false is set by default).
-Pay attention to 'use_pub_sub = false' line, which has to match for all
-services and proxies configs, so it wouldn't work if proxy uses PUB/SUB and
-services don't.
-Not less than 3 proxies should be running on controllers or on stand alone
-nodes. The parameters for the script oslo-messaging-zmq-proxy should be::
- oslo-messaging-zmq-proxy
- --config-file /etc/oslo/zeromq.conf
- --log-file /var/log/oslo/zeromq-router-proxy.log
- --host node-123
- --frontend-port 50001
- --backend-port 50002
- --debug
-Config file for proxy consists of default section, 'oslo_messaging_zmq' section
-and additional 'zmq_proxy_opts' section.
-Command line arguments like host, frontend_port, backend_port and publisher_port
-respectively can also be set in 'zmq_proxy_opts' section of a configuration
-file (i.e., /etc/oslo/zeromq.conf). All arguments are optional.
-Port value of 0 means random port (see the next section for more details).
-Take into account that --debug flag makes proxy to make a log record per every
-dispatched message which influences proxy performance significantly. So it is
-not recommended flag to use in production. Without --debug there will be only
-Matchmaker updates or critical errors in proxy logs.
-In this configuration we use proxy as a very simple dispatcher (so it has the
-best performance with minimal overhead). The only thing proxy does is getting
-binary routing-key frame from the message and dispatch message on this key.
-In this kind of deployment client is in charge of doing fanout. Before sending
-fanout message client takes a list of available hosts for the topic and sends
-as many messages as the number of hosts it got.
-This configuration just uses DEALER/ROUTER pattern of ZeroMQ and doesn't use
-PUB/SUB as it was stated above.
-Disadvantage of this approach is again slower client fanout. But it is much
-better than with dynamic direct connections because we don't need to connect
-and disconnect per each message.
-ZeroMQ PUB/SUB Infrastructure
-The example of service config file::
- transport_url = "zmq+redis://host-1:6379"
- [oslo_messaging_zmq]
- use_pub_sub = true
- use_router_proxy = true
- use_dynamic_connections = false
-The example of proxy config file::
- transport_url = "zmq+redis://host-1:6379"
- [oslo_messaging_zmq]
- use_pub_sub = true
- [zmq_proxy_opt]
- host = host-1
-It seems obvious that fanout pattern of oslo.messaging maps on ZeroMQ PUB/SUB
-pattern, but it is only at first glance. It does really, but lets look a bit
-First caveat is that in oslo.messaging it is a client who makes fanout (and
-generally initiates conversation), server is passive. While in ZeroMQ publisher
-is a server and subscribers are clients. And here is the problem: RPC-servers
-are subscribers in terms of ZeroMQ PUB/SUB, they hold the SUB socket and wait
-for messages. And they don't know anything about RPC-clients, and clients
-generally come later than servers. So servers don't have a PUB to subscribe
-on start, so we need to introduce something in the middle, and here the proxy
-plays the role.
-Publisher proxy has ROUTER socket on the front-end and PUB socket on the back-end.
-So client connects to ROUTER and sends a single message to a publisher proxy.
-Proxy redirects this message to PUB socket which performs actual publishing.
-Command to run central publisher proxy::
- oslo-messaging-zmq-proxy
- --config-file /etc/oslo/zeromq.conf
- --log-file /var/log/oslo/zeromq-router-proxy.log
- --host node-123
- --frontend-port 50001
- --publisher-port 50003
- --debug
-When we run a publisher proxy we need to specify a --publisher-port option.
-Random port will be picked up otherwise and clients will get it from the
-The advantage of this approach is really fast fanout, while it takes time on
-proxy to publish, but ZeroMQ PUB/SUB is one of the fastest fanout pattern
-implementations. It also makes clients faster, because they need to send only a
-single message to a proxy.
-In order to balance load and HA it is recommended to have at least 3 proxies basically,
-but the number of running proxies is not limited. They also don't form a cluster,
-so there are no limitations on number caused by consistency algorithm requirements.
-The disadvantage is that number of connections on proxy increased twice compared
-to previous deployment, because we still need to use router for direct messages.
-The documented limitation of ZeroMQ PUB/SUB is 10k subscribers.
-In order to limit the number of subscribers and connections the local proxies
-may be used. In order to run local publisher the following command may be used::
- oslo-messaging-zmq-proxy
- --local-publisher
- --config-file /etc/oslo/zeromq.conf
- --log-file /var/log/oslo/zeromq-router-proxy.log
- --host localhost
- --publisher-port 60001
- --debug
-Pay attention to --local-publisher flag which specifies the type of a proxy.
-Local publishers may be running on every single node of a deployment. To make
-services use of local publishers the 'subscribe_on' option has to be specified
-in service's config file::
- transport_url = "zmq+redis://host-1:6379"
- [oslo_messaging_zmq]
- use_pub_sub = true
- use_router_proxy = true
- use_dynamic_connections = false
- subscribe_on = localhost:60001
-If we forgot to specify the 'subscribe_on' services will take info from Matchmaker
-and still connect to a central proxy, so the trick wouldn't work. Local proxy
-gets all the needed info from the matchmaker in order to find central proxies
-and subscribes on them. Frankly speaking you can pub a central proxy in the
-'subscribe_on' value, even a list of hosts may be passed the same way as we do
-for the transport_url::
- subscribe_on = host-1:50003,host-2:50003,host-3:50003
-This is completely valid, just not necessary because we have information about
-central proxies in Matchmaker. One more thing to highlight about 'subscribe_on'
-is that it has higher priority than Matchmaker if being explicitly mentioned.
-Concluding all the above, fanout over PUB/SUB proxies is the best choice
-because of static connections infrastructure, fail over when one or some publishers
-die, and ZeroMQ PUB/SUB high performance.
-What If Mix Different Configurations?
-Three boolean variables 'use_pub_sub', 'use_router_proxy' and 'use_dynamic_connections'
-give us exactly 8 possible combinations. But from practical perspective not all
-of them are usable. So lets discuss only those which make sense.
-The main recommended combination is Dynamic Direct Connections plus PUB/SUB
-infrastructure. So we deploy PUB/SUB proxies as described in corresponding
-paragraph (either with local+central proxies or with only a central proxies).
-And the services configuration file will look like the following::
- transport_url = "zmq+redis://host-1:6379"
- [oslo_messaging_zmq]
- use_pub_sub = true
- use_router_proxy = false
- use_dynamic_connections = true
-So we just tell the driver not to pass direct messages CALL and CAST over router,
-but send them directly to RPC servers. All the details of configuring services
-and port ranges has to be taken from 'Dynamic Direct Connections' paragraph.
-So it's combined configuration. Currently it is the best choice from number of
-connections perspective.
-Frankly speaking, deployment from the 'ZeroMQ PUB/SUB Infrastructure' section is
-also a combination of 'Router Proxy' with PUB/SUB, we've just used the same
-proxies for both.
-Here we've discussed combination inside the same service. But configurations can
-also be combined on a higher level, a level of services. So you could have for
-example a deployment where Cinder uses static direct connections and Nova/Neutron
-use combined PUB/SUB + dynamic direct connections. But such approach needs additional
-caution and may be confusing for cloud operators. Still it provides maximum
-optimization of performance and number of connections on proxies and controller
-DevStack Support
-ZeroMQ driver can be tested on a single node deployment with DevStack. Take
-into account that on a single node it is not that obvious any performance
-increase compared to other backends. To see significant speed up you need at least
-20 nodes.
-In local.conf [localrc] section need to enable zmq plugin which lives in
-`devstack-plugin-zmq`_ repository.
-For example::
- enable_plugin zmq
-Example of local.conf::
- [[local|localrc]]
- SERVICE_TOKEN=password
- enable_plugin zmq
- LIBS_FROM_GIT=oslo.messaging
-.. _devstack-plugin-zmq:
-.. _sentinel-install:
diff --git a/lower-constraints.txt b/lower-constraints.txt
index 392c741..ef88244 100644
--- a/lower-constraints.txt
+++ b/lower-constraints.txt
@@ -69,8 +69,6 @@ python-qpid-proton==0.17.0
diff --git a/oslo_messaging/_cmd/ b/oslo_messaging/_cmd/
deleted file mode 100644
index e69de29..0000000
--- a/oslo_messaging/_cmd/
+++ /dev/null
diff --git a/oslo_messaging/_cmd/ b/oslo_messaging/_cmd/
deleted file mode 100644
index c0b07c3..0000000
--- a/oslo_messaging/_cmd/
+++ /dev/null
@@ -1,48 +0,0 @@
-# Copyright 2015-2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import logging
-from oslo_config import cfg
-from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
-from oslo_messaging._drivers.zmq_driver import zmq_options
-from oslo_messaging._i18n import _LI
-from oslo_messaging.transport import TransportURL
-LOG = logging.getLogger(__name__)
-def main():
- conf = cfg.CONF
- opt_group = cfg.OptGroup(name='zmq_proxy_opts',
- title='ZeroMQ proxy options')
- conf.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group)
- zmq_options.register_opts(conf, TransportURL.parse(conf))
- zmq_proxy.parse_command_line_args(conf)
- reactor = zmq_proxy.ZmqProxy(conf)
- try:
- while True:
- except (KeyboardInterrupt, SystemExit):
-"Exit proxy by interrupt signal."))
- finally:
- reactor.close()
-if __name__ == "__main__":
- main()
diff --git a/oslo_messaging/_drivers/ b/oslo_messaging/_drivers/
deleted file mode 100644
index ece9a4b..0000000
--- a/oslo_messaging/_drivers/
+++ /dev/null
@@ -1,219 +0,0 @@
-# Copyright 2011 Cloudscaling Group, Inc
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import os
-import threading
-from debtcollector import removals
-from stevedore import driver
-from oslo_messaging._drivers import base
-from oslo_messaging._drivers import common as rpc_common
-from oslo_messaging._drivers.zmq_driver.client import zmq_client
-from oslo_messaging._drivers.zmq_driver.server import zmq_server
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_options
-from oslo_messaging._i18n import _LE
-RPCException = rpc_common.RPCException
-class LazyDriverItem(object):
- def __init__(self, item_cls, *args, **kwargs):
- self._lock = threading.Lock()
- self.item = None
- self.item_class = item_cls
- self.args = args
- self.kwargs = kwargs
- self.process_id = os.getpid()
- def get(self):
- # NOTE(ozamiatin): Lazy initialization.
- # All init stuff moved closer to usage point - lazy init.
- # Better design approach is to initialize in the driver's
- # __init__, but 'fork' extensively used by services
- # breaks all things.
- if self.item is not None and os.getpid() == self.process_id:
- return self.item
- with self._lock:
- if self.item is None or os.getpid() != self.process_id:
- self.process_id = os.getpid()
- self.item = self.item_class(*self.args, **self.kwargs)
- return self.item
- def cleanup(self):
- if self.item:
- self.item.cleanup()
-@removals.removed_class('ZmqDriver', version='Rocky', removal_version='Stein',
- message='The ZeroMQ driver is no longer supported')
-class ZmqDriver(base.BaseDriver):
- """ZeroMQ Driver implementation.
- Provides implementation of RPC and Notifier APIs by means
- of ZeroMQ library.
- See :doc:`zmq_driver` for details.
- """
- def __init__(self, conf, url, default_exchange=None,
- allowed_remote_exmods=None):
- """Construct ZeroMQ driver.
- Initialize driver options.
- Construct matchmaker - pluggable interface to targets management
- Name Service
- Construct client and server controllers
- :param conf: oslo messaging configuration object
- :type conf: oslo_config.CONF
- :param url: transport URL
- :type url: TransportUrl
- :param default_exchange: Not used in zmq implementation
- :type default_exchange: None
- :param allowed_remote_exmods: remote exception passing options
- :type allowed_remote_exmods: list
- """
- zmq = zmq_async.import_zmq()
- if zmq is None:
- raise ImportError(_LE("ZeroMQ is not available!"))
- conf = zmq_options.register_opts(conf, url)
- self.conf = conf
- self.allowed_remote_exmods = allowed_remote_exmods
- self.matchmaker = driver.DriverManager(
- 'oslo.messaging.zmq.matchmaker',
- self.get_matchmaker_backend(self.conf, url),
- ).driver(self.conf, url=url)
- client_cls = zmq_client.ZmqClientProxy
- if conf.oslo_messaging_zmq.use_pub_sub and not \
- conf.oslo_messaging_zmq.use_router_proxy:
- client_cls = zmq_client.ZmqClientMixDirectPubSub
- elif not conf.oslo_messaging_zmq.use_pub_sub and not \
- conf.oslo_messaging_zmq.use_router_proxy:
- client_cls = zmq_client.ZmqClientDirect
- self.client = LazyDriverItem(
- client_cls, self.conf, self.matchmaker,
- self.allowed_remote_exmods)
- self.notifier = LazyDriverItem(
- client_cls, self.conf, self.matchmaker,
- self.allowed_remote_exmods)
- super(ZmqDriver, self).__init__(conf, url, default_exchange,
- allowed_remote_exmods)
- @staticmethod
- def get_matchmaker_backend(conf, url):
- zmq_transport, _, matchmaker_backend = url.transport.partition('+')
- assert zmq_transport == 'zmq', "Needs to be zmq for this transport!"
- if not matchmaker_backend:
- return conf.oslo_messaging_zmq.rpc_zmq_matchmaker
- if matchmaker_backend not in zmq_options.MATCHMAKER_BACKENDS:
- raise rpc_common.RPCException(
- _LE("Incorrect matchmaker backend name %(backend_name)s! "
- "Available names are: %(available_names)s") %
- {"backend_name": matchmaker_backend,
- "available_names": zmq_options.MATCHMAKER_BACKENDS})
- return matchmaker_backend
- def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
- retry=None):
- """Send RPC message to server
- :param target: Message destination target
- :type target: oslo_messaging.Target
- :param ctxt: Message context
- :type ctxt: dict
- :param message: Message payload to pass
- :type message: dict
- :param wait_for_reply: Waiting for reply flag
- :type wait_for_reply: bool
- :param timeout: Reply waiting timeout in seconds
- :type timeout: int
- :param retry: an optional default connection retries configuration
- None or -1 means to retry forever
- 0 means no retry
- N means N retries
- :type retry: int
- """
- client = self.client.get()
- if wait_for_reply:
- return client.send_call(target, ctxt, message, timeout, retry)
- elif target.fanout:
- client.send_fanout(target, ctxt, message, retry)
- else:
- client.send_cast(target, ctxt, message, retry)
- def send_notification(self, target, ctxt, message, version, retry=None):
- """Send notification to server
- :param target: Message destination target
- :type target: oslo_messaging.Target
- :param ctxt: Message context
- :type ctxt: dict
- :param message: Message payload to pass
- :type message: dict
- :param version: Messaging API version
- :type version: str
- :param retry: an optional default connection retries configuration
- None or -1 means to retry forever
- 0 means no retry
- N means N retries
- :type retry: int
- """
- client = self.notifier.get()
- client.send_notify(target, ctxt, message, version, retry)
- def listen(self, target, batch_size, batch_timeout):
- """Listen to a specified target on a server side
- :param target: Message destination target
- :type target: oslo_messaging.Target
- """
- listener = zmq_server.ZmqServer(self, self.conf, self.matchmaker,
- target)
- return base.PollStyleListenerAdapter(listener, batch_size,
- batch_timeout)
- def listen_for_notifications(self, targets_and_priorities, pool,
- batch_size, batch_timeout):
- """Listen to a specified list of targets on a server side
- :param targets_and_priorities: List of pairs (target, priority)
- :type targets_and_priorities: list
- :param pool: Not used for zmq implementation
- :type pool: object
- """
- listener = zmq_server.ZmqNotificationServer(
- self, self.conf, self.matchmaker, targets_and_priorities)
- return base.PollStyleListenerAdapter(listener, batch_size,
- batch_timeout)
- def cleanup(self):
- """Cleanup all driver's connections finally
- """
- self.client.cleanup()
- self.notifier.cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/ b/oslo_messaging/_drivers/zmq_driver/
deleted file mode 100644
index e69de29..0000000
--- a/oslo_messaging/_drivers/zmq_driver/
+++ /dev/null
diff --git a/oslo_messaging/_drivers/zmq_driver/client/ b/oslo_messaging/_drivers/zmq_driver/client/
deleted file mode 100644
index e69de29..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/
+++ /dev/null
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/ b/oslo_messaging/_drivers/zmq_driver/client/publishers/
deleted file mode 100644
index e69de29..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/
+++ /dev/null
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/
deleted file mode 100644
index e69de29..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/
+++ /dev/null
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/
deleted file mode 100644
index c5456e7..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/
+++ /dev/null
@@ -1,70 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-from concurrent import futures
-import logging
-from oslo_messaging._drivers import common as rpc_common
-from oslo_messaging._drivers.zmq_driver.client.publishers \
- import zmq_publisher_base
-from oslo_messaging._drivers.zmq_driver.client import zmq_response
-from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._i18n import _LE
-LOG = logging.getLogger(__name__)
-zmq = zmq_async.import_zmq()
-class DealerPublisherBase(zmq_publisher_base.PublisherBase):
- """Abstract DEALER-publisher."""
- def __init__(self, conf, matchmaker, sender, receiver):
- sockets_manager = zmq_sockets_manager.SocketsManager(
- conf, matchmaker, zmq.DEALER)
- super(DealerPublisherBase, self).__init__(
- sockets_manager, sender, receiver)
- def _check_reply(self, reply, request):
- assert isinstance(reply, zmq_response.Reply), "Reply expected!"
- def _finally_unregister(self, socket, request):
- self.receiver.untrack_request(request)
- def receive_reply(self, socket, request):
- self.receiver.register_socket(socket)
- _, reply_future = self.receiver.track_request(request)
- try:
- reply = reply_future.result(timeout=request.timeout)
- self._check_reply(reply, request)
- except AssertionError:
- LOG.error(_LE("Message format error in reply for %s"),
- request.message_id)
- return None
- except futures.TimeoutError:
- self._raise_timeout(request)
- finally:
- self._finally_unregister(socket, request)
- if reply.failure:
- raise rpc_common.deserialize_remote_exception(
- reply.failure, request.allowed_remote_exmods)
- else:
- return reply.reply_body
- def cleanup(self):
- super(DealerPublisherBase, self).cleanup()
- self.sockets_manager.cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/
deleted file mode 100644
index d1a61e1..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/
+++ /dev/null
@@ -1,152 +0,0 @@
-# Copyright 2015-2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import logging
-import tenacity
-from \
- import zmq_dealer_publisher_base
-from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
-from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table
-from oslo_messaging._drivers.zmq_driver.client import zmq_senders
-from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager
-from oslo_messaging._drivers.zmq_driver import zmq_address
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-LOG = logging.getLogger(__name__)
-zmq = zmq_async.import_zmq()
-class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
- """DEALER-publisher using direct dynamic connections.
- Publishing directly to remote services assumes the following:
- - All direct connections are dynamic - so they live per message,
- thus each message send executes the following:
- * Open a new socket
- * Connect to some host got from the RoutingTable
- * Send message(s)
- * Close connection, destroy socket
- - RoutingTable/RoutingTableUpdater implements local cache of
- matchmaker (e.g. Redis) for target resolution to the list of
- available hosts. Cache updates in a background thread.
- - Caching of connections is not appropriate for directly connected
- OS services, because finally it results in a full-mesh of
- connections between services.
- - Yes we lose on performance opening and closing connections
- for each message, but that is done intentionally to implement
- the dynamic connections concept. The key thought here is to
- have minimum number of connected services at the moment.
- - Using the local RoutingTable cache is done to optimise access
- to the matchmaker so we don't call the matchmaker per each message
- """
- def __init__(self, conf, matchmaker):
- sender = zmq_senders.RequestSenderDirect(conf, use_async=True)
- receiver = zmq_receivers.ReceiverDirect(conf)
- super(DealerPublisherDirect, self).__init__(conf, matchmaker,
- sender, receiver)
- self.routing_table = zmq_routing_table.RoutingTableAdaptor(
- conf, matchmaker, zmq.ROUTER)
- def _get_round_robin_host_connection(self, target, socket):
- host = self.routing_table.get_round_robin_host(target)
- socket.connect_to_host(host)
- failover_hosts = self.routing_table.get_all_round_robin_hosts(target)
- upper_bound = self.conf.oslo_messaging_zmq.zmq_failover_connections
- for host in failover_hosts[:upper_bound]:
- socket.connect_to_host(host)
- def _get_fanout_connection(self, target, socket):
- for host in self.routing_table.get_fanout_hosts(target):
- socket.connect_to_host(host)
- def acquire_connection(self, request):
- if request.msg_type in zmq_names.DIRECT_TYPES:
- socket = self.sockets_manager.get_socket()
- self._get_round_robin_host_connection(, socket)
- return socket
- elif request.msg_type in zmq_names.MULTISEND_TYPES:
- socket = self.sockets_manager.get_socket(immediate=False)
- self._get_fanout_connection(, socket)
- return socket
- def _finally_unregister(self, socket, request):
- super(DealerPublisherDirect, self)._finally_unregister(socket, request)
- self.receiver.unregister_socket(socket)
- def send_request(self, socket, request):
- if hasattr(request, 'timeout'):
- _stop = tenacity.stop_after_delay(request.timeout)
- elif request.retry is not None and request.retry > 0:
- # no rpc_response_timeout option if notification
- _stop = tenacity.stop_after_attempt(request.retry)
- else:
- # well, now what?
- _stop = tenacity.stop_after_delay(60)
- @tenacity.retry(retry=tenacity.retry_if_exception_type(zmq.Again),
- stop=_stop)
- def send_retrying():
- if request.msg_type in zmq_names.MULTISEND_TYPES:
- for _ in range(socket.connections_count()):
- self.sender.send(socket, request)
- else:
- self.sender.send(socket, request)
- return send_retrying()
- def cleanup(self):
- self.routing_table.cleanup()
- super(DealerPublisherDirect, self).cleanup()
-class DealerPublisherDirectStatic(DealerPublisherDirect):
- """DEALER-publisher using direct static connections.
- For some reason direct static connections may be also useful.
- Assume a case when some agents are not connected with control services
- over RPC (Ironic or Cinder+Ceph), and RPC is used only between controllers.
- In this case number of RPC connections doesn't matter (very small) so we
- can use static connections without fear and have all performance benefits
- from it.
- """
- def __init__(self, conf, matchmaker):
- super(DealerPublisherDirectStatic, self).__init__(conf, matchmaker)
- self.fanout_sockets = zmq_sockets_manager.SocketsManager(
- conf, matchmaker, zmq.DEALER)
- def acquire_connection(self, request):
- target_key = zmq_address.target_to_key(
-, zmq_names.socket_type_str(zmq.ROUTER))
- if request.msg_type in zmq_names.MULTISEND_TYPES:
- hosts = self.routing_table.get_fanout_hosts(
- return self.fanout_sockets.get_cached_socket(target_key, hosts,
- immediate=False)
- else:
- hosts = self.routing_table.get_all_round_robin_hosts(
- return self.sockets_manager.get_cached_socket(target_key, hosts)
- def _finally_unregister(self, socket, request):
- self.receiver.untrack_request(request)
- def cleanup(self):
- self.fanout_sockets.cleanup()
- super(DealerPublisherDirectStatic, self).cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/
deleted file mode 100644
index d949103..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/
+++ /dev/null
@@ -1,136 +0,0 @@
-# Copyright 2015-2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import random
-import uuid
-import six
-from \
- import zmq_dealer_publisher_base
-from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
-from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table
-from oslo_messaging._drivers.zmq_driver.client import zmq_senders
-from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base
-from oslo_messaging._drivers.zmq_driver import zmq_address
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-from oslo_messaging._drivers.zmq_driver import zmq_updater
-zmq = zmq_async.import_zmq()
-class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase):
- """DEALER-publisher via proxy."""
- def __init__(self, conf, matchmaker):
- sender = zmq_senders.RequestSenderProxy(conf)
- receiver = zmq_receivers.ReceiverProxy(conf)
- super(DealerPublisherProxy, self).__init__(conf, matchmaker,
- sender, receiver)
- self.socket = self.sockets_manager.get_socket_to_publishers(
- self._generate_identity())
- self.routing_table = zmq_routing_table.RoutingTableAdaptor(
- conf, matchmaker, zmq.DEALER)
- self.connection_updater = PublisherConnectionUpdater(
- self.conf, self.matchmaker, self.socket)
- def _generate_identity(self):
- return six.b(self.conf.oslo_messaging_zmq.rpc_zmq_host + "/" +
- str(uuid.uuid4()))
- def _check_reply(self, reply, request):
- super(DealerPublisherProxy, self)._check_reply(reply, request)
- assert reply.reply_id == request.routing_key, \
- "Reply from recipient expected!"
- def _get_routing_keys(self, request):
- if request.msg_type in zmq_names.DIRECT_TYPES:
- return [self.routing_table.get_round_robin_host(]
- else:
- return \
- [zmq_address.target_to_subscribe_filter(] \
- if self.conf.oslo_messaging_zmq.use_pub_sub else \
- self.routing_table.get_fanout_hosts(
- def acquire_connection(self, request):
- return self.socket
- def send_request(self, socket, request):
- for routing_key in self._get_routing_keys(request):
- request.routing_key = routing_key
- self.sender.send(socket, request)
- def cleanup(self):
- self.connection_updater.stop()
- self.routing_table.cleanup()
- super(DealerPublisherProxy, self).cleanup()
-class PublisherConnectionUpdater(zmq_updater.ConnectionUpdater):
- def _update_connection(self):
- publishers = self.matchmaker.get_publishers()
- for pub_address, fe_router_address in publishers:
- self.socket.connect_to_host(fe_router_address)
-class DealerPublisherProxyDynamic(
- zmq_dealer_publisher_base.DealerPublisherBase):
- def __init__(self, conf, matchmaker):
- sender = zmq_senders.RequestSenderProxy(conf)
- receiver = zmq_receivers.ReceiverDirect(conf)
- super(DealerPublisherProxyDynamic, self).__init__(conf, matchmaker,
- sender, receiver)
- self.publishers = set()
- self.updater = DynamicPublishersUpdater(conf, matchmaker,
- self.publishers)
- self.updater.update_publishers()
- def acquire_connection(self, request):
- if not self.publishers:
- raise zmq_matchmaker_base.MatchmakerUnavailable()
- socket = self.sockets_manager.get_socket()
- publishers = list(self.publishers)
- random.shuffle(publishers)
- for publisher in publishers:
- socket.connect_to_host(publisher)
- return socket
- def send_request(self, socket, request):
- request.routing_key = \
- zmq_address.target_to_subscribe_filter(
- self.sender.send(socket, request)
- def cleanup(self):
- self.updater.cleanup()
- super(DealerPublisherProxyDynamic, self).cleanup()
-class DynamicPublishersUpdater(zmq_updater.UpdaterBase):
- def __init__(self, conf, matchmaker, publishers):
- super(DynamicPublishersUpdater, self).__init__(
- conf, matchmaker, self.update_publishers,
- sleep_for=conf.oslo_messaging_zmq.zmq_target_update
- )
- self.publishers = publishers
- def update_publishers(self):
- publishers = self.matchmaker.get_publishers()
- for pub_address, fe_router_address in publishers:
- self.publishers.add(fe_router_address)
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/ b/oslo_messaging/_drivers/zmq_driver/client/publishers/
deleted file mode 100644
index 90d2967..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/
+++ /dev/null
@@ -1,93 +0,0 @@
-# Copyright 2015-2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import abc
-import six
-import oslo_messaging
-from oslo_messaging._drivers.zmq_driver import zmq_async
-zmq = zmq_async.import_zmq()
-class PublisherBase(object):
- """Abstract publisher class
- Each publisher from zmq-driver client should implement
- this interface to serve as a messages publisher.
- Publisher can send request objects from zmq_request.
- """
- def __init__(self, sockets_manager, sender, receiver):
- """Construct publisher.
- Accept sockets manager, sender and receiver objects.
- :param sockets_manager: sockets manager object
- :type sockets_manager: zmq_sockets_manager.SocketsManager
- :param sender: request sender object
- :type sender: zmq_senders.RequestSenderBase
- :param receiver: response receiver object
- :type receiver: zmq_receivers.ReceiverBase
- """
- self.sockets_manager = sockets_manager
- self.conf = sockets_manager.conf
- self.matchmaker = sockets_manager.matchmaker
- self.sender = sender
- self.receiver = receiver
- @abc.abstractmethod
- def acquire_connection(self, request):
- """Get socket to publish request on it.
- :param request: request object
- :type senders: zmq_request.Request
- """
- @abc.abstractmethod
- def send_request(self, socket, request):
- """Publish request on a socket.
- :param socket: socket object to publish request on
- :type socket: zmq_socket.ZmqSocket
- :param request: request object
- :type senders: zmq_request.Request
- """
- @abc.abstractmethod
- def receive_reply(self, socket, request):
- """Wait for a reply via the socket used for sending the request.
- :param socket: socket object to receive reply from
- :type socket: zmq_socket.ZmqSocket
- :param request: request object
- :type senders: zmq_request.Request
- """
- @staticmethod
- def _raise_timeout(request):
- raise oslo_messaging.MessagingTimeout(
- "Timeout %(tout)s seconds was reached for message %(msg_id)s" %
- {"tout": request.timeout, "msg_id": request.message_id})
- def cleanup(self):
- """Cleanup publisher: stop receiving responses, close allocated
- connections etc.
- """
- self.receiver.stop()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/ b/oslo_messaging/_drivers/zmq_driver/client/
deleted file mode 100644
index 02aab77..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/
+++ /dev/null
@@ -1,118 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-from concurrent import futures
-import logging
-from oslo_messaging._drivers.zmq_driver.client import zmq_publisher_manager
-from oslo_messaging._drivers.zmq_driver.client import zmq_response
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-from oslo_messaging._i18n import _LE, _LW
-LOG = logging.getLogger(__name__)
-zmq = zmq_async.import_zmq()
-class AckManager(zmq_publisher_manager.PublisherManagerBase):
- def __init__(self, publisher):
- super(AckManager, self).__init__(publisher, with_pool=True)
- @staticmethod
- def _check_ack(ack, request):
- if ack is not None:
- assert isinstance(ack, zmq_response.Ack), "Ack expected!"
- assert ack.reply_id == request.routing_key, \
- "Ack from recipient expected!"
- def _wait_for_ack(self, request, ack_future=None):
- if ack_future is None:
- ack_future = self._schedule_request_for_ack(request)
- retries = \
- request.retry or self.conf.oslo_messaging_zmq.rpc_retry_attempts
- if retries is None:
- retries = -1
- timeout = self.conf.oslo_messaging_zmq.rpc_ack_timeout_base
- done = ack_future is None
- while not done:
- try:
- ack = ack_future.result(timeout=timeout)
- done = True
- self._check_ack(ack, request)
- except AssertionError:
- LOG.error(_LE("Message format error in ack for %s"),
- request.message_id)
- except futures.TimeoutError:
- LOG.warning(_LW("No ack received within %(tout)s seconds "
- "for %(msg_id)s"),
- {"tout": timeout,
- "msg_id": request.message_id})
- if retries != 0:
- if retries > 0:
- retries -= 1
- self.sender.send(ack_future.socket, request)
- timeout *= \
- self.conf.oslo_messaging_zmq.rpc_ack_timeout_multiplier
- else:
- LOG.warning(_LW("Exhausted number of retries for %s"),
- request.message_id)
- done = True
- if request.msg_type != zmq_names.CALL_TYPE:
- self.receiver.untrack_request(request)
- @zmq_publisher_manager.target_not_found_warn
- def _send_request(self, request):
- socket = self.publisher.acquire_connection(request)
- self.publisher.send_request(socket, request)
- return socket
- def _schedule_request_for_ack(self, request):
- socket = self._send_request(request)
- if socket is None:
- return None
- self.receiver.register_socket(socket)
- ack_future, _ = self.receiver.track_request(request)
- ack_future.socket = socket
- return ack_future
- def send_call(self, request):
- ack_future = self._schedule_request_for_ack(request)
- if ack_future is None:
- self.publisher._raise_timeout(request)
- self.pool.submit(self._wait_for_ack, request, ack_future)
- try:
- return self.publisher.receive_reply(ack_future.socket, request)
- finally:
- if not ack_future.done():
- ack_future.set_result(None)
- def send_cast(self, request):
- self.pool.submit(self._wait_for_ack, request)
- send_fanout = _send_request
- send_notify = _send_request
-class AckManagerAsyncMultisend(AckManager):
- def _send_request_async(self, request):
- self.pool.submit(self._send_request, request)
- send_fanout = _send_request_async
- send_notify = _send_request_async
diff --git a/oslo_messaging/_drivers/zmq_driver/client/ b/oslo_messaging/_drivers/zmq_driver/client/
deleted file mode 100644
index a7efd06..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/
+++ /dev/null
@@ -1,105 +0,0 @@
-# Copyright 2015-2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-from oslo_messaging._drivers import common
-from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-zmq = zmq_async.import_zmq()
-class WrongClientException(common.RPCException):
- """Raised if client type doesn't match configuration"""
-class ZmqClientMixDirectPubSub(zmq_client_base.ZmqClientBase):
- """Client for using with direct connections and fanout over proxy:
- use_pub_sub = true
- use_router_proxy = false
- """
- def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
- if conf.oslo_messaging_zmq.use_router_proxy or not \
- conf.oslo_messaging_zmq.use_pub_sub:
- raise WrongClientException()
- publisher = self._create_publisher_direct_dynamic(conf, matchmaker) \
- if conf.oslo_messaging_zmq.use_dynamic_connections else \
- self._create_publisher_direct(conf, matchmaker)
- publisher_proxy = self._create_publisher_proxy_dynamic(conf,
- matchmaker) \
- if conf.oslo_messaging_zmq.use_dynamic_connections else \
- self._create_publisher_proxy(conf, matchmaker)
- super(ZmqClientMixDirectPubSub, self).__init__(
- conf, matchmaker, allowed_remote_exmods,
- publishers={
- zmq_names.CAST_FANOUT_TYPE: publisher_proxy,
- zmq_names.NOTIFY_TYPE: publisher_proxy,
- "default": publisher
- }
- )
-class ZmqClientDirect(zmq_client_base.ZmqClientBase):
- """This kind of client (publishers combination) is to be used for
- direct connections only:
- use_pub_sub = false
- use_router_proxy = false
- """
- def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
- if conf.oslo_messaging_zmq.use_pub_sub or \
- conf.oslo_messaging_zmq.use_router_proxy:
- raise WrongClientException()
- publisher = self._create_publisher_direct_dynamic(conf, matchmaker) \
- if conf.oslo_messaging_zmq.use_dynamic_connections else \
- self._create_publisher_direct(conf, matchmaker)
- super(ZmqClientDirect, self).__init__(
- conf, matchmaker, allowed_remote_exmods,
- publishers={
- "default": publisher
- }
- )
-class ZmqClientProxy(zmq_client_base.ZmqClientBase):
- """Client for using with proxy:
- use_pub_sub = true
- use_router_proxy = true
- or
- use_pub_sub = false
- use_router_proxy = true
- """
- def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
- if not conf.oslo_messaging_zmq.use_router_proxy:
- raise WrongClientException()
- super(ZmqClientProxy, self).__init__(
- conf, matchmaker, allowed_remote_exmods,
- publishers={
- "default": self._create_publisher_proxy(conf, matchmaker)
- }
- )
diff --git a/oslo_messaging/_drivers/zmq_driver/client/ b/oslo_messaging/_drivers/zmq_driver/client/
deleted file mode 100644
index 261ded9..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/
+++ /dev/null
@@ -1,117 +0,0 @@
-# Copyright 2015-2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-from \
- import zmq_dealer_publisher_direct
-from \
- import zmq_dealer_publisher_proxy
-from oslo_messaging._drivers.zmq_driver.client import zmq_ack_manager
-from oslo_messaging._drivers.zmq_driver.client import zmq_publisher_manager
-from oslo_messaging._drivers.zmq_driver.client import zmq_request
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-zmq = zmq_async.import_zmq()
-class ZmqClientBase(object):
- def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None,
- publishers=None):
- self.conf = conf
- self.matchmaker = matchmaker
- self.allowed_remote_exmods = allowed_remote_exmods or []
- self.publishers = publishers
- self.call_publisher = publishers.get(zmq_names.CALL_TYPE,
- publishers["default"])
- self.cast_publisher = publishers.get(zmq_names.CAST_TYPE,
- publishers["default"])
- self.fanout_publisher = publishers.get(zmq_names.CAST_FANOUT_TYPE,
- publishers["default"])
- self.notify_publisher = publishers.get(zmq_names.NOTIFY_TYPE,
- publishers["default"])
- def send_call(self, target, context, message, timeout=None, retry=None):
- request = zmq_request.CallRequest(
- target, context=context, message=message, retry=retry,
- timeout=timeout, allowed_remote_exmods=self.allowed_remote_exmods
- )
- return self.call_publisher.send_call(request)
- def send_cast(self, target, context, message, retry=None):
- request = zmq_request.CastRequest(
- target, context=context, message=message, retry=retry
- )
- self.cast_publisher.send_cast(request)
- def send_fanout(self, target, context, message, retry=None):
- request = zmq_request.FanoutRequest(
- target, context=context, message=message, retry=retry
- )
- self.fanout_publisher.send_fanout(request)
- def send_notify(self, target, context, message, version, retry=None):
- request = zmq_request.NotificationRequest(
- target, context=context, message=message, retry=retry,
- version=version
- )
- self.notify_publisher.send_notify(request)
- @staticmethod
- def _create_publisher_direct(conf, matchmaker):
- publisher_cls = zmq_dealer_publisher_direct.DealerPublisherDirectStatic
- publisher_direct = publisher_cls(conf, matchmaker)
- publisher_manager_cls = zmq_publisher_manager.PublisherManagerStatic
- return publisher_manager_cls(publisher_direct)
- @staticmethod
- def _create_publisher_direct_dynamic(conf, matchmaker):
- publisher_cls = zmq_dealer_publisher_direct.DealerPublisherDirect
- publisher_direct = publisher_cls(conf, matchmaker)
- publisher_manager_cls = zmq_publisher_manager.PublisherManagerDynamic \
- if conf.oslo_messaging_zmq.use_pub_sub else \
- zmq_publisher_manager.PublisherManagerDynamicAsyncMultisend
- return publisher_manager_cls(publisher_direct)
- @staticmethod
- def _create_publisher_proxy(conf, matchmaker):
- publisher_proxy = \
- zmq_dealer_publisher_proxy.DealerPublisherProxy(conf, matchmaker)
- if conf.oslo_messaging_zmq.rpc_use_acks:
- ack_manager_cls = zmq_ack_manager.AckManager \
- if conf.oslo_messaging_zmq.use_pub_sub else \
- zmq_ack_manager.AckManagerAsyncMultisend
- return ack_manager_cls(publisher_proxy)
- else:
- publisher_manager_cls = \
- zmq_publisher_manager.PublisherManagerStatic \
- if conf.oslo_messaging_zmq.use_pub_sub else \
- zmq_publisher_manager.PublisherManagerStaticAsyncMultisend
- return publisher_manager_cls(publisher_proxy)
- @staticmethod
- def _create_publisher_proxy_dynamic(conf, matchmaker):
- publisher_proxy = \
- zmq_dealer_publisher_proxy.DealerPublisherProxyDynamic(conf,
- matchmaker)
- return zmq_publisher_manager.PublisherManagerDynamic(publisher_proxy)
- def cleanup(self):
- cleaned = set()
- for publisher in self.publishers.values():
- if publisher not in cleaned:
- publisher.cleanup()
- cleaned.add(publisher)
diff --git a/oslo_messaging/_drivers/zmq_driver/client/ b/oslo_messaging/_drivers/zmq_driver/client/
deleted file mode 100644
index b9afc4e..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/
+++ /dev/null
@@ -1,185 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import abc
-import contextlib
-import logging
-import six
-import tenacity
-from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._i18n import _LW
-LOG = logging.getLogger(__name__)
-zmq = zmq_async.import_zmq()
-def _drop_message_warn(request):
- LOG.warning(_LW("Matchmaker contains no records for specified "
- "target %(target)s. Dropping message %(msg_id)s."),
- {"target":,
- "msg_id": request.message_id})
-def target_not_found_warn(func):
- def _target_not_found_warn(self, request, *args, **kwargs):
- try:
- return func(self, request, *args, **kwargs)
- except (zmq_matchmaker_base.MatchmakerUnavailable,
- tenacity.RetryError):
- _drop_message_warn(request)
- return _target_not_found_warn
-def target_not_found_timeout(func):
- def _target_not_found_timeout(self, request, *args, **kwargs):
- try:
- return func(self, request, *args, **kwargs)
- except (zmq_matchmaker_base.MatchmakerUnavailable,
- tenacity.RetryError):
- _drop_message_warn(request)
- self.publisher._raise_timeout(request)
- return _target_not_found_timeout
-class PublisherManagerBase(object):
- """Abstract publisher manager class
- Publisher knows how to establish connection, how to send message,
- and how to receive reply. PublisherManager coordinates all these steps
- regarding retrying logic in AckManager implementations. May also have an
- additional thread pool for scheduling background tasks.
- """
- def __init__(self, publisher, with_pool=False):
- self.publisher = publisher
- self.conf = publisher.conf
- self.sender = publisher.sender
- self.receiver = publisher.receiver
- if with_pool:
- self.pool = zmq_async.get_pool(
- size=self.conf.oslo_messaging_zmq.rpc_thread_pool_size
- )
- else:
- self.pool = None
- @abc.abstractmethod
- def send_call(self, request):
- """Send call request
- :param request: request object
- :type request: zmq_request.CallRequest
- """
- @abc.abstractmethod
- def send_cast(self, request):
- """Send cast request
- :param request: request object
- :type request: zmq_request.CastRequest
- """
- @abc.abstractmethod
- def send_fanout(self, request):
- """Send fanout request
- :param request: request object
- :type request: zmq_request.FanoutRequest
- """
- @abc.abstractmethod
- def send_notify(self, request):
- """Send notification request
- :param request: request object
- :type request: zmq_request.NotificationRequest
- """
- def cleanup(self):
- if self.pool:
- self.pool.shutdown(wait=True)
- self.publisher.cleanup()
-class PublisherManagerDynamic(PublisherManagerBase):
- @target_not_found_timeout
- def send_call(self, request):
- with contextlib.closing(self.publisher.acquire_connection(request)) \
- as socket:
- self.publisher.send_request(socket, request)
- reply = self.publisher.receive_reply(socket, request)
- return reply
- @target_not_found_warn
- def _send(self, request):
- with contextlib.closing(self.publisher.acquire_connection(request)) \
- as socket:
- self.publisher.send_request(socket, request)
- send_cast = _send
- send_fanout = _send
- send_notify = _send
-class PublisherManagerDynamicAsyncMultisend(PublisherManagerDynamic):
- def __init__(self, publisher):
- super(PublisherManagerDynamicAsyncMultisend, self).__init__(
- publisher, with_pool=True
- )
- def _send_async(self, request):
- self.pool.submit(self._send, request)
- send_fanout = _send_async
- send_notify = _send_async
-class PublisherManagerStatic(PublisherManagerBase):
- @target_not_found_timeout
- def send_call(self, request):
- socket = self.publisher.acquire_connection(request)
- self.publisher.send_request(socket, request)
- reply = self.publisher.receive_reply(socket, request)
- return reply
- @target_not_found_warn
- def _send(self, request):
- socket = self.publisher.acquire_connection(request)
- self.publisher.send_request(socket, request)
- send_cast = _send
- send_fanout = _send
- send_notify = _send
-class PublisherManagerStaticAsyncMultisend(PublisherManagerStatic):
- def __init__(self, publisher):
- super(PublisherManagerStaticAsyncMultisend, self).__init__(
- publisher, with_pool=True
- )
- def _send_async(self, request):
- self.pool.submit(self._send, request)
- send_fanout = _send_async
- send_notify = _send_async
diff --git a/oslo_messaging/_drivers/zmq_driver/client/ b/oslo_messaging/_drivers/zmq_driver/client/
deleted file mode 100644
index 7754c24..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/
+++ /dev/null
@@ -1,193 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import abc
-import logging
-import threading
-import futurist
-import six
-from oslo_messaging._drivers.zmq_driver.client import zmq_response
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-from oslo_messaging._drivers.zmq_driver import zmq_version
-from oslo_messaging._i18n import _LE
-LOG = logging.getLogger(__name__)
-zmq = zmq_async.import_zmq()
-def suppress_errors(func):
- @six.wraps(func)
- def silent_func(self, socket):
- try:
- return func(self, socket)
- except Exception as e:
- LOG.error(_LE("Receiving message failed: %r"), e)
- # NOTE(gdavoian): drop the left parts of a broken message, since
- # they most likely will lead to additional exceptions
- if socket.getsockopt(zmq.RCVMORE):
- socket.recv_multipart()
- return silent_func
-class ReceiverBase(object):
- """Base response receiving interface."""
- def __init__(self, conf):
- self.conf = conf
- self._lock = threading.Lock()
- self._requests = {}
- self._poller = zmq_async.get_poller()
- self._receive_response_versions = \
- zmq_version.get_method_versions(self, 'receive_response')
- self._executor = zmq_async.get_executor(self._run_loop)
- self._executor.execute()
- def register_socket(self, socket):
- """Register a socket for receiving data."""
- self._poller.register(socket, self.receive_response)
- def unregister_socket(self, socket):
- """Unregister a socket from receiving data."""
- self._poller.unregister(socket)
- @abc.abstractmethod
- def receive_response(self, socket):
- """Receive a response (ack or reply) and return it."""
- def track_request(self, request):
- """Track a request via already registered sockets and return
- a pair of ack and reply futures for monitoring all possible
- types of responses for the given request.
- """
- message_id = request.message_id
- futures = self._get_futures(message_id)
- if futures is None:
- ack_future = reply_future = None
- if self.conf.oslo_messaging_zmq.rpc_use_acks:
- ack_future = futurist.Future()
- if request.msg_type == zmq_names.CALL_TYPE:
- reply_future = futurist.Future()
- futures = (ack_future, reply_future)
- self._set_futures(message_id, futures)
- return futures
- def untrack_request(self, request):
- """Untrack a request and stop monitoring any responses."""
- self._pop_futures(request.message_id)
- def stop(self):
- self._poller.close()
- self._executor.stop()
- def _get_futures(self, message_id):
- with self._lock:
- return self._requests.get(message_id)
- def _set_futures(self, message_id, futures):
- with self._lock:
- self._requests[message_id] = futures
- def _pop_futures(self, message_id):
- with self._lock:
- return self._requests.pop(message_id, None)
- def _run_loop(self):
- response, socket = \
- self._poller.poll(self.conf.oslo_messaging_zmq.rpc_poll_timeout)
- if response is None:
- return
- message_type, message_id = response.msg_type, response.message_id
- futures = self._get_futures(message_id)
- if futures is not None:
- ack_future, reply_future = futures
- if message_type == zmq_names.REPLY_TYPE:
- reply_future.set_result(response)
- else:
- ack_future.set_result(response)
- LOG.debug("Received %(msg_type)s for %(msg_id)s",
- {"msg_type": zmq_names.message_type_str(message_type),
- "msg_id": message_id})
- def _get_receive_response_version(self, version):
- receive_response_version = self._receive_response_versions.get(version)
- if receive_response_version is None:
- raise zmq_version.UnsupportedMessageVersionError(version)
- return receive_response_version
-class ReceiverProxy(ReceiverBase):
- @suppress_errors
- def receive_response(self, socket):
- empty = socket.recv()
- assert empty == b'', "Empty delimiter expected!"
- message_version = socket.recv_string()
- assert message_version != b'', "Valid message version expected!"
- receive_response_version = \
- self._get_receive_response_version(message_version)
- return receive_response_version(socket)
- def _receive_response_v_1_0(self, socket):
- reply_id = socket.recv()
- assert reply_id != b'', "Valid reply id expected!"
- message_type = int(socket.recv())
- assert message_type in zmq_names.RESPONSE_TYPES, "Response expected!"
- message_id = socket.recv_string()
- assert message_id != '', "Valid message id expected!"
- if message_type == zmq_names.REPLY_TYPE:
- reply_body, failure = socket.recv_loaded()
- reply = zmq_response.Reply(message_id=message_id,
- reply_id=reply_id,
- reply_body=reply_body,
- failure=failure)
- return reply
- else:
- ack = zmq_response.Ack(message_id=message_id,
- reply_id=reply_id)
- return ack
-class ReceiverDirect(ReceiverBase):
- @suppress_errors
- def receive_response(self, socket):
- empty = socket.recv()
- assert empty == b'', "Empty delimiter expected!"
- message_version = socket.recv_string()
- assert message_version != b'', "Valid message version expected!"
- receive_response_version = \
- self._get_receive_response_version(message_version)
- return receive_response_version(socket)
- def _receive_response_v_1_0(self, socket):
- message_type = int(socket.recv())
- assert message_type in zmq_names.RESPONSE_TYPES, "Response expected!"
- message_id = socket.recv_string()
- assert message_id != '', "Valid message id expected!"
- if message_type == zmq_names.REPLY_TYPE:
- reply_body, failure = socket.recv_loaded()
- reply = zmq_response.Reply(message_id=message_id,
- reply_body=reply_body,
- failure=failure)
- return reply
- else:
- ack = zmq_response.Ack(message_id=message_id)
- return ack
diff --git a/oslo_messaging/_drivers/zmq_driver/client/ b/oslo_messaging/_drivers/zmq_driver/client/
deleted file mode 100644
index d2b3110..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/
+++ /dev/null
@@ -1,128 +0,0 @@
-# Copyright 2015-2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import abc
-import logging
-import uuid
-import six
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-from oslo_messaging._drivers.zmq_driver import zmq_version
-from oslo_messaging._i18n import _LE
-LOG = logging.getLogger(__name__)
-zmq = zmq_async.import_zmq()
-class Request(object):
- """Zmq request abstract class
- Represents socket (publisher) independent data object to publish.
- Request object should contain all needed information for a publisher
- to publish it, for instance: message payload, target, timeout
- and retries etc.
- """
- def __init__(self, target, context=None, message=None, retry=None):
- """Construct request object
- :param target: Message destination target
- :type target: oslo_messaging.Target
- :param context: Message context
- :type context: dict
- :param message: Message payload to pass
- :type message: dict
- :param retry: an optional default connection retries configuration
- None or -1 means to retry forever
- 0 means no retry
- N means N retries
- :type retry: int
- """
- if self.msg_type not in zmq_names.REQUEST_TYPES:
- raise RuntimeError("Unknown request type!")
- = target
- self.context = context
- self.message = message
- self.retry = retry
- if not isinstance(retry, int) and retry is not None:
- raise ValueError(
- "retry must be an integer, not {0}".format(type(retry)))
- self.message_id = str(uuid.uuid1())
- @abc.abstractproperty
- def msg_type(self):
- """ZMQ request type"""
- @property
- def message_version(self):
- return zmq_version.MESSAGE_VERSION
-class RpcRequest(Request):
- def __init__(self, *args, **kwargs):
- message = kwargs.get("message")
- if message['method'] is None:
- errmsg = _LE("No method specified for RPC call")
- LOG.error(_LE("No method specified for RPC call"))
- raise KeyError(errmsg)
- super(RpcRequest, self).__init__(*args, **kwargs)
-class CallRequest(RpcRequest):
- msg_type = zmq_names.CALL_TYPE
- def __init__(self, *args, **kwargs):
- self.allowed_remote_exmods = kwargs.pop("allowed_remote_exmods")
- self.timeout = kwargs.pop("timeout")
- if self.timeout is None:
- raise ValueError("Timeout should be specified for a RPC call!")
- elif not isinstance(self.timeout, int):
- raise ValueError(
- "timeout must be an integer, not {0}"
- .format(type(self.timeout)))
- super(CallRequest, self).__init__(*args, **kwargs)
-class CastRequest(RpcRequest):
- msg_type = zmq_names.CAST_TYPE
-class FanoutRequest(RpcRequest):
- msg_type = zmq_names.CAST_FANOUT_TYPE
-class NotificationRequest(Request):
- msg_type = zmq_names.NOTIFY_TYPE
- def __init__(self, *args, **kwargs):
- self.version = kwargs.pop("version")
- super(NotificationRequest, self).__init__(*args, **kwargs)
diff --git a/oslo_messaging/_drivers/zmq_driver/client/ b/oslo_messaging/_drivers/zmq_driver/client/
deleted file mode 100644
index 140feed..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/
+++ /dev/null
@@ -1,85 +0,0 @@
-# Copyright 2015-2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import abc
-import six
-from oslo_messaging._drivers.zmq_driver import zmq_names
-class Response(object):
- def __init__(self, message_id=None, reply_id=None, message_version=None):
- if self.msg_type not in zmq_names.RESPONSE_TYPES:
- raise RuntimeError("Unknown response type!")
- self._message_id = message_id
- self._reply_id = reply_id
- self._message_version = message_version
- @abc.abstractproperty
- def msg_type(self):
- """ZMQ response type"""
- @property
- def message_id(self):
- return self._message_id
- @property
- def reply_id(self):
- return self._reply_id
- @property
- def message_version(self):
- return self._message_version
- def to_dict(self):
- return {zmq_names.FIELD_MSG_ID: self._message_id,
- zmq_names.FIELD_REPLY_ID: self._reply_id,
- zmq_names.FIELD_MSG_VERSION: self._message_version}
- def __str__(self):
- return str(self.to_dict())
-class Ack(Response):
- msg_type = zmq_names.ACK_TYPE
-class Reply(Response):
- msg_type = zmq_names.REPLY_TYPE
- def __init__(self, message_id=None, reply_id=None, message_version=None,
- reply_body=None, failure=None):
- super(Reply, self).__init__(message_id, reply_id, message_version)
- self._reply_body = reply_body
- self._failure = failure
- @property
- def reply_body(self):
- return self._reply_body
- @property
- def failure(self):
- return self._failure
- def to_dict(self):
- dict_ = super(Reply, self).to_dict()
- dict_.update({zmq_names.FIELD_REPLY_BODY: self._reply_body,
- zmq_names.FIELD_FAILURE: self._failure})
- return dict_
diff --git a/oslo_messaging/_drivers/zmq_driver/client/ b/oslo_messaging/_drivers/zmq_driver/client/
deleted file mode 100644
index 40fc966..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/
+++ /dev/null
@@ -1,196 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import itertools
-import logging
-import threading
-import time
-from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base
-from oslo_messaging._drivers.zmq_driver import zmq_address
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-from oslo_messaging._drivers.zmq_driver import zmq_updater
-from oslo_messaging._i18n import _LW
-LOG = logging.getLogger(__name__)
-zmq = zmq_async.import_zmq()
-class RoutingTableAdaptor(object):
- def __init__(self, conf, matchmaker, listener_type):
- self.conf = conf
- self.matchmaker = matchmaker
- self.listener_type = listener_type
- self.routing_table = RoutingTable(conf)
- self.routing_table_updater = RoutingTableUpdater(
- conf, matchmaker, self.routing_table)
- self.round_robin_targets = {}
- self._lock = threading.Lock()
- def get_round_robin_host(self, target):
- target_key = self._fetch_round_robin_hosts_from_matchmaker(target)
- rr_gen = self.round_robin_targets[target_key]
- host = next(rr_gen)
- LOG.debug("Host resolved for the current connection is %s" % host)
- return host
- def get_all_round_robin_hosts(self, target):
- target_key = self._fetch_round_robin_hosts_from_matchmaker(target)
- return self.routing_table.get_hosts_fanout(target_key)
- def _fetch_round_robin_hosts_from_matchmaker(self, target):
- target_key = zmq_address.target_to_key(
- target, zmq_names.socket_type_str(self.listener_type))
- LOG.debug("Processing target %s for round-robin." % target_key)
- if target_key not in self.round_robin_targets:
- with self._lock:
- if target_key not in self.round_robin_targets:
- LOG.debug("Target %s is not in cache. Check matchmaker "
- "server." % target_key)
- hosts = self.matchmaker.get_hosts_retry(
- target, zmq_names.socket_type_str(self.listener_type))
- LOG.debug("Received hosts %s" % hosts)
- self.routing_table.update_hosts(target_key, hosts)
- self.round_robin_targets[target_key] = \
- self.routing_table.get_hosts_round_robin(target_key)
- return target_key
- def get_fanout_hosts(self, target):
- target_key = zmq_address.target_to_key(
- target, zmq_names.socket_type_str(self.listener_type))
- LOG.debug("Processing target %s for fanout." % target_key)
- if not self.routing_table.contains(target_key):
- self._fetch_fanout_hosts_from_matchmaker(target, target_key)
- return self.routing_table.get_hosts_fanout(target_key)
- def _fetch_fanout_hosts_from_matchmaker(self, target, target_key):
- with self._lock:
- if not self.routing_table.contains(target_key):
- LOG.debug("Target %s is not in cache. Check matchmaker server."
- % target_key)
- hosts = self.matchmaker.get_hosts_fanout(
- target, zmq_names.socket_type_str(self.listener_type))
- LOG.debug("Received hosts %s" % hosts)
- self.routing_table.update_hosts(target_key, hosts)
- def cleanup(self):
- self.routing_table_updater.cleanup()
-class RoutingTable(object):
- def __init__(self, conf):
- self.conf = conf
- self.targets = {}
- self._lock = threading.Lock()
- def register(self, target_key, host):
- with self._lock:
- if target_key in self.targets:
- hosts, tm = self.targets[target_key]
- if host not in hosts:
- hosts.add(host)
- self.targets[target_key] = (hosts, self._create_tm())
- else:
- self.targets[target_key] = ({host}, self._create_tm())
- def get_targets(self):
- with self._lock:
- return list(self.targets.keys())
- def unregister(self, target_key, host):
- with self._lock:
- hosts, tm = self.targets.get(target_key)
- if hosts and host in hosts:
- hosts.discard(host)
- self.targets[target_key] = (hosts, self._create_tm())
- def update_hosts(self, target_key, hosts_updated):
- with self._lock:
- if target_key in self.targets and not hosts_updated:
- self.targets.pop(target_key)
- return
- hosts_current, _ = self.targets.get(target_key, (set(), None))
- hosts_updated = set(hosts_updated)
- has_differences = hosts_updated ^ hosts_current
- if has_differences:
- self.targets[target_key] = (hosts_updated, self._create_tm())
- def get_hosts_round_robin(self, target_key):
- while self.contains(target_key):
- for host in self._get_hosts_rr(target_key):
- yield host
- def get_hosts_fanout(self, target_key):
- hosts, _ = self._get_hosts(target_key)
- return hosts
- def contains(self, target_key):
- with self._lock:
- return target_key in self.targets
- def _get_hosts(self, target_key):
- with self._lock:
- hosts, tm = self.targets.get(target_key, ([], None))
- hosts = list(hosts)
- return hosts, tm
- def _get_tm(self, target_key):
- with self._lock:
- _, tm = self.targets.get(target_key)
- return tm
- def _is_target_changed(self, target_key, tm_orig):
- return self._get_tm(target_key) != tm_orig
- @staticmethod
- def _create_tm():
- return time.time()
- def _get_hosts_rr(self, target_key):
- hosts, tm_original = self._get_hosts(target_key)
- for host in itertools.cycle(hosts):
- if self._is_target_changed(target_key, tm_original):
- return
- yield host
-class RoutingTableUpdater(zmq_updater.UpdaterBase):
- def __init__(self, conf, matchmaker, routing_table):
- self.routing_table = routing_table
- super(RoutingTableUpdater, self).__init__(
- conf, matchmaker, self._update_routing_table,
- conf.oslo_messaging_zmq.zmq_target_update)
- def _update_routing_table(self):
- target_keys = self.routing_table.get_targets()
- try:
- for target_key in target_keys:
- hosts = self.matchmaker.get_hosts_by_key(target_key)
- self.routing_table.update_hosts(target_key, hosts)
- LOG.debug("Updating routing table from the matchmaker. "
- "%d target(s) updated %s." % (len(target_keys),
- target_keys))
- except zmq_matchmaker_base.MatchmakerUnavailable:
- LOG.warning(_LW("Not updated. Matchmaker was not available."))
diff --git a/oslo_messaging/_drivers/zmq_driver/client/ b/oslo_messaging/_drivers/zmq_driver/client/
deleted file mode 100644
index 6813bf5..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/
+++ /dev/null
@@ -1,207 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import abc
-import logging
-import threading
-import six
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-from oslo_messaging._drivers.zmq_driver import zmq_version
-LOG = logging.getLogger(__name__)
-zmq = zmq_async.import_zmq()
-class SenderBase(object):
- """Base request/response sending interface."""
- def __init__(self, conf, use_async=False):
- self.conf = conf
- self.use_async = use_async
- self._lock = threading.Lock()
- self._send_versions = zmq_version.get_method_versions(self, 'send')
- def _get_send_version(self, version):
- send_version = self._send_versions.get(version)
- if send_version is None:
- raise zmq_version.UnsupportedMessageVersionError(version)
- return send_version
- @abc.abstractmethod
- def send(self, socket, message):
- """Send a message via a socket in a thread-safe manner."""
-class RequestSenderBase(SenderBase):
- pass
-class AckSenderBase(SenderBase):
- pass
-class ReplySenderBase(SenderBase):
- pass
-class RequestSenderProxy(RequestSenderBase):
- def send(self, socket, request):
- assert request.msg_type in zmq_names.REQUEST_TYPES, "Request expected!"
- send_version = self._get_send_version(request.message_version)
- with self._lock:
- send_version(socket, request)
- LOG.debug("->[proxy:%(addr)s] Sending %(msg_type)s message "
- "%(msg_id)s to target %(target)s (v%(msg_version)s)",
- {"addr": list(socket.connections),
- "msg_type": zmq_names.message_type_str(request.msg_type),
- "msg_id": request.message_id,
- "target":,
- "msg_version": request.message_version})
- def _send_v_1_0(self, socket, request):
- socket.send(b'', zmq.SNDMORE)
- socket.send_string('1.0', zmq.SNDMORE)
- socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
- socket.send(request.routing_key, zmq.SNDMORE)
- socket.send_string(request.message_id, zmq.SNDMORE)
- socket.send_dumped([request.context, request.message])
-class AckSenderProxy(AckSenderBase):
- def send(self, socket, ack):
- assert ack.msg_type == zmq_names.ACK_TYPE, "Ack expected!"
- send_version = self._get_send_version(ack.message_version)
- with self._lock:
- send_version(socket, ack)
- LOG.debug("->[proxy:%(addr)s] Sending %(msg_type)s for %(msg_id)s "
- "(v%(msg_version)s)",
- {"addr": list(socket.connections),
- "msg_type": zmq_names.message_type_str(ack.msg_type),
- "msg_id": ack.message_id,
- "msg_version": ack.message_version})
- def _send_v_1_0(self, socket, ack):
- socket.send(b'', zmq.SNDMORE)
- socket.send_string('1.0', zmq.SNDMORE)
- socket.send(six.b(str(ack.msg_type)), zmq.SNDMORE)
- socket.send(ack.reply_id, zmq.SNDMORE)
- socket.send_string(ack.message_id)
-class ReplySenderProxy(ReplySenderBase):
- def send(self, socket, reply):
- assert reply.msg_type == zmq_names.REPLY_TYPE, "Reply expected!"
- send_version = self._get_send_version(reply.message_version)
- with self._lock:
- send_version(socket, reply)
- LOG.debug("->[proxy:%(addr)s] Sending %(msg_type)s for %(msg_id)s "
- "(v%(msg_version)s)",
- {"addr": list(socket.connections),
- "msg_type": zmq_names.message_type_str(reply.msg_type),
- "msg_id": reply.message_id,
- "msg_version": reply.message_version})
- def _send_v_1_0(self, socket, reply):
- socket.send(b'', zmq.SNDMORE)
- socket.send_string('1.0', zmq.SNDMORE)
- socket.send(six.b(str(reply.msg_type)), zmq.SNDMORE)
- socket.send(reply.reply_id, zmq.SNDMORE)
- socket.send_string(reply.message_id, zmq.SNDMORE)
- socket.send_dumped([reply.reply_body, reply.failure])
-class RequestSenderDirect(RequestSenderBase):
- def send(self, socket, request):
- assert request.msg_type in zmq_names.REQUEST_TYPES, "Request expected!"
- send_version = self._get_send_version(request.message_version)
- with self._lock:
- send_version(socket, request)
- LOG.debug("Sending %(msg_type)s message %(msg_id)s to "
- "target %(target)s (v%(msg_version)s)",
- {"msg_type": zmq_names.message_type_str(request.msg_type),
- "msg_id": request.message_id,
- "target":,
- "msg_version": request.message_version})
- def _send_v_1_0(self, socket, request):
- flags = zmq.NOBLOCK if self.use_async else 0
- socket.send(b'', zmq.SNDMORE | flags)
- socket.send_string('1.0', zmq.SNDMORE | flags)
- socket.send(six.b(str(request.msg_type)), zmq.SNDMORE | flags)
- socket.send_string(request.message_id, zmq.SNDMORE | flags)
- socket.send_dumped([request.context, request.message], flags)
-class AckSenderDirect(AckSenderBase):
- def send(self, socket, ack):
- assert ack.msg_type == zmq_names.ACK_TYPE, "Ack expected!"
- send_version = self._get_send_version(ack.message_version)
- with self._lock:
- send_version(socket, ack)
- LOG.debug("Sending %(msg_type)s for %(msg_id)s (v%(msg_version)s)",
- {"msg_type": zmq_names.message_type_str(ack.msg_type),
- "msg_id": ack.message_id,
- "msg_version": ack.message_version})
- def _send_v_1_0(self, socket, ack):
- raise NotImplementedError()
-class ReplySenderDirect(ReplySenderBase):
- def send(self, socket, reply):
- assert reply.msg_type == zmq_names.REPLY_TYPE, "Reply expected!"
- send_version = self._get_send_version(reply.message_version)
- with self._lock:
- send_version(socket, reply)
- LOG.debug("Sending %(msg_type)s for %(msg_id)s (v%(msg_version)s)",
- {"msg_type": zmq_names.message_type_str(reply.msg_type),
- "msg_id": reply.message_id,
- "msg_version": reply.message_version})
- def _send_v_1_0(self, socket, reply):
- socket.send(reply.reply_id, zmq.SNDMORE)
- socket.send(b'', zmq.SNDMORE)
- socket.send_string('1.0', zmq.SNDMORE)
- socket.send(six.b(str(reply.msg_type)), zmq.SNDMORE)
- socket.send_string(reply.message_id, zmq.SNDMORE)
- socket.send_dumped([reply.reply_body, reply.failure])
diff --git a/oslo_messaging/_drivers/zmq_driver/client/ b/oslo_messaging/_drivers/zmq_driver/client/
deleted file mode 100644
index 7ce0b70..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/
+++ /dev/null
@@ -1,85 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import logging
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_socket
-zmq = zmq_async.import_zmq()
-LOG = logging.getLogger(__name__)
-class SocketsManager(object):
- def __init__(self, conf, matchmaker, socket_type):
- self.conf = conf
- self.matchmaker = matchmaker
- self.socket_type = socket_type
- self.zmq_context = zmq.Context()
- self.socket_to_publishers = None
- self.socket_to_routers = None
- self.sockets = {}
- def get_socket(self, immediate=True):
- return zmq_socket.ZmqSocket(self.conf, self.zmq_context,
- self.socket_type, immediate=immediate)
- def get_cached_socket(self, target_key, hosts=None, immediate=True):
- hosts = [] if hosts is None else hosts
- socket = self.sockets.get(target_key, None)
- if socket is None:
- LOG.debug("CREATING NEW socket for target_key %s " % target_key)
- socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
- self.socket_type,
- immediate=immediate)
- self.sockets[target_key] = socket
- for host in hosts:
- socket.connect_to_host(host)
- LOG.debug("Target key: %s socket:%s" % (target_key,
- socket.handle.identity))
- return socket
- def get_socket_to_publishers(self, identity=None):
- if self.socket_to_publishers is not None:
- return self.socket_to_publishers
- self.socket_to_publishers = zmq_socket.ZmqSocket(
- self.conf, self.zmq_context, self.socket_type,
- immediate=self.conf.oslo_messaging_zmq.zmq_immediate,
- identity=identity)
- publishers = self.matchmaker.get_publishers()
- for pub_address, fe_router_address in publishers:
- self.socket_to_publishers.connect_to_host(fe_router_address)
- return self.socket_to_publishers
- def get_socket_to_routers(self, identity=None):
- if self.socket_to_routers is not None:
- return self.socket_to_routers
- self.socket_to_routers = zmq_socket.ZmqSocket(
- self.conf, self.zmq_context, self.socket_type,
- immediate=self.conf.oslo_messaging_zmq.zmq_immediate,
- identity=identity)
- routers = self.matchmaker.get_routers()
- for be_router_address in routers:
- self.socket_to_routers.connect_to_host(be_router_address)
- return self.socket_to_routers
- def cleanup(self):
- if self.socket_to_publishers:
- self.socket_to_publishers.close()
- if self.socket_to_routers:
- self.socket_to_routers.close()
- for socket in self.sockets.values():
- socket.close()
diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/ b/oslo_messaging/_drivers/zmq_driver/matchmaker/
deleted file mode 100644
index e69de29..0000000
--- a/oslo_messaging/_drivers/zmq_driver/matchmaker/
+++ /dev/null
diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/ b/oslo_messaging/_drivers/zmq_driver/matchmaker/
deleted file mode 100755
index 616e4d4..0000000
--- a/oslo_messaging/_drivers/zmq_driver/matchmaker/
+++ /dev/null
@@ -1,291 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import abc
-import collections
-import logging
-import six
-import time
-from oslo_messaging._drivers import common as rpc_common
-from oslo_messaging._drivers.zmq_driver import zmq_address
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._i18n import _LE
-LOG = logging.getLogger(__name__)
-class MatchmakerUnavailable(rpc_common.RPCException):
- """Exception is raised on connection error to matchmaker service"""
- def __init__(self):
- super(MatchmakerUnavailable, self).__init__(
- message=_LE("Matchmaker is not currently available."))
-class MatchmakerBase(object):
- def __init__(self, conf, *args, **kwargs):
- super(MatchmakerBase, self).__init__()
- self.conf = conf
- self.url = kwargs.get('url')
- @abc.abstractmethod
- def register_publisher(self, hostname, expire=-1):
- """Register publisher on nameserver.
- This works for PUB-SUB only
- :param hostname: host for the topic in "host:port" format
- host for back-chatter in "host:port" format
- :type hostname: tuple
- :param expire: record expiration timeout
- :type expire: int
- """
- @abc.abstractmethod
- def unregister_publisher(self, hostname):
- """Unregister publisher on nameserver.
- This works for PUB-SUB only
- :param hostname: host for the topic in "host:port" format
- host for back-chatter in "host:port" format
- :type hostname: tuple
- """
- @abc.abstractmethod
- def get_publishers(self):
- """Get all publisher-hosts from nameserver.
- :returns: a list of tuples of strings "hostname:port" hosts
- """
- @abc.abstractmethod
- def register_router(self, hostname, expire=-1):
- """Register router on the nameserver.
- This works for ROUTER proxy only
- :param hostname: host for the topic in "host:port" format
- :type hostname: str
- :param expire: record expiration timeout
- :type expire: int
- """
- @abc.abstractmethod
- def unregister_router(self, hostname):
- """Unregister router on the nameserver.
- This works for ROUTER proxy only
- :param hostname: host for the topic in "host:port" format
- :type hostname: str
- """
- @abc.abstractmethod
- def get_routers(self):
- """Get all router-hosts from nameserver.
- :returns: a list of strings "hostname:port" hosts
- """
- @abc.abstractmethod
- def register(self, target, hostname, listener_type, expire=-1):
- """Register target on nameserver.
- If record already exists and has expiration timeout it will be
- updated. Existing records without timeout will stay untouched
- :param target: the target for host
- :type target: Target
- :param hostname: host for the topic in "host:port" format
- :type hostname: str
- :param listener_type: listener socket type ROUTER, SUB etc.
- :type listener_type: str
- :param expire: record expiration timeout
- :type expire: int
- """
- @abc.abstractmethod
- def unregister(self, target, hostname, listener_type):
- """Unregister target from nameserver.
- :param target: the target for host
- :type target: Target
- :param hostname: host for the topic in "host:port" format
- :type hostname: str
- :param listener_type: listener socket type ROUTER, SUB etc.
- :type listener_type: str
- """
- @abc.abstractmethod
- def get_hosts(self, target, listener_type):
- """Get all hosts from nameserver by target.
- :param target: the default target for invocations
- :type target: Target
- :param listener_type: listener socket type ROUTER, SUB etc.
- :type listener_type: str
- :returns: a list of "hostname:port" hosts
- """
- @abc.abstractmethod
- def get_hosts_retry(self, target, listener_type):
- """Retry if not hosts - used on client first time connection.
- :param target: the default target for invocations
- :type target: Target
- :param listener_type: listener socket type ROUTER, SUB etc.
- :type listener_type: str
- :returns: a list of "hostname:port" hosts
- """
- @abc.abstractmethod
- def get_hosts_fanout(self, target, listener_type):
- """Get all hosts for fanout from nameserver by target.
- :param target: the default target for invocations
- :type target: Target
- :param listener_type: listener socket type ROUTER, SUB etc.
- :type listener_type: str
- :returns: a list of "hostname:port" hosts
- """
- @abc.abstractmethod
- def get_hosts_fanout_retry(self, target, listener_type):
- """Retry if not host for fanout - used on client first time connection.
- :param target: the default target for invocations
- :type target: Target
- :param listener_type: listener socket type ROUTER, SUB etc.
- :type listener_type: str
- :returns: a list of "hostname:port" hosts
- """
-class MatchmakerDummy(MatchmakerBase):
- def __init__(self, conf, *args, **kwargs):
- super(MatchmakerDummy, self).__init__(conf, *args, **kwargs)
- self._cache = collections.defaultdict(list)
- self._publishers = set()
- self._routers = set()
- self._address = {}
- self.executor = zmq_async.get_executor(method=self._loop)
- self.executor.execute()
- def register_publisher(self, hostname, expire=-1):
- if hostname not in self._publishers:
- self._publishers.add(hostname)
- self._address[hostname] = expire
- def unregister_publisher(self, hostname):
- if hostname in self._publishers:
- self._publishers.remove(hostname)
- if hostname in self._address:
- self._address.pop(hostname)
- def get_publishers(self):
- hosts = [host for host in self._publishers
- if self._address[host] > 0]
- return hosts
- def register_router(self, hostname, expire=-1):
- if hostname not in self._routers:
- self._routers.add(hostname)
- self._address[hostname] = expire
- def unregister_router(self, hostname):
- if hostname in self._routers:
- self._routers.remove(hostname)
- if hostname in self._address:
- self._address.pop(hostname)
- def get_routers(self):
- hosts = [host for host in self._routers
- if self._address[host] > 0]
- return hosts
- def _loop(self):
- for hostname in self._address:
- expire = self._address[hostname]
- if expire > 0:
- self._address[hostname] = expire - 1
- time.sleep(1)
- def register(self, target, hostname, listener_type, expire=-1):
- if target.server:
- key = zmq_address.target_to_key(target, listener_type)
- if hostname not in self._cache[key]:
- self._cache[key].append(hostname)
- key = zmq_address.prefix_str(target.topic, listener_type)
- if hostname not in self._cache[key]:
- self._cache[key].append(hostname)
- self._address[hostname] = expire
- def unregister(self, target, hostname, listener_type):
- if target.server:
- key = zmq_address.target_to_key(target, listener_type)
- if hostname in self._cache[key]:
- self._cache[key].remove(hostname)
- key = zmq_address.prefix_str(target.topic, listener_type)
- if hostname in self._cache[key]:
- self._cache[key].remove(hostname)
- if hostname in self._address:
- self._address.pop(hostname)
- def get_hosts(self, target, listener_type):
- hosts = []
- if target.server:
- key = zmq_address.target_to_key(target, listener_type)
- hosts.extend([host for host in self._cache[key]
- if self._address[host] > 0])
- if not hosts:
- key = zmq_address.prefix_str(target.topic, listener_type)
- hosts.extend([host for host in self._cache[key]
- if self._address[host] > 0])
- LOG.debug("[Dummy] get_hosts for target %(target)s: %(hosts)s",
- {"target": target, "hosts": hosts})
- return hosts
- def get_hosts_retry(self, target, listener_type):
- # Do not complicate dummy matchmaker
- # This method will act smarter in real world matchmakers
- return self.get_hosts(target, listener_type)
- def get_hosts_fanout(self, target, listener_type):
- hosts = []
- key = zmq_address.target_to_key(target, listener_type)
- hosts.extend([host for host in self._cache[key]
- if self._address[host] > 0])
- LOG.debug("[Dummy] get_hosts_fanout for target %(target)s: %(hosts)s",
- {"target": target, "hosts": hosts})
- return hosts
- def get_hosts_fanout_retry(self, target, listener_type):
- # Do not complicate dummy matchmaker
- # This method will act smarter in real world matchmakers
- return self.get_hosts_fanout(target, listener_type)
diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/ b/oslo_messaging/_drivers/zmq_driver/matchmaker/
deleted file mode 100644
index 5b066af..0000000
--- a/oslo_messaging/_drivers/zmq_driver/matchmaker/
+++ /dev/null
@@ -1,452 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import abc
-import functools
-import logging
-import random
-import time
-from oslo_config import cfg
-from oslo_utils import importutils
-import six
-import tenacity
-from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base
-from oslo_messaging._drivers.zmq_driver import zmq_address
-from oslo_messaging._drivers.zmq_driver import zmq_updater
-from oslo_messaging._i18n import _LE, _LI, _LW
-redis = importutils.try_import('redis')
-redis_sentinel = importutils.try_import('redis.sentinel')
-LOG = logging.getLogger(__name__)
-matchmaker_redis_opts = [
- cfg.StrOpt('host',
- default='',
- deprecated_for_removal=True,
- deprecated_reason="Replaced by [DEFAULT]/transport_url",
- help='Host to locate redis.'),
- cfg.PortOpt('port',
- default=6379,
- deprecated_for_removal=True,
- deprecated_reason="Replaced by [DEFAULT]/transport_url",
- help='Use this port to connect to redis host.'),
- cfg.StrOpt('password',
- default='',
- secret=True,
- deprecated_for_removal=True,
- deprecated_reason="Replaced by [DEFAULT]/transport_url",
- help='Password for Redis server (optional).'),
- cfg.ListOpt('sentinel_hosts',
- default=[],
- deprecated_for_removal=True,
- deprecated_reason="Replaced by [DEFAULT]/transport_url",
- help='List of Redis Sentinel hosts (fault tolerance mode), '
- 'e.g., [host:port, host1:port ... ]'),
- cfg.StrOpt('sentinel_group_name',
- default='oslo-messaging-zeromq',
- help='Redis replica set name.'),
- cfg.IntOpt('wait_timeout',
- default=2000,
- help='Time in ms to wait between connection attempts.'),
- cfg.IntOpt('check_timeout',
- default=20000,
- help='Time in ms to wait before the transaction is killed.'),
- cfg.IntOpt('socket_timeout',
- default=10000,
- help='Timeout in ms on blocking socket operations.'),
-def write_to_redis_connection_warn(func):
- @functools.wraps(func)
- def func_wrapper(self, *args, **kwargs):
- # try to perform a write operation to all available hosts
- success = False
- for redis_instance in self._redis_instances:
- if not redis_instance._is_available:
- continue
- try:
- func(self, redis_instance, *args, **kwargs)
- success = True
- except redis.ConnectionError:
- LOG.warning(_LW("Redis host %s is not available now."),
- redis_instance._address)
- redis_instance._is_available = False
- redis_instance._ready_from = float("inf")
- if not success:
- raise zmq_matchmaker_base.MatchmakerUnavailable()
- return func_wrapper
-def read_from_redis_connection_warn(func):
- @functools.wraps(func)
- def func_wrapper(self, *args, **kwargs):
- # try to perform a read operation from any available and ready host
- for redis_instance in self._redis_instances:
- if not redis_instance._is_available \
- or redis_instance._ready_from > time.time():
- continue
- try:
- return func(self, redis_instance, *args, **kwargs)
- except redis.ConnectionError:
- LOG.warning(_LW("Redis host %s is not available now."),
- redis_instance._address)
- redis_instance._is_available = False
- redis_instance._ready_from = float("inf")
- raise zmq_matchmaker_base.MatchmakerUnavailable()
- return func_wrapper
-def no_reraise(func):
- def func_wrapper(*args, **kwargs):
- try:
- return func(*args, **kwargs)
- except zmq_matchmaker_base.MatchmakerUnavailable:
- pass
- return func_wrapper
-def empty_list_on_error(func):
- def func_wrapper(*args, **kwargs):
- try:
- return func(*args, **kwargs)
- except zmq_matchmaker_base.MatchmakerUnavailable:
- return []
- return func_wrapper
-def is_empty(hosts):
- return not hosts
-class MatchmakerRedisBase(zmq_matchmaker_base.MatchmakerBase):
- def __init__(self, conf, *args, **kwargs):
- if redis is None:
- raise ImportError(_LE("Redis package is not available!"))
- super(MatchmakerRedisBase, self).__init__(conf, *args, **kwargs)
- self.conf.register_opts(matchmaker_redis_opts, "matchmaker_redis")
- @abc.abstractmethod
- def _sadd(self, key, value, expire):
- pass
- @abc.abstractmethod
- def _srem(self, key, value):
- pass
- @abc.abstractmethod
- def _smembers(self, key):
- pass
- @abc.abstractmethod
- def _ttl(self, key):
- pass
- @no_reraise
- def register_publisher(self, hostname, expire=-1):
- hostname = ','.join(hostname)
- self._sadd(_PUBLISHERS_KEY, hostname, expire)
- self._sadd(hostname, ' ', expire)
- @no_reraise
- def unregister_publisher(self, hostname):
- hostname = ','.join(hostname)
- self._srem(_PUBLISHERS_KEY, hostname)
- self._srem(hostname, ' ')
- @empty_list_on_error
- def get_publishers(self):
- return [tuple(hostname.split(',')) for hostname
- in self._smembers(_PUBLISHERS_KEY)]
- @no_reraise
- def register_router(self, hostname, expire=-1):
- self._sadd(_ROUTERS_KEY, hostname, expire)
- self._sadd(hostname, ' ', expire)
- @no_reraise
- def unregister_router(self, hostname):
- self._srem(_ROUTERS_KEY, hostname)
- self._srem(hostname, ' ')
- @empty_list_on_error
- def get_routers(self):
- return self._smembers(_ROUTERS_KEY)
- def get_hosts_by_key(self, key):
- return self._smembers(key)
- def register(self, target, hostname, listener_type, expire=-1):
- if target.server:
- key = zmq_address.target_to_key(target, listener_type)
- self._sadd(key, hostname, expire)
- self._sadd(hostname, ' ', expire)
- key = zmq_address.prefix_str(target.topic, listener_type)
- self._sadd(key, hostname, expire)
- self._sadd(hostname, ' ', expire)
- @no_reraise
- def unregister(self, target, hostname, listener_type):
- if target.server:
- key = zmq_address.target_to_key(target, listener_type)
- self._srem(key, hostname)
- self._srem(hostname, ' ')
- key = zmq_address.prefix_str(target.topic, listener_type)
- self._srem(key, hostname)
- self._srem(hostname, ' ')
- def get_hosts(self, target, listener_type):
- hosts = []
- if target.server:
- key = zmq_address.target_to_key(target, listener_type)
- hosts.extend(self._smembers(key))
- else:
- key = zmq_address.prefix_str(target.topic, listener_type)
- hosts.extend(self._smembers(key))
- LOG.debug("[Redis] get_hosts for target %(target)s: %(hosts)s",
- {"target": target, "hosts": hosts})
- return hosts
- def get_hosts_retry(self, target, listener_type):
- return self._retry_method(target, listener_type, self.get_hosts)
- def get_hosts_fanout(self, target, listener_type):
- key = zmq_address.target_to_key(target, listener_type)
- hosts = list(self._smembers(key))
- LOG.debug("[Redis] get_hosts_fanout for target %(target)s: %(hosts)s",
- {"target": target, "hosts": hosts})
- return hosts
- def get_hosts_fanout_retry(self, target, listener_type):
- return self._retry_method(target, listener_type, self.get_hosts_fanout)
- def _retry_method(self, target, listener_type, method):
- wait_timeout = self.conf.matchmaker_redis.wait_timeout / 1000.
- check_timeout = self.conf.matchmaker_redis.check_timeout / 1000.
- @tenacity.retry(retry=tenacity.retry_if_result(is_empty),
- wait=tenacity.wait_fixed(wait_timeout),
- stop=tenacity.stop_after_delay(check_timeout))
- def _get_hosts_retry(target, listener_type):
- return method(target, listener_type)
- return _get_hosts_retry(target, listener_type)
-class MatchmakerRedis(MatchmakerRedisBase):
- def __init__(self, conf, *args, **kwargs):
- super(MatchmakerRedis, self).__init__(conf, *args, **kwargs)
- self._redis_hosts = self._extract_redis_hosts()
- self._redis_instances = [
- redis.StrictRedis(host=redis_host["host"],
- port=redis_host["port"],
- password=redis_host["password"])
- for redis_host in self._redis_hosts
- ]
- for redis_host, redis_instance \
- in, self._redis_instances):
- address = "{host}:{port}".format(host=redis_host["host"],
- port=redis_host["port"])
- redis_instance._address = address
- is_available = self._check_availability(redis_instance)
- if is_available:
- redis_instance._is_available = True
- redis_instance._ready_from = time.time()
- else:
- LOG.warning(_LW("Redis host %s is not available now."),
- address)
- redis_instance._is_available = False
- redis_instance._ready_from = float("inf")
- # NOTE(gdavoian): store instances in a random order
- # (for the sake of load balancing)
- random.shuffle(self._redis_instances)
- self._availability_updater = \
- MatchmakerRedisAvailabilityUpdater(self.conf, self)
- def _extract_redis_hosts(self):
- if self.url and self.url.hosts:
- return [{"host": redis_host.hostname,
- "port": redis_host.port,
- "password": redis_host.password}
- for redis_host in self.url.hosts]
- else:
- # FIXME(gdavoian): remove the code below along with the
- # corresponding deprecated options in the next release
- return [{"host":,
- "port": self.conf.matchmaker_redis.port,
- "password": self.conf.matchmaker_redis.password}]
- @staticmethod
- def _check_availability(redis_instance):
- try:
- return True
- except redis.ConnectionError:
- return False
- @write_to_redis_connection_warn
- def _sadd(self, redis_instance, key, value, expire):
- redis_instance.sadd(key, value)
- if expire > 0:
- redis_instance.expire(key, expire)
- @write_to_redis_connection_warn
- def _srem(self, redis_instance, key, value):
- redis_instance.srem(key, value)
- @read_from_redis_connection_warn
- def _ttl(self, redis_instance, key):
- # NOTE(ozamiatin): If the specialized key doesn't exist,
- # TTL fuction would return -2. If key exists,
- # but doesn't have expiration associated,
- # TTL func would return -1. For more information,
- # please visit
- return redis_instance.ttl(key)
- @read_from_redis_connection_warn
- def _smembers(self, redis_instance, key):
- hosts = redis_instance.smembers(key)
- return [host for host in hosts if redis_instance.ttl(host) >= -1]
-class MatchmakerRedisAvailabilityUpdater(zmq_updater.UpdaterBase):
- def __init__(self, conf, matchmaker):
- super(MatchmakerRedisAvailabilityUpdater, self).__init__(
- conf, matchmaker, self._update_availability,
- sleep_for=conf.oslo_messaging_zmq.zmq_target_update
- )
- def _update_availability(self):
- fraction_of_available_instances = 0
- for redis_instance in self.matchmaker._redis_instances:
- if not redis_instance._is_available:
- is_available = \
- self.matchmaker._check_availability(redis_instance)
- if is_available:
-"Redis host %s is available again."),
- redis_instance._address)
- fraction_of_available_instances += 1
- # NOTE(gdavoian): mark an instance as available for
- # writing to, but wait until all services register
- # themselves in it for making the instance ready for
- # reading from
- redis_instance._is_available = True
- redis_instance._ready_from = time.time() + \
- self.conf.oslo_messaging_zmq.zmq_target_expire
- else:
- fraction_of_available_instances += 1
- fraction_of_available_instances /= \
- float(len(self.matchmaker._redis_instances))
- # NOTE(gdavoian): make the sleep time proportional to the number of
- # currently available instances
- self._sleep_for = max(self.conf.oslo_messaging_zmq.zmq_target_update *
- fraction_of_available_instances,
- self._MIN_SLEEP_FOR)
-class MatchmakerSentinel(MatchmakerRedisBase):
- def __init__(self, conf, *args, **kwargs):
- super(MatchmakerSentinel, self).__init__(conf, *args, **kwargs)
- socket_timeout = self.conf.matchmaker_redis.socket_timeout / 1000.
- self._sentinel_hosts, self._password, self._master_group = \
- self._extract_sentinel_hosts()
- self._sentinel = redis_sentinel.Sentinel(
- sentinels=self._sentinel_hosts,
- socket_timeout=socket_timeout,
- password=self._password)
- self._slave = self._master = None
- @property
- def _redis_master(self):
- try:
- if not self._master:
- self._master = self._sentinel.master_for(self._master_group)
- return self._master
- except redis_sentinel.MasterNotFoundError:
- raise zmq_matchmaker_base.MatchmakerUnavailable()
- @property
- def _redis_slave(self):
- try:
- if not self._slave:
- self._slave = self._sentinel.slave_for(self._master_group)
- except redis_sentinel.SlaveNotFoundError:
- # use the master as slave (temporary)
- return self._redis_master
- return self._slave
- def _extract_sentinel_hosts(self):
- sentinels = []
- master_group = self.conf.matchmaker_redis.sentinel_group_name
- master_password = None
- if self.url and self.url.hosts:
- for host in self.url.hosts:
- target = host.hostname, host.port
- if host.password:
- master_password = host.password
- sentinels.append(target)
- if self.url.virtual_host:
- # url://:pass@sentinel_a,:pass@sentinel_b/master_group_name
- master_group = self.url.virtual_host
- elif self.conf.matchmaker_redis.sentinel_hosts:
- s = self.conf.matchmaker_redis.sentinel_hosts
- sentinels.extend([tuple(target.split(":")) for target in s])
- master_password = self.conf.matchmaker_redis.password
- return sentinels, master_password, master_group
- def _sadd(self, key, value, expire):
- self._redis_master.sadd(key, value)
- if expire > 0:
- self._redis_master.expire(key, expire)
- def _srem(self, key, value):
- self._redis_master.srem(key, value)
- def _smembers(self, key):
- hosts = self._redis_slave.smembers(key)
- return [host for host in hosts if self._ttl(host) >= -1]
- def _ttl(self, key):
- return self._redis_slave.ttl(key)
diff --git a/oslo_messaging/_drivers/zmq_driver/poller/ b/oslo_messaging/_drivers/zmq_driver/poller/
deleted file mode 100644
index e69de29..0000000
--- a/oslo_messaging/_drivers/zmq_driver/poller/
+++ /dev/null
diff --git a/oslo_messaging/_drivers/zmq_driver/poller/ b/oslo_messaging/_drivers/zmq_driver/poller/
deleted file mode 100644
index fdf9b44..0000000
--- a/oslo_messaging/_drivers/zmq_driver/poller/
+++ /dev/null
@@ -1,82 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import logging
-import eventlet
-from oslo_messaging._drivers.zmq_driver import zmq_poller
-LOG = logging.getLogger(__name__)
-class GreenPoller(zmq_poller.ZmqPoller):
- def __init__(self):
- self.incoming_queue = eventlet.queue.LightQueue()
- self.thread_by_socket = {}
- def register(self, socket, recv_method=None):
- if socket not in self.thread_by_socket:
- LOG.debug("Registering socket %s", socket.handle.identity)
- self.thread_by_socket[socket] = eventlet.spawn(
- self._socket_receive, socket, recv_method
- )
- def unregister(self, socket):
- thread = self.thread_by_socket.pop(socket, None)
- if thread:
- LOG.debug("Unregistering socket %s", socket.handle.identity)
- thread.kill()
- def _socket_receive(self, socket, recv_method=None):
- while True:
- if recv_method:
- incoming = recv_method(socket)
- else:
- incoming = socket.recv_multipart()
- self.incoming_queue.put((incoming, socket))
- eventlet.sleep()
- def poll(self, timeout=None):
- try:
- return self.incoming_queue.get(timeout=timeout)
- except eventlet.queue.Empty:
- return None, None
- def close(self):
- for thread in self.thread_by_socket.values():
- thread.kill()
- self.thread_by_socket = {}
-class GreenExecutor(zmq_poller.Executor):
- def __init__(self, method):
- self._method = method
- super(GreenExecutor, self).__init__(None)
- def _loop(self):
- while True:
- self._method()
- eventlet.sleep()
- def execute(self):
- if self.thread is None:
- self.thread = eventlet.spawn(self._loop)
- def stop(self):
- if self.thread is not None:
- self.thread.kill()
- self.thread = None
diff --git a/oslo_messaging/_drivers/zmq_driver/poller/ b/oslo_messaging/_drivers/zmq_driver/poller/
deleted file mode 100644
index b150a06..0000000
--- a/oslo_messaging/_drivers/zmq_driver/poller/
+++ /dev/null
@@ -1,88 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import logging
-import threading
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_poller
-zmq = zmq_async.import_zmq()
-LOG = logging.getLogger(__name__)
-class ThreadingPoller(zmq_poller.ZmqPoller):
- def __init__(self):
- self.poller = zmq.Poller()
- self.sockets_and_recv_methods = {}
- def register(self, socket, recv_method=None):
- socket_handle = socket.handle
- if socket_handle in self.sockets_and_recv_methods:
- return
- LOG.debug("Registering socket %s", socket_handle.identity)
- self.sockets_and_recv_methods[socket_handle] = (socket, recv_method)
- self.poller.register(socket_handle, zmq.POLLIN)
- def unregister(self, socket):
- socket_handle = socket.handle
- socket_and_recv_method = \
- self.sockets_and_recv_methods.pop(socket_handle, None)
- if socket_and_recv_method:
- LOG.debug("Unregistering socket %s", socket_handle.identity)
- self.poller.unregister(socket_handle)
- def poll(self, timeout=None):
- if timeout is not None and timeout > 0:
- timeout *= 1000 # convert seconds to milliseconds
- socket_handles = {}
- try:
- socket_handles = dict(self.poller.poll(timeout=timeout))
- except zmq.ZMQError as e:
- LOG.debug("Polling terminated with error: %s", e)
- if not socket_handles:
- return None, None
- for socket_handle in socket_handles:
- socket, recv_method = self.sockets_and_recv_methods[socket_handle]
- if recv_method:
- return recv_method(socket), socket
- else:
- return socket.recv_multipart(), socket
- def close(self):
- pass # Nothing to do for threading poller
-class ThreadingExecutor(zmq_poller.Executor):
- def __init__(self, method):
- self._method = method
- thread = threading.Thread(target=self._loop)
- thread.daemon = True
- super(ThreadingExecutor, self).__init__(thread)
- self._stop = threading.Event()
- def _loop(self):
- while not self._stop.is_set():
- self._method()
- def execute(self):
- self.thread.start()
- def stop(self):
- self._stop.set()
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/ b/oslo_messaging/_drivers/zmq_driver/proxy/
deleted file mode 100644
index e69de29..0000000
--- a/oslo_messaging/_drivers/zmq_driver/proxy/
+++ /dev/null
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/central/ b/oslo_messaging/_drivers/zmq_driver/proxy/central/
deleted file mode 100644
index e69de29..0000000
--- a/oslo_messaging/_drivers/zmq_driver/proxy/central/
+++ /dev/null
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/central/ b/oslo_messaging/_drivers/zmq_driver/proxy/central/
deleted file mode 100644
index bd4937f..0000000
--- a/oslo_messaging/_drivers/zmq_driver/proxy/central/
+++ /dev/null
@@ -1,143 +0,0 @@
-# Copyright 2015-2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import logging
-from oslo_messaging._drivers.zmq_driver.proxy.central \
- import zmq_publisher_proxy
-from oslo_messaging._drivers.zmq_driver.proxy \
- import zmq_base_proxy
-from oslo_messaging._drivers.zmq_driver.proxy import zmq_sender
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-from oslo_messaging._drivers.zmq_driver import zmq_updater
-from oslo_messaging._i18n import _LI
-LOG = logging.getLogger(__name__)
-zmq = zmq_async.import_zmq()
-class SingleRouterProxy(zmq_base_proxy.ProxyBase):
- def __init__(self, conf, context, matchmaker):
- super(SingleRouterProxy, self).__init__(conf, context, matchmaker)
- port = conf.zmq_proxy_opts.frontend_port
- self.fe_router_socket = zmq_base_proxy.create_socket(
- conf, context, port, zmq.ROUTER)
- self.poller.register(self.fe_router_socket, self._receive_message)
- self.publisher = zmq_publisher_proxy.PublisherProxy(conf, matchmaker)
- self.router_sender = zmq_sender.CentralRouterSender()
- self.ack_sender = zmq_sender.CentralAckSender()
- self._router_updater = self._create_router_updater()
- def run(self):
- message, socket = self.poller.poll()
- if message is None:
- return
- message_type = int(message[zmq_names.MESSAGE_TYPE_IDX])
- if self.conf.oslo_messaging_zmq.use_pub_sub and \
- message_type in zmq_names.MULTISEND_TYPES:
- self.publisher.send_request(message)
- if socket is self.fe_router_socket and \
- self.conf.zmq_proxy_opts.ack_pub_sub:
- self.ack_sender.send_message(socket, message)
- else:
- self.router_sender.send_message(
- self._get_socket_to_dispatch_on(socket), message)
- def _create_router_updater(self):
- return RouterUpdater(
- self.conf, self.matchmaker,,
- self.fe_router_socket.connect_address,
- self.fe_router_socket.connect_address)
- def _get_socket_to_dispatch_on(self, socket):
- return self.fe_router_socket
- def cleanup(self):
- super(SingleRouterProxy, self).cleanup()
- self._router_updater.cleanup()
- self.fe_router_socket.close()
- self.publisher.cleanup()
-class DoubleRouterProxy(SingleRouterProxy):
- def __init__(self, conf, context, matchmaker):
- port = conf.zmq_proxy_opts.backend_port
- self.be_router_socket = zmq_base_proxy.create_socket(
- conf, context, port, zmq.ROUTER)
- super(DoubleRouterProxy, self).__init__(conf, context, matchmaker)
- self.poller.register(self.be_router_socket, self._receive_message)
- def _create_router_updater(self):
- return RouterUpdater(
- self.conf, self.matchmaker,,
- self.fe_router_socket.connect_address,
- self.be_router_socket.connect_address)
- def _get_socket_to_dispatch_on(self, socket):
- return self.be_router_socket \
- if socket is self.fe_router_socket \
- else self.fe_router_socket
- def cleanup(self):
- super(DoubleRouterProxy, self).cleanup()
- self.be_router_socket.close()
-class RouterUpdater(zmq_updater.UpdaterBase):
- """This entity performs periodic async updates
- from router proxy to the matchmaker.
- """
- def __init__(self, conf, matchmaker, publisher_address, fe_router_address,
- be_router_address):
- self.publisher_address = publisher_address
- self.fe_router_address = fe_router_address
- self.be_router_address = be_router_address
- super(RouterUpdater, self).__init__(
- conf, matchmaker, self._update_records,
- conf.oslo_messaging_zmq.zmq_target_update)
- def _update_records(self):
- self.matchmaker.register_publisher(
- (self.publisher_address, self.fe_router_address),
- expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
-"[PUB:%(pub)s, ROUTER:%(router)s] Update PUB publisher"),
- {"pub": self.publisher_address,
- "router": self.fe_router_address})
- self.matchmaker.register_router(
- self.be_router_address,
- expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
-"[Backend ROUTER:%(router)s] Update ROUTER"),
- {"router": self.be_router_address})
- def cleanup(self):
- super(RouterUpdater, self).cleanup()
- self.matchmaker.unregister_publisher(
- (self.publisher_address, self.fe_router_address))
- self.matchmaker.unregister_router(
- self.be_router_address)
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/central/ b/oslo_messaging/_drivers/zmq_driver/proxy/central/
deleted file mode 100644
index 3076325..0000000
--- a/oslo_messaging/_drivers/zmq_driver/proxy/central/
+++ /dev/null
@@ -1,57 +0,0 @@
-# Copyright 2015-2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-from oslo_messaging._drivers.zmq_driver.proxy import zmq_sender
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_socket
-zmq = zmq_async.import_zmq()
-class PublisherProxy(object):
- """PUB/SUB based request publisher
- The publisher intended to be used for Fanout and Notify
- multi-sending patterns.
- It differs from direct publishers like DEALER or PUSH based
- in a way it treats matchmaker. Here all publishers register
- in the matchmaker. Subscribers (server-side) take the list
- of publishers and connect to all of them but subscribe
- only to a specific topic-filtering tag generated from the
- Target object.
- """
- def __init__(self, conf, matchmaker, sender=None):
- super(PublisherProxy, self).__init__()
- self.conf = conf
- self.zmq_context = zmq.Context()
- self.matchmaker = matchmaker
- port = conf.zmq_proxy_opts.publisher_port
- self.socket = zmq_socket.ZmqFixedPortSocket(
- self.conf, self.zmq_context, zmq.PUB,,
- port) if port != 0 else \
- zmq_socket.ZmqRandomPortSocket(
- self.conf, self.zmq_context, zmq.PUB,
- = self.socket.connect_address
- self.sender = sender or zmq_sender.CentralPublisherSender()
- def send_request(self, multipart_message):
- self.sender.send_message(self.socket, multipart_message)
- def cleanup(self):
- self.socket.close()
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/local/ b/oslo_messaging/_drivers/zmq_driver/proxy/local/
deleted file mode 100644
index e69de29..0000000
--- a/oslo_messaging/_drivers/zmq_driver/proxy/local/
+++ /dev/null
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/local/ b/oslo_messaging/_drivers/zmq_driver/proxy/local/
deleted file mode 100644
index 47feae1..0000000
--- a/oslo_messaging/_drivers/zmq_driver/proxy/local/
+++ /dev/null
@@ -1,59 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-from oslo_messaging._drivers.zmq_driver.proxy.central \
- import zmq_publisher_proxy
-from oslo_messaging._drivers.zmq_driver.proxy \
- import zmq_base_proxy
-from oslo_messaging._drivers.zmq_driver.proxy import zmq_sender
-from oslo_messaging._drivers.zmq_driver.server.consumers \
- import zmq_sub_consumer
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_socket
-zmq = zmq_async.import_zmq()
-class LocalPublisherProxy(zmq_base_proxy.ProxyBase):
- def __init__(self, conf, context, matchmaker):
- wrapper = zmq_sub_consumer.SubscriptionMatchmakerWrapper(conf,
- matchmaker)
- super(LocalPublisherProxy, self).__init__(conf, context, wrapper)
- self.fe_sub = zmq_socket.ZmqSocket(conf, context, zmq.SUB, False)
- self.fe_sub.setsockopt(zmq.SUBSCRIBE, b'')
- self.connection_updater = zmq_sub_consumer.SubscriberConnectionUpdater(
- conf, self.matchmaker, self.fe_sub)
- self.poller.register(self.fe_sub, self.receive_message)
- self.publisher = zmq_publisher_proxy.PublisherProxy(
- conf, matchmaker, sender=zmq_sender.LocalPublisherSender())
- def run(self):
- message, socket = self.poller.poll()
- if message is None:
- return
- self.publisher.send_request(message)
- @staticmethod
- def receive_message(socket):
- return socket.recv_multipart()
- def cleanup(self):
- super(LocalPublisherProxy, self).cleanup()
- self.fe_sub.close()
- self.connection_updater.cleanup()
- self.publisher.cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/ b/oslo_messaging/_drivers/zmq_driver/proxy/
deleted file mode 100644
index a1600b5..0000000
--- a/oslo_messaging/_drivers/zmq_driver/proxy/
+++ /dev/null
@@ -1,81 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import logging
-import uuid
-import six
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-from oslo_messaging._drivers.zmq_driver import zmq_socket
-from oslo_messaging._i18n import _LI, _LE
-LOG = logging.getLogger(__name__)
-zmq = zmq_async.import_zmq()
-def check_message_format(func):
- def _check_message_format(socket):
- try:
- return func(socket)
- except Exception as e:
- LOG.error(_LE("Received message with wrong format: %r. "
- "Dropping invalid message"), e)
- # NOTE(gdavoian): drop the left parts of a broken message, since
- # they most likely will break the order of next messages' parts
- if socket.getsockopt(zmq.RCVMORE):
- socket.recv_multipart()
- return _check_message_format
-def create_socket(conf, context, port, socket_type):
- host =
- identity = six.b(host) + b"/zmq-proxy/" + six.b(str(uuid.uuid4()))
- if port != 0:
- return zmq_socket.ZmqFixedPortSocket(conf, context, socket_type,
- host, port, identity=identity)
- else:
- return zmq_socket.ZmqRandomPortSocket(conf, context, socket_type,
- host, identity=identity)
-class ProxyBase(object):
- def __init__(self, conf, context, matchmaker):
- self.conf = conf
- self.context = context
- self.matchmaker = matchmaker
-"Running %s proxy") % self.PROXY_TYPE)
- self.poller = zmq_async.get_poller()
- @staticmethod
- @check_message_format
- def _receive_message(socket):
- message = socket.recv_multipart()
- assert message[zmq_names.EMPTY_IDX] == b'', "Empty delimiter expected!"
- message_type = int(message[zmq_names.MESSAGE_TYPE_IDX])
- assert message_type in zmq_names.MESSAGE_TYPES, \
- "Known message type expected!"
- assert len(message) > zmq_names.MESSAGE_ID_IDX, \
- "At least %d parts expected!" % (zmq_names.MESSAGE_ID_IDX + 1)
- return message
- def cleanup(self):
- self.poller.close()
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/ b/oslo_messaging/_drivers/zmq_driver/proxy/
deleted file mode 100644
index f69fbe6..0000000
--- a/oslo_messaging/_drivers/zmq_driver/proxy/
+++ /dev/null
@@ -1,201 +0,0 @@
-# Copyright 2015-2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import argparse
-import logging
-import socket
-from oslo_config import cfg
-from stevedore import driver
-from oslo_messaging._drivers import impl_zmq
-from oslo_messaging._drivers.zmq_driver.proxy.central import zmq_central_proxy
-from oslo_messaging._drivers.zmq_driver.proxy.local import zmq_local_proxy
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._i18n import _LI
-from oslo_messaging import transport
-LOG = logging.getLogger(__name__)
-zmq = zmq_async.import_zmq()
-USAGE = """ Usage: ./ [-h] [] ...
-Usage example:
- python oslo_messaging/_cmd/"""
-zmq_proxy_opts = [
- cfg.StrOpt('host', default=socket.gethostname(),
- help='Hostname (FQDN) of current proxy'
- ' an ethernet interface, or IP address.'),
- cfg.IntOpt('frontend_port', default=0,
- help='Front-end ROUTER port number. Zero means random.'),
- cfg.IntOpt('backend_port', default=0,
- help='Back-end ROUTER port number. Zero means random.'),
- cfg.IntOpt('publisher_port', default=0,
- help='Publisher port number. Zero means random.'),
- cfg.BoolOpt('local_publisher', default=False,
- help='Specify publisher/subscriber local proxy.'),
- cfg.BoolOpt('ack_pub_sub', default=False,
- help='Use acknowledgements for notifying senders about '
- 'receiving their fanout messages. '
- 'The option is ignored if PUB/SUB is disabled.'),
- cfg.StrOpt('url', default='zmq://',
- help='ZMQ-driver transport URL with additional configurations')
-def parse_command_line_args(conf):
- parser = argparse.ArgumentParser(
- description='ZeroMQ proxy service',
- usage=USAGE
- )
- parser.add_argument('-c', '--config-file', dest='config_file', type=str,
- help='Path to configuration file')
- parser.add_argument('-l', '--log-file', dest='log_file', type=str,
- help='Path to log file')
- parser.add_argument('-H', '--host', dest='host', type=str,
- help='Host FQDN for current proxy')
- parser.add_argument('-f', '--frontend-port', dest='frontend_port',
- type=int,
- help='Front-end ROUTER port number')
- parser.add_argument('-b', '--backend-port', dest='backend_port', type=int,
- help='Back-end ROUTER port number')
- parser.add_argument('-p', '--publisher-port', dest='publisher_port',
- type=int,
- help='Back-end PUBLISHER port number')
- parser.add_argument('-lp', '--local-publisher', dest='local_publisher',
- action='store_true',
- help='Specify publisher/subscriber local proxy.')
- parser.add_argument('-a', '--ack-pub-sub', dest='ack_pub_sub',
- action='store_true',
- help='Acknowledge PUB/SUB messages')
- parser.add_argument('-u', '--url', dest='url', type=str,
- help='Transport URL with configurations')
- parser.add_argument('-d', '--debug', dest='debug', action='store_true',
- help='Turn on DEBUG logging level instead of INFO')
- args = parser.parse_args()
- if args.config_file:
- conf(['--config-file', args.config_file])
- log_kwargs = {'level': logging.DEBUG if args.debug else logging.INFO,
- 'format': '%(asctime)s %(name)s %(levelname)-8s %(message)s'}
- if args.log_file:
- log_kwargs.update({'filename': args.log_file})
- logging.basicConfig(**log_kwargs)
- if
- conf.set_override('host',, group='zmq_proxy_opts')
- if args.frontend_port:
- conf.set_override('frontend_port', args.frontend_port,
- group='zmq_proxy_opts')
- if args.backend_port:
- conf.set_override('backend_port', args.backend_port,
- group='zmq_proxy_opts')
- if args.publisher_port:
- conf.set_override('publisher_port', args.publisher_port,
- group='zmq_proxy_opts')
- if args.local_publisher:
- conf.set_override('local_publisher', args.local_publisher,
- group='zmq_proxy_opts')
- if args.ack_pub_sub:
- conf.set_override('ack_pub_sub', args.ack_pub_sub,
- group='zmq_proxy_opts')
- if args.url:
- conf.set_override('url', args.url, group='zmq_proxy_opts')
-class ZmqProxy(object):
- """Wrapper class for Publishers and Routers proxies.
- The main reason to have a proxy is high complexity of TCP sockets number
- growth with direct connections (when services connect directly to
- each other). The general complexity for ZeroMQ+Openstack deployment
- with direct connections may be square(N) (where N is a number of nodes
- in deployment). With proxy the complexity is reduced to k*N where
- k is a number of services.
- Currently there are 2 types of proxy, they are Publishers and Routers.
- Publisher proxy serves for PUB-SUB pattern implementation where
- Publisher is a server which performs broadcast to subscribers.
- Router is used for direct message types in case of number of TCP socket
- connections is critical for specific deployment. Generally 3 publishers
- is enough for deployment.
- Router is used for direct messages in order to reduce the number of
- allocated TCP sockets in controller. The list of requirements to Router:
- 1. There may be any number of routers in the deployment. Routers are
- registered in a name-server and client connects dynamically to all of
- them performing load balancing.
- 2. Routers should be transparent for clients and servers. Which means
- it doesn't change the way of messaging between client and the final
- target by hiding the target from a client.
- 3. Router may be restarted or shut down at any time losing all messages
- in its queue. Smart retrying (based on acknowledgements from server
- side) and load balancing between other Router instances from the
- client side should handle the situation.
- 4. Router takes all the routing information from message envelope and
- doesn't perform Target-resolution in any way.
- 5. Routers don't talk to each other and no synchronization is needed.
- 6. Load balancing is performed by the client in a round-robin fashion.
- Those requirements should limit the performance impact caused by using
- of proxies making proxies as lightweight as possible.
- """
- def __init__(self, conf):
- super(ZmqProxy, self).__init__()
- self.conf = conf
- url = transport.TransportURL.parse(
- self.conf, url=self.conf.zmq_proxy_opts.url
- )
- self.matchmaker = driver.DriverManager(
- 'oslo.messaging.zmq.matchmaker',
- impl_zmq.ZmqDriver.get_matchmaker_backend(self.conf, url)
- ).driver(self.conf, url=url)
- self.context = zmq.Context()
- self.proxy = self._choose_proxy_implementation()
- def _choose_proxy_implementation(self):
- if self.conf.zmq_proxy_opts.local_publisher:
- return zmq_local_proxy.LocalPublisherProxy(self.conf, self.context,
- self.matchmaker)
- elif self.conf.zmq_proxy_opts.frontend_port != 0 and \
- self.conf.zmq_proxy_opts.backend_port == 0:
- return zmq_central_proxy.SingleRouterProxy(self.conf, self.context,
- self.matchmaker)
- else:
- return zmq_central_proxy.DoubleRouterProxy(self.conf, self.context,
- self.matchmaker)
- def run(self):
- def close(self):
-"Proxy shutting down ..."))
- self.proxy.cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/ b/oslo_messaging/_drivers/zmq_driver/proxy/
deleted file mode 100644
index 0d1952a..0000000
--- a/oslo_messaging/_drivers/zmq_driver/proxy/
+++ /dev/null
@@ -1,147 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import abc
-import logging
-import six
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-from oslo_messaging._drivers.zmq_driver import zmq_version
-from oslo_messaging._i18n import _LW
-LOG = logging.getLogger(__name__)
-zmq = zmq_async.import_zmq()
-class Sender(object):
- @abc.abstractmethod
- def send_message(self, socket, multipart_message):
- """Send message to a socket from a multipart list."""
-class CentralSender(Sender):
- def __init__(self):
- self._send_message_versions = \
- zmq_version.get_method_versions(self, 'send_message')
- def send_message(self, socket, multipart_message):
- message_version = multipart_message[zmq_names.MESSAGE_VERSION_IDX]
- if six.PY3:
- message_version = message_version.decode('utf-8')
- send_message_version = self._send_message_versions.get(message_version)
- if send_message_version is None:
- LOG.warning(_LW("Dropping message with unsupported version %s"),
- message_version)
- return
- send_message_version(socket, multipart_message)
-class LocalSender(Sender):
- pass
-class CentralRouterSender(CentralSender):
- def _send_message_v_1_0(self, socket, multipart_message):
- message_type = int(multipart_message[zmq_names.MESSAGE_TYPE_IDX])
- routing_key = multipart_message[zmq_names.ROUTING_KEY_IDX]
- reply_id = multipart_message[zmq_names.REPLY_ID_IDX]
- message_id = multipart_message[zmq_names.MESSAGE_ID_IDX]
- message_version = multipart_message[zmq_names.MESSAGE_VERSION_IDX]
- socket.send(routing_key, zmq.SNDMORE)
- socket.send(b'', zmq.SNDMORE)
- socket.send(message_version, zmq.SNDMORE)
- socket.send(reply_id, zmq.SNDMORE)
- socket.send(multipart_message[zmq_names.MESSAGE_TYPE_IDX], zmq.SNDMORE)
- socket.send_multipart(multipart_message[zmq_names.MESSAGE_ID_IDX:])
- LOG.debug("Dispatching %(msg_type)s message %(msg_id)s - from %(rid)s "
- "-> to %(rkey)s (v%(msg_version)s)",
- {"msg_type": zmq_names.message_type_str(message_type),
- "msg_id": message_id,
- "rkey": routing_key,
- "rid": reply_id,
- "msg_version": message_version})
-class CentralAckSender(CentralSender):
- def _send_message_v_1_0(self, socket, multipart_message):
- message_type = zmq_names.ACK_TYPE
- message_id = multipart_message[zmq_names.MESSAGE_ID_IDX]
- routing_key = socket.handle.identity
- reply_id = multipart_message[zmq_names.REPLY_ID_IDX]
- message_version = multipart_message[zmq_names.MESSAGE_VERSION_IDX]
- socket.send(reply_id, zmq.SNDMORE)
- socket.send(b'', zmq.SNDMORE)
- socket.send(message_version, zmq.SNDMORE)
- socket.send(routing_key, zmq.SNDMORE)
- socket.send(six.b(str(message_type)), zmq.SNDMORE)
- socket.send_string(message_id)
- LOG.debug("Sending %(msg_type)s for %(msg_id)s to %(rid)s "
- "[from %(rkey)s] (v%(msg_version)s)",
- {"msg_type": zmq_names.message_type_str(message_type),
- "msg_id": message_id,
- "rid": reply_id,
- "rkey": routing_key,
- "msg_version": message_version})
-class CentralPublisherSender(CentralSender):
- def _send_message_v_1_0(self, socket, multipart_message):
- message_type = int(multipart_message[zmq_names.MESSAGE_TYPE_IDX])
- assert message_type in zmq_names.MULTISEND_TYPES, "Fanout expected!"
- topic_filter = multipart_message[zmq_names.ROUTING_KEY_IDX]
- message_id = multipart_message[zmq_names.MESSAGE_ID_IDX]
- message_version = multipart_message[zmq_names.MESSAGE_VERSION_IDX]
- socket.send(topic_filter, zmq.SNDMORE)
- socket.send(message_version, zmq.SNDMORE)
- socket.send(six.b(str(message_type)), zmq.SNDMORE)
- socket.send_multipart(multipart_message[zmq_names.MESSAGE_ID_IDX:])
- LOG.debug("Publishing message %(msg_id)s on [%(topic)s] "
- "(v%(msg_version)s)",
- {"topic": topic_filter,
- "msg_id": message_id,
- "msg_version": message_version})
-class LocalPublisherSender(LocalSender):
- MSG_ID_IDX = 3
- def send_message(self, socket, multipart_message):
- socket.send_multipart(multipart_message)
- LOG.debug("Publishing message %(msg_id)s on [%(topic)s] "
- "(v%(msg_version)s)",
- {"topic": multipart_message[self.TOPIC_IDX],
- "msg_id": multipart_message[self.MSG_ID_IDX],
- "msg_version": multipart_message[self.MSG_VERSION_IDX]})
diff --git a/oslo_messaging/_drivers/zmq_driver/server/ b/oslo_messaging/_drivers/zmq_driver/server/
deleted file mode 100644
index e69de29..0000000
--- a/oslo_messaging/_drivers/zmq_driver/server/
+++ /dev/null
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/ b/oslo_messaging/_drivers/zmq_driver/server/consumers/
deleted file mode 100644
index e69de29..0000000
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/
+++ /dev/null
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/ b/oslo_messaging/_drivers/zmq_driver/server/consumers/
deleted file mode 100644
index b2e69fc..0000000
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/
+++ /dev/null
@@ -1,152 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import abc
-import logging
-import six
-from oslo_messaging._drivers import common as rpc_common
-from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base
-from oslo_messaging._drivers.zmq_driver import zmq_address
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-from oslo_messaging._drivers.zmq_driver import zmq_socket
-from oslo_messaging._drivers.zmq_driver import zmq_updater
-from oslo_messaging._i18n import _LE, _LI, _LW
-LOG = logging.getLogger(__name__)
-zmq = zmq_async.import_zmq()
-class ConsumerBase(object):
- def __init__(self, conf, poller, server):
- self.conf = conf
- self.poller = poller
- self.server = server
- self.sockets = []
- self.context = zmq.Context()
- def stop(self):
- """Stop consumer polling/updating."""
- @abc.abstractmethod
- def receive_request(self, socket):
- """Receive a request via a socket."""
- def cleanup(self):
- for socket in self.sockets:
- if not socket.handle.closed:
- socket.close()
- self.sockets = []
-class SingleSocketConsumer(ConsumerBase):
- def __init__(self, conf, poller, server, socket_type):
- super(SingleSocketConsumer, self).__init__(conf, poller, server)
- self.matchmaker = server.matchmaker
- =
- self.socket_type = socket_type
- = None
- self.socket = self.subscribe_socket(socket_type)
- self.target_updater = TargetUpdater(
- conf, self.matchmaker,,, socket_type)
- def stop(self):
- self.target_updater.stop()
- def subscribe_socket(self, socket_type):
- try:
- socket = zmq_socket.ZmqRandomPortSocket(
- self.conf, self.context, socket_type)
- self.sockets.append(socket)
- LOG.debug("Run %(stype)s consumer on %(addr)s:%(port)d",
- {"stype": zmq_names.socket_type_str(socket_type),
- "addr": socket.bind_address,
- "port": socket.port})
- = zmq_address.combine_address(
- self.conf.oslo_messaging_zmq.rpc_zmq_host, socket.port)
- self.poller.register(socket, self.receive_request)
- return socket
- except zmq.ZMQError as e:
- errmsg = _LE("Failed binding to port %(port)d: %(e)s")\
- % (self.port, e)
- LOG.error(_LE("Failed binding to port %(port)d: %(e)s"),
- (self.port, e))
- raise rpc_common.RPCException(errmsg)
- @property
- def address(self):
- return self.socket.bind_address
- @property
- def port(self):
- return self.socket.port
- def cleanup(self):
- self.target_updater.cleanup()
- super(SingleSocketConsumer, self).cleanup()
-class TargetUpdater(zmq_updater.UpdaterBase):
- """This entity performs periodic async updates
- to the matchmaker.
- """
- def __init__(self, conf, matchmaker, target, host, socket_type):
- = target
- = host
- self.socket_type = socket_type
- self.conf = conf
- self.matchmaker = matchmaker
- self._sleep_for = conf.oslo_messaging_zmq.zmq_target_update
- # NOTE(ozamiatin): Update target immediately not waiting
- # for background executor to initialize.
- self._update_target()
- super(TargetUpdater, self).__init__(
- conf, matchmaker, self._update_target,
- conf.oslo_messaging_zmq.zmq_target_update)
- def _update_target(self):
- try:
- self.matchmaker.register(
- zmq_names.socket_type_str(self.socket_type),
- expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
- if self._sleep_for != \
- self.conf.oslo_messaging_zmq.zmq_target_update:
- self._sleep_for = \
- self.conf.oslo_messaging_zmq.zmq_target_update
-"Falling back to the normal update %d sec")
- % self._sleep_for)
- except zmq_matchmaker_base.MatchmakerUnavailable:
- # Update target frequently until first successful update
- # After matchmaker is back update normally as of config
- self._sleep_for = 10
- LOG.warning(_LW("Failed connecting to the Matchmaker, "
- "update each %d sec") % self._sleep_for)
- def stop(self):
- super(TargetUpdater, self).stop()
- self.matchmaker.unregister(
- zmq_names.socket_type_str(self.socket_type))
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/ b/oslo_messaging/_drivers/zmq_driver/server/consumers/
deleted file mode 100644
index 7b77a62..0000000
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/
+++ /dev/null
@@ -1,212 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import logging
-import uuid
-import six
-from oslo_messaging._drivers import common as rpc_common
-from oslo_messaging._drivers.zmq_driver.client import zmq_response
-from oslo_messaging._drivers.zmq_driver.client import zmq_senders
-from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager
-from oslo_messaging._drivers.zmq_driver.server.consumers \
- import zmq_consumer_base
-from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
-from oslo_messaging._drivers.zmq_driver.server import zmq_ttl_cache
-from oslo_messaging._drivers.zmq_driver import zmq_address
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-from oslo_messaging._drivers.zmq_driver import zmq_updater
-from oslo_messaging._drivers.zmq_driver import zmq_version
-from oslo_messaging._i18n import _LE, _LI, _LW
-LOG = logging.getLogger(__name__)
-zmq = zmq_async.import_zmq()
-class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
- def __init__(self, conf, poller, server):
- self.reply_sender = zmq_senders.ReplySenderProxy(conf)
- self.sockets_manager = zmq_sockets_manager.SocketsManager(
- conf, server.matchmaker, zmq.DEALER)
- = None
- super(DealerConsumer, self).__init__(conf, poller, server, zmq.DEALER)
- self._receive_request_versions = \
- zmq_version.get_method_versions(self, 'receive_request')
- self.connection_updater = ConsumerConnectionUpdater(
- conf, self.matchmaker, self.socket)
-"[%s] Run DEALER consumer"),
- def _generate_identity(self):
- return six.b(self.conf.oslo_messaging_zmq.rpc_zmq_host + "/" +
- zmq_address.target_to_key( + "/" +
- str(uuid.uuid4()))
- def subscribe_socket(self, socket_type):
- try:
- socket = self.sockets_manager.get_socket_to_routers(
- self._generate_identity())
- = socket.handle.identity
- self.poller.register(socket, self.receive_request)
- return socket
- except zmq.ZMQError as e:
- LOG.error(_LE("Failed connecting to ROUTER socket %(e)s") % e)
- raise rpc_common.RPCException(str(e))
- def _reply(self, rpc_message, reply, failure):
- if failure is not None:
- failure = rpc_common.serialize_remote_exception(failure)
- reply = zmq_response.Reply(message_id=rpc_message.message_id,
- reply_id=rpc_message.reply_id,
- message_version=rpc_message.message_version,
- reply_body=reply,
- failure=failure)
- self.reply_sender.send(rpc_message.socket, reply)
- return reply
- def _create_message(self, context, message, message_version, reply_id,
- message_id, socket, message_type):
- if message_type == zmq_names.CALL_TYPE:
- message = zmq_incoming_message.ZmqIncomingMessage(
- context, message, message_version=message_version,
- reply_id=reply_id, message_id=message_id,
- socket=socket, reply_method=self._reply
- )
- else:
- message = zmq_incoming_message.ZmqIncomingMessage(context, message)
- LOG.debug("[%(host)s] Received %(msg_type)s message %(msg_id)s "
- "(v%(msg_version)s)",
- {"host":,
- "msg_type": zmq_names.message_type_str(message_type),
- "msg_id": message_id,
- "msg_version": message_version})
- return message
- def _get_receive_request_version(self, version):
- receive_request_version = self._receive_request_versions.get(version)
- if receive_request_version is None:
- raise zmq_version.UnsupportedMessageVersionError(version)
- return receive_request_version
- def receive_request(self, socket):
- try:
- empty = socket.recv()
- assert empty == b'', "Empty delimiter expected!"
- message_version = socket.recv_string()
- assert message_version != b'', "Valid message version expected!"
- receive_request_version = \
- self._get_receive_request_version(message_version)
- return receive_request_version(socket)
- except (zmq.ZMQError, AssertionError, ValueError,
- zmq_version.UnsupportedMessageVersionError) as e:
- LOG.error(_LE("Receiving message failure: %s"), str(e))
- # NOTE(gdavoian): drop the left parts of a broken message
- if socket.getsockopt(zmq.RCVMORE):
- socket.recv_multipart()
- def _receive_request_v_1_0(self, socket):
- reply_id = socket.recv()
- assert reply_id != b'', "Valid reply id expected!"
- message_type = int(socket.recv())
- assert message_type in zmq_names.REQUEST_TYPES, "Request expected!"
- message_id = socket.recv_string()
- assert message_id != '', "Valid message id expected!"
- context, message = socket.recv_loaded()
- return self._create_message(context, message, '1.0', reply_id,
- message_id, socket, message_type)
- def cleanup(self):
-"[%s] Destroy DEALER consumer"),
- self.connection_updater.cleanup()
- super(DealerConsumer, self).cleanup()
-class DealerConsumerWithAcks(DealerConsumer):
- def __init__(self, conf, poller, server):
- super(DealerConsumerWithAcks, self).__init__(conf, poller, server)
- self.ack_sender = zmq_senders.AckSenderProxy(conf)
- self.messages_cache = zmq_ttl_cache.TTLCache(
- ttl=conf.oslo_messaging_zmq.rpc_message_ttl
- )
- def _acknowledge(self, message_version, reply_id, message_id, socket):
- ack = zmq_response.Ack(message_id=message_id,
- reply_id=reply_id,
- message_version=message_version)
- self.ack_sender.send(socket, ack)
- def _reply(self, rpc_message, reply, failure):
- reply = super(DealerConsumerWithAcks, self)._reply(rpc_message,
- reply, failure)
- self.messages_cache.add(rpc_message.message_id, reply)
- return reply
- def _reply_from_cache(self, message_id, socket):
- reply = self.messages_cache.get(message_id)
- if reply is not None:
- self.reply_sender.send(socket, reply)
- def _create_message(self, context, message, message_version, reply_id,
- message_id, socket, message_type):
- # drop a duplicate message
- if message_id in self.messages_cache:
- LOG.warning(
- _LW("[%(host)s] Dropping duplicate %(msg_type)s "
- "message %(msg_id)s"),
- {"host":,
- "msg_type": zmq_names.message_type_str(message_type),
- "msg_id": message_id}
- )
- # NOTE(gdavoian): send yet another ack for the direct
- # message, since the old one might be lost;
- # for the CALL message also try to resend its reply
- # (of course, if it was already obtained and cached).
- if message_type in zmq_names.DIRECT_TYPES:
- self._acknowledge(message_version, reply_id, message_id,
- socket)
- if message_type == zmq_names.CALL_TYPE:
- self._reply_from_cache(message_id, socket)
- return None
- self.messages_cache.add(message_id)
- # NOTE(gdavoian): send an immediate ack, since it may
- # be too late to wait until the message will be
- # dispatched and processed by a RPC server
- if message_type in zmq_names.DIRECT_TYPES:
- self._acknowledge(message_version, reply_id, message_id, socket)
- return super(DealerConsumerWithAcks, self)._create_message(
- context, message, message_version, reply_id,
- message_id, socket, message_type
- )
- def cleanup(self):
- self.messages_cache.cleanup()
- super(DealerConsumerWithAcks, self).cleanup()
-class ConsumerConnectionUpdater(zmq_updater.ConnectionUpdater):
- def _update_connection(self):
- routers = self.matchmaker.get_routers()
- for router_address in routers:
- self.socket.connect_to_host(router_address)
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/ b/oslo_messaging/_drivers/zmq_driver/server/consumers/
deleted file mode 100644
index 3395f04..0000000
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/
+++ /dev/null
@@ -1,109 +0,0 @@
-# Copyright 2015-2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import logging
-from oslo_messaging._drivers import common as rpc_common
-from oslo_messaging._drivers.zmq_driver.client import zmq_response
-from oslo_messaging._drivers.zmq_driver.client import zmq_senders
-from oslo_messaging._drivers.zmq_driver.server.consumers \
- import zmq_consumer_base
-from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-from oslo_messaging._drivers.zmq_driver import zmq_version
-from oslo_messaging._i18n import _LE, _LI
-LOG = logging.getLogger(__name__)
-zmq = zmq_async.import_zmq()
-class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
- def __init__(self, conf, poller, server):
- self.reply_sender = zmq_senders.ReplySenderDirect(conf)
- super(RouterConsumer, self).__init__(conf, poller, server, zmq.ROUTER)
- self._receive_request_versions = \
- zmq_version.get_method_versions(self, 'receive_request')
-"[%s] Run ROUTER consumer"),
- def _reply(self, rpc_message, reply, failure):
- if failure is not None:
- failure = rpc_common.serialize_remote_exception(failure)
- reply = zmq_response.Reply(message_id=rpc_message.message_id,
- reply_id=rpc_message.reply_id,
- message_version=rpc_message.message_version,
- reply_body=reply,
- failure=failure)
- self.reply_sender.send(rpc_message.socket, reply)
- return reply
- def _create_message(self, context, message, message_version, reply_id,
- message_id, socket, message_type):
- if message_type == zmq_names.CALL_TYPE:
- message = zmq_incoming_message.ZmqIncomingMessage(
- context, message, message_version=message_version,
- reply_id=reply_id, message_id=message_id,
- socket=socket, reply_method=self._reply
- )
- else:
- message = zmq_incoming_message.ZmqIncomingMessage(context, message)
- LOG.debug("[%(host)s] Received %(msg_type)s message %(msg_id)s "
- "(v%(msg_version)s)",
- {"host":,
- "msg_type": zmq_names.message_type_str(message_type),
- "msg_id": message_id,
- "msg_version": message_version})
- return message
- def _get_receive_request_version(self, version):
- receive_request_version = self._receive_request_versions.get(version)
- if receive_request_version is None:
- raise zmq_version.UnsupportedMessageVersionError(version)
- return receive_request_version
- def receive_request(self, socket):
- try:
- reply_id = socket.recv()
- assert reply_id != b'', "Valid reply id expected!"
- empty = socket.recv()
- assert empty == b'', "Empty delimiter expected!"
- message_version = socket.recv_string()
- assert message_version != b'', "Valid message version expected!"
- receive_request_version = \
- self._get_receive_request_version(message_version)
- return receive_request_version(reply_id, socket)
- except (zmq.ZMQError, AssertionError, ValueError,
- zmq_version.UnsupportedMessageVersionError) as e:
- LOG.error(_LE("Receiving message failed: %s"), str(e))
- # NOTE(gdavoian): drop the left parts of a broken message
- if socket.getsockopt(zmq.RCVMORE):
- socket.recv_multipart()
- def _receive_request_v_1_0(self, reply_id, socket):
- message_type = int(socket.recv())
- assert message_type in zmq_names.REQUEST_TYPES, "Request expected!"
- message_id = socket.recv_string()
- assert message_id != '', "Valid message id expected!"
- context, message = socket.recv_loaded()
- return self._create_message(context, message, '1.0', reply_id,
- message_id, socket, message_type)
- def cleanup(self):
-"[%s] Destroy ROUTER consumer"),
- super(RouterConsumer, self).cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/ b/oslo_messaging/_drivers/zmq_driver/server/consumers/
deleted file mode 100644
index 2d53489..0000000
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/
+++ /dev/null
@@ -1,128 +0,0 @@
-# Copyright 2015-2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import logging
-import uuid
-import six
-from oslo_messaging._drivers.zmq_driver.server.consumers \
- import zmq_consumer_base
-from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
-from oslo_messaging._drivers.zmq_driver import zmq_address
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-from oslo_messaging._drivers.zmq_driver import zmq_socket
-from oslo_messaging._drivers.zmq_driver import zmq_updater
-from oslo_messaging._drivers.zmq_driver import zmq_version
-from oslo_messaging._i18n import _LE, _LI
-LOG = logging.getLogger(__name__)
-zmq = zmq_async.import_zmq()
-class SubConsumer(zmq_consumer_base.ConsumerBase):
- def __init__(self, conf, poller, server):
- super(SubConsumer, self).__init__(conf, poller, server)
- self.matchmaker = SubscriptionMatchmakerWrapper(conf,
- server.matchmaker)
- =
- self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB,
- immediate=False,
- identity=self._generate_identity())
- self.sockets.append(self.socket)
- = self.socket.handle.identity
- self._subscribe_to_topic()
- self._receive_request_versions = \
- zmq_version.get_method_versions(self, 'receive_request')
- self.connection_updater = SubscriberConnectionUpdater(
- conf, self.matchmaker, self.socket)
- self.poller.register(self.socket, self.receive_request)
-"[%s] Run SUB consumer"),
- def _generate_identity(self):
- return six.b(self.conf.oslo_messaging_zmq.rpc_zmq_host + '/') + \
- zmq_address.target_to_subscribe_filter( + \
- six.b('/' + str(uuid.uuid4()))
- def _subscribe_to_topic(self):
- topic_filter = zmq_address.target_to_subscribe_filter(
- self.socket.setsockopt(zmq.SUBSCRIBE, topic_filter)
- LOG.debug("[%(host)s] Subscribing to topic %(filter)s",
- {"host":, "filter": topic_filter})
- def _get_receive_request_version(self, version):
- receive_request_version = self._receive_request_versions.get(version)
- if receive_request_version is None:
- raise zmq_version.UnsupportedMessageVersionError(version)
- return receive_request_version
- def _receive_request_v_1_0(self, topic_filter, socket):
- message_type = int(socket.recv())
- assert message_type in zmq_names.MULTISEND_TYPES, "Fanout expected!"
- message_id = socket.recv()
- context, message = socket.recv_loaded()
- LOG.debug("[%(host)s] Received on topic %(filter)s message %(msg_id)s "
- "(v%(msg_version)s)",
- {'host':,
- 'filter': topic_filter,
- 'msg_id': message_id,
- 'msg_version': '1.0'})
- return context, message
- def receive_request(self, socket):
- try:
- topic_filter = socket.recv()
- message_version = socket.recv_string()
- receive_request_version = \
- self._get_receive_request_version(message_version)
- context, message = receive_request_version(topic_filter, socket)
- return zmq_incoming_message.ZmqIncomingMessage(context, message)
- except (zmq.ZMQError, AssertionError, ValueError,
- zmq_version.UnsupportedMessageVersionError) as e:
- LOG.error(_LE("Receiving message failed: %s"), str(e))
- # NOTE(gdavoian): drop the left parts of a broken message
- if socket.getsockopt(zmq.RCVMORE):
- socket.recv_multipart()
- def cleanup(self):
-"[%s] Destroy SUB consumer"),
- self.connection_updater.cleanup()
- super(SubConsumer, self).cleanup()
-class SubscriptionMatchmakerWrapper(object):
- def __init__(self, conf, matchmaker):
- self.conf = conf
- self.matchmaker = matchmaker
- def get_publishers(self):
- conf_publishers = self.conf.oslo_messaging_zmq.subscribe_on
- LOG.debug("Publishers taken from configuration %s", conf_publishers)
- if conf_publishers:
- return [(publisher, None) for publisher in conf_publishers]
- return self.matchmaker.get_publishers()
-class SubscriberConnectionUpdater(zmq_updater.ConnectionUpdater):
- def _update_connection(self):
- publishers = self.matchmaker.get_publishers()
- for publisher_address, router_address in publishers:
- self.socket.connect_to_host(publisher_address)
- LOG.debug("[%s] SUB consumer connected to publishers %s",
- self.socket.handle.identity, publishers)
diff --git a/oslo_messaging/_drivers/zmq_driver/server/ b/oslo_messaging/_drivers/zmq_driver/server/
deleted file mode 100644
index 9810388..0000000
--- a/oslo_messaging/_drivers/zmq_driver/server/
+++ /dev/null
@@ -1,41 +0,0 @@
-# Copyright 2015-2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-from oslo_messaging._drivers import base
-class ZmqIncomingMessage(base.RpcIncomingMessage):
- """Base class for RPC-messages via ZMQ-driver.
- Behaviour of messages is fully defined by consumers
- which produced them from obtained raw data.
- """
- def __init__(self, context, message, **kwargs):
- super(ZmqIncomingMessage, self).__init__(context, message)
- self._reply_method = kwargs.pop('reply_method',
- lambda self, reply, failure: None)
- for key, value in kwargs.items():
- setattr(self, key, value)
- def acknowledge(self):
- """Acknowledge is not supported."""
- def reply(self, reply=None, failure=None):
- self._reply_method(self, reply=reply, failure=failure)
- def requeue(self):
- """Requeue is not supported."""
- def heartbeat(self):
- """Heartbeat is not supported."""
diff --git a/oslo_messaging/_drivers/zmq_driver/server/ b/oslo_messaging/_drivers/zmq_driver/server/
deleted file mode 100644
index f62abf8..0000000
--- a/oslo_messaging/_drivers/zmq_driver/server/
+++ /dev/null
@@ -1,126 +0,0 @@
-# Copyright 2015-2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import copy
-import logging
-from oslo_messaging._drivers import base
-from oslo_messaging._drivers.zmq_driver.server.consumers\
- import zmq_dealer_consumer
-from oslo_messaging._drivers.zmq_driver.server.consumers\
- import zmq_router_consumer
-from oslo_messaging._drivers.zmq_driver.server.consumers\
- import zmq_sub_consumer
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._i18n import _LI
-LOG = logging.getLogger(__name__)
-zmq = zmq_async.import_zmq()
-class ZmqServer(base.PollStyleListener):
- def __init__(self, driver, conf, matchmaker, target, poller=None):
- super(ZmqServer, self).__init__()
- self.driver = driver
- self.conf = conf
- self.matchmaker = matchmaker
- = target
- self.poller = poller or zmq_async.get_poller()
-'[%(host)s] Run server %(target)s'),
- {'host': self.conf.oslo_messaging_zmq.rpc_zmq_host,
- 'target':})
- if conf.oslo_messaging_zmq.use_router_proxy:
- self.router_consumer = None
- dealer_consumer_cls = \
- zmq_dealer_consumer.DealerConsumerWithAcks \
- if conf.oslo_messaging_zmq.rpc_use_acks else \
- zmq_dealer_consumer.DealerConsumer
- self.dealer_consumer = dealer_consumer_cls(conf, self.poller, self)
- else:
- self.router_consumer = \
- zmq_router_consumer.RouterConsumer(conf, self.poller, self)
- self.dealer_consumer = None
- self.sub_consumer = \
- zmq_sub_consumer.SubConsumer(conf, self.poller, self) \
- if conf.oslo_messaging_zmq.use_pub_sub else None
- self.consumers = []
- if self.router_consumer is not None:
- self.consumers.append(self.router_consumer)
- if self.dealer_consumer is not None:
- self.consumers.append(self.dealer_consumer)
- if self.sub_consumer is not None:
- self.consumers.append(self.sub_consumer)
- @base.batch_poll_helper
- def poll(self, timeout=None):
- message, socket = self.poller.poll(
- timeout or self.conf.oslo_messaging_zmq.rpc_poll_timeout)
- return message
- def stop(self):
- self.poller.close()
- for consumer in self.consumers:
- consumer.stop()
-'[%(host)s] Stop server %(target)s'),
- {'host': self.conf.oslo_messaging_zmq.rpc_zmq_host,
- 'target':})
- def cleanup(self):
- self.poller.close()
- for consumer in self.consumers:
- consumer.cleanup()
-'[%(host)s] Destroy server %(target)s'),
- {'host': self.conf.oslo_messaging_zmq.rpc_zmq_host,
- 'target':})
-class ZmqNotificationServer(base.PollStyleListener):
- def __init__(self, driver, conf, matchmaker, targets_and_priorities):
- super(ZmqNotificationServer, self).__init__()
- self.driver = driver
- self.conf = conf
- self.matchmaker = matchmaker
- self.servers = []
- self.poller = zmq_async.get_poller()
- self._listen(targets_and_priorities)
- def _listen(self, targets_and_priorities):
- for target, priority in targets_and_priorities:
- t = copy.deepcopy(target)
- t.topic = target.topic + '.' + priority
- self.servers.append(ZmqServer(
- self.driver, self.conf, self.matchmaker, t, self.poller))
- @base.batch_poll_helper
- def poll(self, timeout=None):
- message, socket = self.poller.poll(
- timeout or self.conf.oslo_messaging_zmq.rpc_poll_timeout)
- return message
- def stop(self):
- for server in self.servers:
- server.stop()
- def cleanup(self):
- for server in self.servers:
- server.cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/server/ b/oslo_messaging/_drivers/zmq_driver/server/
deleted file mode 100644
index fb4ddec..0000000
--- a/oslo_messaging/_drivers/zmq_driver/server/
+++ /dev/null
@@ -1,89 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import logging
-import threading
-import time
-from oslo_messaging._drivers.zmq_driver import zmq_async
-LOG = logging.getLogger(__name__)
-zmq = zmq_async.import_zmq()
-class TTLCache(object):
- _UNDEFINED = object()
- def __init__(self, ttl=None):
- self._lock = threading.Lock()
- self._cache = {}
- self._executor = None
- if not (ttl is None or isinstance(ttl, (int, float))):
- raise ValueError('ttl must be None or a number')
- # no (i.e. infinite) ttl
- if ttl is None or ttl <= 0:
- ttl = float('inf')
- else:
- self._executor = zmq_async.get_executor(self._update_cache)
- self._ttl = ttl
- if self._executor:
- self._executor.execute()
- @staticmethod
- def _is_expired(expiration_time, current_time):
- return expiration_time <= current_time
- def add(self, key, value=None):
- with self._lock:
- expiration_time = time.time() + self._ttl
- self._cache[key] = (value, expiration_time)
- def get(self, key, default=None):
- with self._lock:
- data = self._cache.get(key)
- if data is None:
- return default
- value, expiration_time = data
- if self._is_expired(expiration_time, time.time()):
- del self._cache[key]
- return default
- return value
- def __contains__(self, key):
- return self.get(key, self._UNDEFINED) is not self._UNDEFINED
- def _update_cache(self):
- with self._lock:
- current_time = time.time()
- old_size = len(self._cache)
- self._cache = \
- {key: (value, expiration_time) for
- key, (value, expiration_time) in self._cache.items()
- if not self._is_expired(expiration_time, current_time)}
- new_size = len(self._cache)
- LOG.debug('Updated cache: current size %(new_size)s '
- '(%(size_difference)s records removed)',
- {'new_size': new_size,
- 'size_difference': old_size - new_size})
- time.sleep(self._ttl)
- def cleanup(self):
- if self._executor:
- self._executor.stop()
diff --git a/oslo_messaging/_drivers/zmq_driver/ b/oslo_messaging/_drivers/zmq_driver/
deleted file mode 100644
index aa32c66..0000000
--- a/oslo_messaging/_drivers/zmq_driver/
+++ /dev/null
@@ -1,44 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import six
-def combine_address(host, port):
- return "%s:%s" % (host, port)
-def get_tcp_direct_address(host):
- return "tcp://%s" % str(host)
-def get_tcp_random_address(conf):
- return "tcp://%s" % conf.oslo_messaging_zmq.rpc_zmq_bind_address
-def prefix_str(key, listener_type):
- return listener_type + "/" + key
-def target_to_key(target, listener_type=None):
- key = target.topic
- if target.server and not target.fanout:
- # FIXME(ozamiatin): Workaround for Cinder.
- # Remove split when Bug #1630975 is being fixed.
- key += "/" + target.server.split('@')[0]
- return prefix_str(key, listener_type) if listener_type else key
-def target_to_subscribe_filter(target):
- return six.b(target.topic)
diff --git a/oslo_messaging/_drivers/zmq_driver/ b/oslo_messaging/_drivers/zmq_driver/
deleted file mode 100644
index 93135da..0000000
--- a/oslo_messaging/_drivers/zmq_driver/
+++ /dev/null
@@ -1,60 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-from oslo_utils import eventletutils
-from oslo_utils import importutils
-def import_zmq():
- imported_zmq = importutils.try_import(
- '' if eventletutils.is_monkey_patched('thread') else
- 'zmq', default=None
- )
- return imported_zmq
-def get_poller():
- if eventletutils.is_monkey_patched('thread'):
- from oslo_messaging._drivers.zmq_driver.poller import green_poller
- return green_poller.GreenPoller()
- from oslo_messaging._drivers.zmq_driver.poller import threading_poller
- return threading_poller.ThreadingPoller()
-def get_executor(method):
- if eventletutils.is_monkey_patched('thread'):
- from oslo_messaging._drivers.zmq_driver.poller import green_poller
- return green_poller.GreenExecutor(method)
- from oslo_messaging._drivers.zmq_driver.poller import threading_poller
- return threading_poller.ThreadingExecutor(method)
-def get_pool(size):
- import futurist
- if eventletutils.is_monkey_patched('thread'):
- return futurist.GreenThreadPoolExecutor(size)
- return futurist.ThreadPoolExecutor(size)
-def get_queue():
- if eventletutils.is_monkey_patched('thread'):
- import eventlet
- return eventlet.queue.Queue(), eventlet.queue.Empty
- import six
- return six.moves.queue.Queue(), six.moves.queue.Empty
diff --git a/oslo_messaging/_drivers/zmq_driver/ b/oslo_messaging/_drivers/zmq_driver/
deleted file mode 100644
index 3e345df..0000000
--- a/oslo_messaging/_drivers/zmq_driver/
+++ /dev/null
@@ -1,78 +0,0 @@
-# Copyright 2015-2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-from oslo_messaging._drivers.zmq_driver import zmq_async
-zmq = zmq_async.import_zmq()
-FIELD_MSG_ID = 'message_id'
-FIELD_REPLY_ID = 'reply_id'
-FIELD_MSG_VERSION = 'message_version'
-FIELD_REPLY_BODY = 'reply_body'
-FIELD_FAILURE = 'failure'
-def socket_type_str(socket_type):
- zmq_socket_str = {zmq.DEALER: "DEALER",
- zmq.PUSH: "PUSH",
- zmq.PULL: "PULL",
- zmq.REQ: "REQ",
- zmq.REP: "REP",
- zmq.PUB: "PUB",
- zmq.SUB: "SUB"}
- return zmq_socket_str[socket_type]
-def message_type_str(message_type):
- msg_type_str = {CALL_TYPE: "CALL",
- return msg_type_str.get(message_type, "UNKNOWN")
diff --git a/oslo_messaging/_drivers/zmq_driver/ b/oslo_messaging/_drivers/zmq_driver/
deleted file mode 100644
index 4265704..0000000
--- a/oslo_messaging/_drivers/zmq_driver/
+++ /dev/null
@@ -1,213 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import socket
-from oslo_config import cfg
-from oslo_messaging._drivers import base
-from oslo_messaging._drivers import common
-from oslo_messaging import server
-MATCHMAKER_BACKENDS = ('redis', 'sentinel', 'dummy')
-zmq_opts = [
- cfg.StrOpt('rpc_zmq_bind_address', default='*',
- deprecated_group='DEFAULT',
- help='ZeroMQ bind address. Should be a wildcard (*), '
- 'an ethernet interface, or IP. '
- 'The "host" option should point or resolve to this '
- 'address.'),
- cfg.StrOpt('rpc_zmq_matchmaker', default=MATCHMAKER_DEFAULT,
- deprecated_group='DEFAULT',
- help='MatchMaker driver.'),
- cfg.IntOpt('rpc_zmq_contexts', default=1,
- deprecated_group='DEFAULT',
- help='Number of ZeroMQ contexts, defaults to 1.'),
- cfg.IntOpt('rpc_zmq_topic_backlog',
- deprecated_group='DEFAULT',
- help='Maximum number of ingress messages to locally buffer '
- 'per topic. Default is unlimited.'),
- cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
- deprecated_group='DEFAULT',
- help='Directory for holding IPC sockets.'),
- cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
- sample_default='localhost',
- deprecated_group='DEFAULT',
- help='Name of this node. Must be a valid hostname, FQDN, or '
- 'IP address. Must match "host" option, if running Nova.'),
- cfg.IntOpt('zmq_linger', default=-1,
- deprecated_group='DEFAULT',
- deprecated_name='rpc_cast_timeout',
- help='Number of seconds to wait before all pending '
- 'messages will be sent after closing a socket. '
- 'The default value of -1 specifies an infinite linger '
- 'period. The value of 0 specifies no linger period. '
- 'Pending messages shall be discarded immediately '
- 'when the socket is closed. Positive values specify an '
- 'upper bound for the linger period.'),
- cfg.IntOpt('rpc_poll_timeout', default=1,
- deprecated_group='DEFAULT',
- help='The default number of seconds that poll should wait. '
- 'Poll raises timeout exception when timeout expired.'),
- cfg.IntOpt('zmq_target_expire', default=300,
- deprecated_group='DEFAULT',
- help='Expiration timeout in seconds of a name service record '
- 'about existing target ( < 0 means no timeout).'),
- cfg.IntOpt('zmq_target_update', default=180,
- deprecated_group='DEFAULT',
- help='Update period in seconds of a name service record '
- 'about existing target.'),
- cfg.BoolOpt('use_pub_sub', default=False,
- deprecated_group='DEFAULT',
- help='Use PUB/SUB pattern for fanout methods. '
- 'PUB/SUB always uses proxy.'),
- cfg.BoolOpt('use_router_proxy', default=False,
- deprecated_group='DEFAULT',
- help='Use ROUTER remote proxy.'),
- cfg.BoolOpt('use_dynamic_connections', default=False,
- help='This option makes direct connections dynamic or static. '
- 'It makes sense only with use_router_proxy=False which '
- 'means to use direct connections for direct message '
- 'types (ignored otherwise).'),
- cfg.IntOpt('zmq_failover_connections', default=2,
- help='How many additional connections to a host will be made '
- 'for failover reasons. This option is actual only in '
- 'dynamic connections mode.'),
- cfg.PortOpt('rpc_zmq_min_port',
- default=49153,
- deprecated_group='DEFAULT',
- help='Minimal port number for random ports range.'),
- cfg.IntOpt('rpc_zmq_max_port',
- min=1,
- max=65536,
- default=65536,
- deprecated_group='DEFAULT',
- help='Maximal port number for random ports range.'),
- cfg.IntOpt('rpc_zmq_bind_port_retries',
- default=100,
- deprecated_group='DEFAULT',
- help='Number of retries to find free port number before '
- 'fail with ZMQBindError.'),
- cfg.StrOpt('rpc_zmq_serialization', default='json',
- choices=('json', 'msgpack'),
- deprecated_group='DEFAULT',
- help='Default serialization mechanism for '
- 'serializing/deserializing outgoing/incoming messages'),
- cfg.BoolOpt('zmq_immediate', default=True,
- help='This option configures round-robin mode in zmq socket. '
- 'True means not keeping a queue when server side '
- 'disconnects. False means to keep queue and messages '
- 'even if server is disconnected, when the server '
- 'appears we send all accumulated messages to it.'),
- cfg.IntOpt('zmq_tcp_keepalive', default=-1,
- help='Enable/disable TCP keepalive (KA) mechanism. '
- 'The default value of -1 (or any other negative value) '
- 'means to skip any overrides and leave it to OS default; '
- '0 and 1 (or any other positive value) mean to '
- 'disable and enable the option respectively.'),
- cfg.IntOpt('zmq_tcp_keepalive_idle', default=-1,
- help='The duration between two keepalive transmissions in '
- 'idle condition. '
- 'The unit is platform dependent, for example, '
- 'seconds in Linux, milliseconds in Windows etc. '
- 'The default value of -1 (or any other negative value '
- 'and 0) means to skip any overrides and leave it '
- 'to OS default.'),
- cfg.IntOpt('zmq_tcp_keepalive_cnt', default=-1,
- help='The number of retransmissions to be carried out before '
- 'declaring that remote end is not available. '
- 'The default value of -1 (or any other negative value '
- 'and 0) means to skip any overrides and leave it '
- 'to OS default.'),
- cfg.IntOpt('zmq_tcp_keepalive_intvl', default=-1,
- help='The duration between two successive keepalive '
- 'retransmissions, if acknowledgement to the previous '
- 'keepalive transmission is not received. '
- 'The unit is platform dependent, for example, '
- 'seconds in Linux, milliseconds in Windows etc. '
- 'The default value of -1 (or any other negative value '
- 'and 0) means to skip any overrides and leave it '
- 'to OS default.'),
- cfg.IntOpt('rpc_thread_pool_size', default=100,
- help='Maximum number of (green) threads to work concurrently.'),
- cfg.IntOpt('rpc_message_ttl', default=300,
- help='Expiration timeout in seconds of a sent/received message '
- 'after which it is not tracked anymore by a '
- 'client/server.'),
- cfg.BoolOpt('rpc_use_acks', default=False,
- help='Wait for message acknowledgements from receivers. '
- 'This mechanism works only via proxy without PUB/SUB.'),
- cfg.IntOpt('rpc_ack_timeout_base', default=15,
- help='Number of seconds to wait for an ack from a cast/call. '
- 'After each retry attempt this timeout is multiplied by '
- 'some specified multiplier.'),
- cfg.IntOpt('rpc_ack_timeout_multiplier', default=2,
- help='Number to multiply base ack timeout by after each retry '
- 'attempt.'),
- cfg.IntOpt('rpc_retry_attempts', default=3,
- help='Default number of message sending attempts in case '
- 'of any problems occurred: positive value N means '
- 'at most N retries, 0 means no retries, None or -1 '
- '(or any other negative values) mean to retry forever. '
- 'This option is used only if acknowledgments are '
- 'enabled.'),
- cfg.ListOpt('subscribe_on',
- default=[],
- help='List of publisher hosts SubConsumer can subscribe on. '
- 'This option has higher priority then the default '
- 'publishers list taken from the matchmaker.'),
-def register_opts(conf, url):
- opt_group = cfg.OptGroup(name='oslo_messaging_zmq',
- title='ZeroMQ driver options')
- conf.register_opts(zmq_opts, group=opt_group)
- conf.register_opts(server._pool_opts)
- conf.register_opts(base.base_opts)
- return common.ConfigOptsProxy(conf, url,
diff --git a/oslo_messaging/_drivers/zmq_driver/ b/oslo_messaging/_drivers/zmq_driver/
deleted file mode 100644
index 124a3a7..0000000
--- a/oslo_messaging/_drivers/zmq_driver/
+++ /dev/null
@@ -1,102 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import abc
-import six
-class ZmqPoller(object):
- """Base poller interface
- Needed to poll on zmq sockets in green and native async manner.
- Native poller implementation wraps zmq.Poller helper class.
- Wrapping is needed to provide unified poller interface
- in zmq-driver (for both native and zmq pollers). It makes some
- difference with poller-helper from zmq library which doesn't actually
- receive message.
- The poller object should be obtained over:
- poller = zmq_async.get_poller()
- Then we have to register sockets for polling. We are able
- to provide specific receiving method. By default poller calls
- socket.recv_multipart.
- def receive_message(socket):
- id = socket.recv_string()
- ctxt = socket.recv_json()
- msg = socket.recv_json()
- return (id, ctxt, msg)
- poller.register(socket, recv_method=receive_message)
- Further to receive a message we should call:
- message, socket = poller.poll()
- The 'message' here contains (id, ctxt, msg) tuple.
- """
- @abc.abstractmethod
- def register(self, socket, recv_method=None):
- """Register socket to poll
- :param socket: Socket to subscribe for polling
- :type socket: ZmqSocket
- :param recv_method: Optional specific receiver procedure
- Should return received message object
- :type recv_method: callable
- """
- @abc.abstractmethod
- def unregister(self, socket):
- """Unregister socket from poll
- :param socket: Socket to unsubscribe from polling
- :type socket: ZmqSocket
- """
- @abc.abstractmethod
- def poll(self, timeout=None):
- """Poll for messages
- :param timeout: Optional polling timeout
- None or -1 means poll forever
- any positive value means timeout in seconds
- :type timeout: int
- :returns: (message, socket) tuple
- """
- @abc.abstractmethod
- def close(self):
- """Terminate polling"""
-class Executor(object):
- """Base executor interface for threading/green async executors"""
- def __init__(self, thread):
- self.thread = thread
- @abc.abstractmethod
- def execute(self):
- """Run execution"""
- @abc.abstractmethod
- def stop(self):
- """Stop execution"""
diff --git a/oslo_messaging/_drivers/zmq_driver/ b/oslo_messaging/_drivers/zmq_driver/
deleted file mode 100644
index c9c7cea..0000000
--- a/oslo_messaging/_drivers/zmq_driver/
+++ /dev/null
@@ -1,260 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import logging
-import uuid
-import six
-from oslo_messaging._drivers import common as rpc_common
-from oslo_messaging._drivers.zmq_driver import zmq_address
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-from oslo_messaging._i18n import _LE
-from oslo_messaging import exceptions
-from oslo_serialization.serializer import json_serializer
-from oslo_serialization.serializer import msgpack_serializer
-LOG = logging.getLogger(__name__)
-zmq = zmq_async.import_zmq()
-class ZmqSocket(object):
- 'json': json_serializer.JSONSerializer(),
- 'msgpack': msgpack_serializer.MessagePackSerializer()
- }
- def __init__(self, conf, context, socket_type, immediate,
- high_watermark=0, identity=None):
- self.conf = conf
- self.context = context
- self.socket_type = socket_type
- self.handle = context.socket(socket_type)
- self.handle.set_hwm(high_watermark)
- # Set linger period
- linger = -1
- if self.conf.oslo_messaging_zmq.zmq_linger >= 0:
- # Convert seconds to milliseconds
- linger = self.conf.oslo_messaging_zmq.zmq_linger * 1000
- self.handle.setsockopt(zmq.LINGER, linger)
- # Put messages to only connected queues
- self.handle.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0)
- # Setup timeout on socket sending
- if hasattr(self.conf, 'rpc_response_timeout'):
- self.handle.setsockopt(zmq.SNDTIMEO,
- self.conf.rpc_response_timeout * 1000)
- # Configure TCP keep alive
- keepalive = self.conf.oslo_messaging_zmq.zmq_tcp_keepalive
- if keepalive < 0:
- keepalive = -1
- elif keepalive > 0:
- keepalive = 1
- self.handle.setsockopt(zmq.TCP_KEEPALIVE, keepalive)
- keepalive_idle = self.conf.oslo_messaging_zmq.zmq_tcp_keepalive_idle
- if keepalive_idle <= 0:
- keepalive_idle = -1
- self.handle.setsockopt(zmq.TCP_KEEPALIVE_IDLE, keepalive_idle)
- keepalive_cnt = self.conf.oslo_messaging_zmq.zmq_tcp_keepalive_cnt
- if keepalive_cnt <= 0:
- keepalive_cnt = -1
- self.handle.setsockopt(zmq.TCP_KEEPALIVE_CNT, keepalive_cnt)
- keepalive_intvl = self.conf.oslo_messaging_zmq.zmq_tcp_keepalive_intvl
- if keepalive_intvl <= 0:
- keepalive_intvl = -1
- self.handle.setsockopt(zmq.TCP_KEEPALIVE_INTVL, keepalive_intvl)
- self.handle.identity = \
- six.b(str(uuid.uuid4())) if identity is None else identity
- self.connections = set()
- def _get_serializer(self, serialization):
- serializer = self.SERIALIZERS.get(serialization, None)
- if serializer is None:
- raise NotImplementedError(
- "Serialization '{}' is not supported".format(serialization)
- )
- return serializer
- def type_name(self):
- return zmq_names.socket_type_str(self.socket_type)
- def connections_count(self):
- return len(self.connections)
- def connect(self, address):
- if address not in self.connections:
- self.handle.connect(address)
- self.connections.add(address)
- def setsockopt(self, *args, **kwargs):
- self.handle.setsockopt(*args, **kwargs)
- def setsockopt_string(self, *args, **kwargs):
- self.handle.setsockopt_string(*args, **kwargs)
- def getsockopt(self, *args, **kwargs):
- return self.handle.getsockopt(*args, **kwargs)
- def getsockopt_string(self, *args, **kwargs):
- return self.handle.getsockopt_string(*args, **kwargs)
- def send(self, *args, **kwargs):
- self.handle.send(*args, **kwargs)
- def send_string(self, u, *args, **kwargs):
- # NOTE(ozamiatin): Not using send_string until
- # eventlet zmq support this convenience method
- # in thread-safe manner
- encoding = kwargs.pop('encoding', 'utf-8')
- s = u.encode(encoding) if isinstance(u, six.text_type) else u
- self.handle.send(s, *args, **kwargs)
- def send_json(self, *args, **kwargs):
- self.handle.send_json(*args, **kwargs)
- def send_pyobj(self, *args, **kwargs):
- self.handle.send_pyobj(*args, **kwargs)
- def send_multipart(self, *args, **kwargs):
- self.handle.send_multipart(*args, **kwargs)
- def send_dumped(self, obj, *args, **kwargs):
- serialization = kwargs.pop(
- 'serialization',
- self.conf.oslo_messaging_zmq.rpc_zmq_serialization)
- serializer = self._get_serializer(serialization)
- s = serializer.dump_as_bytes(obj)
- self.handle.send(s, *args, **kwargs)
- def recv(self, *args, **kwargs):
- return self.handle.recv(*args, **kwargs)
- def recv_string(self, *args, **kwargs):
- # NOTE(ozamiatin): Not using recv_string until
- # eventlet zmq support this convenience method
- # in thread-safe manner
- encoding = kwargs.pop('encoding', 'utf-8')
- s = self.handle.recv(*args, **kwargs)
- u = s.decode(encoding) if isinstance(s, six.binary_type) else s
- return u
- def recv_json(self, *args, **kwargs):
- return self.handle.recv_json(*args, **kwargs)
- def recv_pyobj(self, *args, **kwargs):
- return self.handle.recv_pyobj(*args, **kwargs)
- def recv_multipart(self, *args, **kwargs):
- return self.handle.recv_multipart(*args, **kwargs)
- def recv_loaded(self, *args, **kwargs):
- serialization = kwargs.pop(
- 'serialization',
- self.conf.oslo_messaging_zmq.rpc_zmq_serialization)
- serializer = self._get_serializer(serialization)
- s = self.handle.recv(*args, **kwargs)
- obj = serializer.load_from_bytes(s)
- return obj
- def close(self, *args, **kwargs):
- identity = self.handle.identity
- self.handle.close(*args, **kwargs)
- LOG.debug("Socket %s closed" % identity)
- def connect_to_address(self, address):
- if address in self.connections:
- return
- stype = zmq_names.socket_type_str(self.socket_type)
- sid = self.handle.identity
- try:
- LOG.debug("Connecting %(stype)s socket %(sid)s to %(address)s",
- {"stype": stype, "sid": sid, "address": address})
- self.connect(address)
- except zmq.ZMQError as e:
- LOG.error(_LE("Failed connecting %(stype)s-%(sid)s to "
- "%(address)s: %(e)s"),
- {"stype": stype, "sid": sid, "address": address, "e": e})
- raise rpc_common.RPCException(
- "Failed connecting %(stype)s-%(sid)s to %(address)s: %(e)s" %
- {"stype": stype, "sid": sid, "address": address, "e": e})
- def connect_to_host(self, host):
- address = zmq_address.get_tcp_direct_address(
- host.decode('utf-8') if six.PY3 and
- isinstance(host, six.binary_type) else host
- )
- self.connect_to_address(address)
-class ZmqPortBusy(exceptions.MessagingException):
- """Raised when binding to a port failure"""
- def __init__(self, port_number):
- super(ZmqPortBusy, self).__init__()
- self.port_number = port_number
-class ZmqRandomPortSocket(ZmqSocket):
- def __init__(self, conf, context, socket_type, host=None,
- high_watermark=0, identity=None):
- super(ZmqRandomPortSocket, self).__init__(
- conf, context, socket_type, immediate=False,
- high_watermark=high_watermark, identity=identity)
- self.bind_address = zmq_address.get_tcp_random_address(self.conf)
- if host is None:
- host = conf.oslo_messaging_zmq.rpc_zmq_host
- try:
- self.port = self.handle.bind_to_random_port(
- self.bind_address,
- min_port=conf.oslo_messaging_zmq.rpc_zmq_min_port,
- max_port=conf.oslo_messaging_zmq.rpc_zmq_max_port,
- max_tries=conf.oslo_messaging_zmq.rpc_zmq_bind_port_retries)
- self.connect_address = zmq_address.combine_address(host, self.port)
- except zmq.ZMQBindError:
- LOG.error(_LE("Random ports range exceeded!"))
- raise ZmqPortBusy(port_number=0)
-class ZmqFixedPortSocket(ZmqSocket):
- def __init__(self, conf, context, socket_type, host, port,
- high_watermark=0, identity=None):
- super(ZmqFixedPortSocket, self).__init__(
- conf, context, socket_type, immediate=False,
- high_watermark=high_watermark, identity=identity)
- self.connect_address = zmq_address.combine_address(host, port)
- self.bind_address = zmq_address.get_tcp_direct_address(
- zmq_address.combine_address(
- conf.oslo_messaging_zmq.rpc_zmq_bind_address, port))
- = host
- self.port = port
- try:
- self.handle.bind(self.bind_address)
- except zmq.ZMQError as e:
- LOG.exception(e)
- LOG.error(_LE("Chosen port %d is being busy.") % self.port)
- raise ZmqPortBusy(port_number=port)
diff --git a/oslo_messaging/_drivers/zmq_driver/ b/oslo_messaging/_drivers/zmq_driver/
deleted file mode 100644
index 2201eaf..0000000
--- a/oslo_messaging/_drivers/zmq_driver/
+++ /dev/null
@@ -1,58 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import abc
-import time
-import six
-from oslo_messaging._drivers.zmq_driver import zmq_async
-zmq = zmq_async.import_zmq()
-class UpdaterBase(object):
- def __init__(self, conf, matchmaker, update_method, sleep_for):
- self.conf = conf
- self.matchmaker = matchmaker
- self.update_method = update_method
- self._sleep_for = sleep_for
- self.executor = zmq_async.get_executor(method=self._update_loop)
- self.executor.execute()
- def stop(self):
- self.executor.stop()
- def _update_loop(self):
- self.update_method()
- time.sleep(self._sleep_for)
- def cleanup(self):
- self.executor.stop()
-class ConnectionUpdater(UpdaterBase):
- def __init__(self, conf, matchmaker, socket):
- self.socket = socket
- super(ConnectionUpdater, self).__init__(
- conf, matchmaker, self._update_connection,
- conf.oslo_messaging_zmq.zmq_target_update)
- @abc.abstractmethod
- def _update_connection(self):
- """Update connection info"""
diff --git a/oslo_messaging/_drivers/zmq_driver/ b/oslo_messaging/_drivers/zmq_driver/
deleted file mode 100644
index 92baf71..0000000
--- a/oslo_messaging/_drivers/zmq_driver/
+++ /dev/null
@@ -1,60 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import re
-from oslo_messaging._drivers import common as rpc_common
-from oslo_messaging._i18n import _
-# current driver's version for representing internal message format
-class UnsupportedMessageVersionError(rpc_common.RPCException):
- msg_fmt = _("Message version %(version)s is not supported.")
- def __init__(self, version):
- super(UnsupportedMessageVersionError, self).__init__(version=version)
-def get_method_versions(obj, method_name):
- """Useful function for initializing versioned senders/receivers.
- Returns a dictionary of different internal versions of the given method.
- Assumes that the object has the particular versioned method and this method
- is public. Thus versions are private implementations of the method.
- For example, for a method 'func' methods '_func_v_1_0', '_func_v_1_5',
- '_func_v_2_0', etc. are assumed as its respective 1.0, 1.5, 2.0 versions.
- """
- assert callable(getattr(obj, method_name, None)), \
- "Object must have specified method!"
- assert not method_name.startswith('_'), "Method must be public!"
- method_versions = {}
- for attr_name in dir(obj):
- if attr_name == method_name:
- continue
- attr = getattr(obj, attr_name, None)
- if not callable(attr):
- continue
- match_obj = re.match(r'^_%s_v_(\d)_(\d)$' % method_name, attr_name)
- if match_obj is not None:
- version = '.'.join([,])
- method_versions[version] = attr
- return method_versions
diff --git a/oslo_messaging/ b/oslo_messaging/
index 2f75e9a..297694a 100644
--- a/oslo_messaging/
+++ b/oslo_messaging/
@@ -57,14 +57,6 @@ class ConfFixture(fixtures.Fixture):
'amqp1_opts', 'oslo_messaging_amqp')
- _import_opts(self.conf,
- 'oslo_messaging._drivers.zmq_driver.zmq_options',
- 'zmq_opts', 'oslo_messaging_zmq')
- _import_opts(self.conf,
- 'oslo_messaging._drivers.zmq_driver.'
- 'matchmaker.zmq_matchmaker_redis',
- 'matchmaker_redis_opts',
- 'matchmaker_redis')
_import_opts(self.conf, 'oslo_messaging.rpc.client', '_client_opts')
_import_opts(self.conf, 'oslo_messaging.transport', '_transport_opts')
diff --git a/oslo_messaging/ b/oslo_messaging/
index 3181ae4..325aa08 100644
--- a/oslo_messaging/
+++ b/oslo_messaging/
@@ -24,9 +24,7 @@ from oslo_messaging._drivers import amqp
from oslo_messaging._drivers.amqp1_driver import opts as amqp_opts
from oslo_messaging._drivers import base as drivers_base
from oslo_messaging._drivers import impl_rabbit
-from oslo_messaging._drivers.impl_zmq import zmq_options
from oslo_messaging._drivers.kafka_driver import kafka_options
-from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_redis
from oslo_messaging.notify import notifier
from oslo_messaging.rpc import client
from oslo_messaging import server
@@ -35,7 +33,6 @@ from oslo_messaging import transport
_global_opt_lists = [
- zmq_options.zmq_opts,
@@ -43,8 +40,6 @@ _global_opt_lists = [
_opts = [
(None, list(itertools.chain(*_global_opt_lists))),
- ('matchmaker_redis', zmq_matchmaker_redis.matchmaker_redis_opts),
- ('oslo_messaging_zmq', zmq_options.zmq_opts),
('oslo_messaging_amqp', amqp_opts.amqp1_opts),
('oslo_messaging_notifications', notifier._notifier_opts),
('oslo_messaging_rabbit', list(
diff --git a/oslo_messaging/tests/functional/ b/oslo_messaging/tests/functional/
index 69e26bd..6e13577 100644
--- a/oslo_messaging/tests/functional/
+++ b/oslo_messaging/tests/functional/
@@ -376,8 +376,6 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
def test_multiple_servers(self):
if self.url.startswith("amqp:"):
- if self.url.startswith("zmq"):
- self.skipTest("ZeroMQ-PUB-SUB")
if self.url.startswith("kafka"):
self.skipTest("Kafka: Need to be fixed")
diff --git a/oslo_messaging/tests/functional/ b/oslo_messaging/tests/functional/
index 82bdbd9..fcebba8 100644
--- a/oslo_messaging/tests/functional/
+++ b/oslo_messaging/tests/functional/
@@ -21,7 +21,6 @@ from six import moves
import oslo_messaging
from oslo_messaging._drivers.kafka_driver import kafka_options
-from oslo_messaging._drivers.zmq_driver import zmq_options
from oslo_messaging.notify import notifier
from oslo_messaging.tests import utils as test_utils
@@ -312,34 +311,6 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
transport_url = oslo_messaging.TransportURL.parse(conf, self.url)
- zmq_options.register_opts(conf, transport_url)
- zmq_matchmaker = os.environ.get('ZMQ_MATCHMAKER')
- if zmq_matchmaker:
- self.config(rpc_zmq_matchmaker=zmq_matchmaker,
- group="oslo_messaging_zmq")
- zmq_ipc_dir = os.environ.get('ZMQ_IPC_DIR')
- if zmq_ipc_dir:
- self.config(group="oslo_messaging_zmq",
- rpc_zmq_ipc_dir=zmq_ipc_dir)
- zmq_redis_port = os.environ.get('ZMQ_REDIS_PORT')
- if zmq_redis_port:
- self.config(port=zmq_redis_port,
- check_timeout=10000,
- wait_timeout=1000,
- group="matchmaker_redis")
- zmq_use_pub_sub = os.environ.get('ZMQ_USE_PUB_SUB')
- zmq_use_router_proxy = os.environ.get('ZMQ_USE_ROUTER_PROXY')
- zmq_use_acks = os.environ.get('ZMQ_USE_ACKS')
- self.config(use_pub_sub=zmq_use_pub_sub,
- use_router_proxy=zmq_use_router_proxy,
- rpc_use_acks=zmq_use_acks,
- group='oslo_messaging_zmq')
- zmq_use_dynamic_connections = \
- self.config(use_dynamic_connections=zmq_use_dynamic_connections,
- group='oslo_messaging_zmq')
kafka_options.register_opts(conf, transport_url)
diff --git a/oslo_messaging/tests/notify/ b/oslo_messaging/tests/notify/
index 37ce82e..b5cad25 100644
--- a/oslo_messaging/tests/notify/
+++ b/oslo_messaging/tests/notify/
@@ -51,8 +51,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
# NOTE(jamespage) disable thread information logging for testing
- # as this causes test failures when zmq tests monkey_patch via
- # eventlet
+ # as this can cause test failures when monkey_patch via eventlet
logging.logThreads = 0
diff --git a/oslo_messaging/tests/ b/oslo_messaging/tests/
index 6e10c51..1c2ff71 100644
--- a/oslo_messaging/tests/
+++ b/oslo_messaging/tests/
@@ -28,12 +28,10 @@ from oslo_messaging.tests import utils as test_utils
class OptsTestCase(test_utils.BaseTestCase):
def _test_list_opts(self, result):
- self.assertEqual(7, len(result))
+ self.assertEqual(5, len(result))
groups = [g for (g, l) in result]
self.assertIn(None, groups)
- self.assertIn('matchmaker_redis', groups)
- self.assertIn('oslo_messaging_zmq', groups)
self.assertIn('oslo_messaging_amqp', groups)
self.assertIn('oslo_messaging_notifications', groups)
self.assertIn('oslo_messaging_rabbit', groups)
diff --git a/oslo_messaging/ b/oslo_messaging/
index 267f389..6372e0a 100644
--- a/oslo_messaging/
+++ b/oslo_messaging/
@@ -58,7 +58,7 @@ _transport_opts = [
deprecated_reason="Replaced by [DEFAULT]/transport_url",
help='The messaging driver to use, defaults to rabbit. Other '
- 'drivers include amqp and zmq.'),
+ 'drivers include amqp.'),
diff --git a/playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/post.yaml b/playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/post.yaml
deleted file mode 100644
index dac8753..0000000
--- a/playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/post.yaml
+++ /dev/null
@@ -1,80 +0,0 @@
-- hosts: primary
- tasks:
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=**/*nose_results.html
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=**/*testr_results.html.gz
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=/.testrepository/tmp*
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=**/*testrepository.subunit.gz
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}/tox'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=/.tox/*/log/*
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
- - name: Copy files from {{ ansible_user_dir }}/workspace/ on node
- synchronize:
- src: '{{ ansible_user_dir }}/workspace/'
- dest: '{{ zuul.executor.log_root }}'
- mode: pull
- copy_links: true
- verify_host: true
- rsync_opts:
- - --include=/logs/**
- - --include=*/
- - --exclude=*
- - --prune-empty-dirs
diff --git a/playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/run.yaml b/playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/run.yaml
deleted file mode 100644
index 73b4150..0000000
--- a/playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/run.yaml
+++ /dev/null
@@ -1,78 +0,0 @@
-- hosts: all
- name: Autoconverted job legacy-oslo.messaging-telemetry-dsvm-integration-zmq from
- old job gate-oslo.messaging-telemetry-dsvm-integration-zmq-ubuntu-xenial-nv
- tasks:
- - name: Ensure legacy workspace directory
- file:
- path: '{{ ansible_user_dir }}/workspace'
- state: directory
- - shell:
- cmd: |
- set -e
- set -x
- cat > clonemap.yaml << EOF
- clonemap:
- - name: openstack-infra/devstack-gate
- dest: devstack-gate
- /usr/zuul-env/bin/zuul-cloner -m clonemap.yaml --cache-dir /opt/git \
- git:// \
- openstack-infra/devstack-gate
- executable: /bin/bash
- chdir: '{{ ansible_user_dir }}/workspace'
- environment: '{{ zuul | zuul_legacy_vars }}'
- - shell:
- cmd: |
- set -e
- set -x
- export PROJECTS="openstack/ceilometer $PROJECTS"
- export PROJECTS="openstack/aodh $PROJECTS"
- export PROJECTS="openstack/devstack-plugin-zmq $PROJECTS"
- case "$ZUUL_BRANCH" in
- "stable/ocata")
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin gnocchi git://"
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin panko git://"
- export PROJECTS="openstack/panko $PROJECTS openstack/gnocchi"
- ;;
- *)
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin panko git://"
- export PROJECTS="openstack/panko $PROJECTS"
- ;;
- esac
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin ceilometer git://"
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin aodh git://"
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin heat git://"
- export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin devstack-plugin-zmq git://"
- export DEVSTACK_PROJECT_FROM_GIT="oslo.messaging"
- function post_test_hook {
- cd /opt/stack/new/ceilometer/ceilometer/tests/integration/hooks/
- ./
- }
- export -f post_test_hook
- cp devstack-gate/ ./
- ./
- executable: /bin/bash
- chdir: '{{ ansible_user_dir }}/workspace'
- environment: '{{ zuul | zuul_legacy_vars }}'
diff --git a/releasenotes/notes/remove-ZeroMQ-driver-e9e0bbbb7bd4f5e6.yaml b/releasenotes/notes/remove-ZeroMQ-driver-e9e0bbbb7bd4f5e6.yaml
new file mode 100644
index 0000000..b65d613
--- /dev/null
+++ b/releasenotes/notes/remove-ZeroMQ-driver-e9e0bbbb7bd4f5e6.yaml
@@ -0,0 +1,8 @@
+prelude: >
+ The ZMQ-based driver for RPC communications has been removed
+ - |
+ The driver support for the ZeroMQ messaging library is removed.
+ Users of the oslo.messaging RPC services must use the supported
+ rabbit ("rabbit://...") or amqp1 ("amqp://..." )drivers.
diff --git a/requirements.txt b/requirements.txt
index ef8ecab..0d0353b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -29,9 +29,5 @@ PyYAML>=3.12 # MIT
amqp>=2.3.0 # BSD
kombu!=4.0.2,>=4.0.0 # BSD
-# used by zmq driver
-futures>=3.0.0;python_version=='2.7' or python_version=='2.6' # BSD
-tenacity>=4.4.0 # Apache-2.0
# middleware
oslo.middleware>=3.31.0 # Apache-2.0
diff --git a/ b/
deleted file mode 100755
index ba7ccd1..0000000
--- a/
+++ /dev/null
@@ -1,32 +0,0 @@
-set -e
-. tools/
-trap "clean_exit $DATADIR" EXIT
-export ZMQ_MATCHMAKER=redis
-export ZMQ_REDIS_PORT=65123
-export ZMQ_USE_PUB_SUB=false
-export ZMQ_USE_ROUTER_PROXY=false
-export ZMQ_USE_ACKS=false
-cat > ${DATADIR}/zmq.conf <<EOF
-redis-server --port $ZMQ_REDIS_PORT &
-oslo-messaging-zmq-proxy --debug --url ${TRANSPORT_URL} --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-proxy.log 2>&1 &
diff --git a/ b/
deleted file mode 100755
index 59e8efb..0000000
--- a/
+++ /dev/null
@@ -1,36 +0,0 @@
-set -e
-. tools/
-trap "clean_exit $DATADIR" EXIT
-export ZMQ_MATCHMAKER=redis
-export ZMQ_REDIS_PORT=65123
-export ZMQ_USE_PUB_SUB=false
-export ZMQ_USE_ACKS=false
-cat > ${DATADIR}/zmq.conf <<EOF
-redis-server --port $ZMQ_REDIS_PORT &
-oslo-messaging-zmq-proxy --debug --url ${TRANSPORT_URL} --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-proxy.log 2>&1 &
diff --git a/ b/
deleted file mode 100755
index 187076d..0000000
--- a/
+++ /dev/null
@@ -1,36 +0,0 @@
-set -e
-. tools/
-trap "clean_exit $DATADIR" EXIT
-export ZMQ_MATCHMAKER=redis
-export ZMQ_REDIS_PORT=65123
-export ZMQ_USE_PUB_SUB=true
-export ZMQ_USE_ACKS=false
-cat > ${DATADIR}/zmq.conf <<EOF
-redis-server --port $ZMQ_REDIS_PORT &
-oslo-messaging-zmq-proxy --debug --url ${TRANSPORT_URL} --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-proxy.log 2>&1 &
diff --git a/ b/
deleted file mode 100755
index eae4733..0000000
--- a/
+++ /dev/null
@@ -1,30 +0,0 @@
-set -e
-. tools/
-trap "clean_exit $DATADIR" EXIT
-export ZMQ_MATCHMAKER=redis
-export ZMQ_REDIS_PORT=65123
-export ZMQ_USE_PUB_SUB=false
-export ZMQ_USE_ROUTER_PROXY=false
-export ZMQ_USE_ACKS=false
-cat > ${DATADIR}/zmq.conf <<EOF
-redis-server --port $ZMQ_REDIS_PORT &
diff --git a/setup.cfg b/setup.cfg
index 8d20043..dd74269 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -22,10 +22,6 @@ classifier =
# package dependencies for optional (non-rabbitmq) messaging drivers.
# projects can test-depend on oslo.messaging[<drivers>]
# e.g.: oslo.messaging[kafka,amqp1]
-zmq =
- pyzmq>=14.3.1 # LGPL+BSD
- tenacity>=4.4.0 # Apache-2.0
- redis>=2.10.0 # MIT
amqp1 =
pyngus>=2.2.0 # Apache-2.0
kafka =
@@ -38,13 +34,10 @@ packages =
console_scripts =
- oslo-messaging-zmq-proxy = oslo_messaging._cmd.zmq_proxy:main
- oslo-messaging-zmq-broker = oslo_messaging._cmd.zmq_proxy:main
oslo-messaging-send-notification = oslo_messaging.notify.notifier:_send_notification
oslo.messaging.drivers =
rabbit = oslo_messaging._drivers.impl_rabbit:RabbitDriver
- zmq = oslo_messaging._drivers.impl_zmq:ZmqDriver
amqp = oslo_messaging._drivers.impl_amqp1:ProtonDriver
# This driver is supporting for only notification usage
@@ -69,12 +62,6 @@ oslo.messaging.notify.drivers =
noop = oslo_messaging.notify._impl_noop:NoOpDriver
routing = oslo_messaging.notify._impl_routing:RoutingDriver
-oslo.messaging.zmq.matchmaker =
- # Matchmakers for ZeroMQ
- dummy = oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_base:MatchmakerDummy
- redis = oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_redis:MatchmakerRedis
- sentinel = oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_redis:MatchmakerSentinel
oslo.config.opts =
oslo.messaging = oslo_messaging.opts:list_opts
diff --git a/test-requirements.txt b/test-requirements.txt
index 9cedd5b..22636cb 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -12,13 +12,9 @@ testscenarios>=0.4 # Apache-2.0/BSD
testtools>=2.2.0 # MIT
oslotest>=3.2.0 # Apache-2.0
pifpaf>=0.10.0 # Apache-2.0
-# for test_matchmaker_redis
-redis>=2.10.0 # MIT
-# for test_impl_zmq
-pyzmq>=14.3.1 # LGPL+BSD
# for test_impl_kafka
+tenacity>=4.4.0 # Apache-2.0
kafka-python>=1.3.1 # Apache-2.0
# when we can require tox>= 1.4, this can go into tox.ini:
diff --git a/tox.ini b/tox.ini
index 9d07185..126eb0f 100644
--- a/tox.ini
+++ b/tox.ini
@@ -20,7 +20,7 @@ basepython = python3
commands =
# run security linter
- bandit -r oslo_messaging -x tests,_drivers/zmq_driver,_drivers/ -n5
+ bandit -r oslo_messaging -x tests -n5
basepython = python3
@@ -86,33 +86,12 @@ setenv =
commands = {toxinidir}/ stestr run --slowest {posargs:oslo_messaging.tests.functional}
-basepython = python2.7
-commands = {toxinidir}/ stestr run --slowest {posargs:oslo_messaging.tests.functional}
-basepython = python3.5
-commands = {toxinidir}/ stestr run --slowest {posargs:oslo_messaging.tests.functional}
-basepython = python2.7
-commands = {toxinidir}/ stestr run --slowest {posargs:oslo_messaging.tests.functional}
-basepython = python2.7
-commands = {toxinidir}/ stestr run --slowest {posargs:oslo_messaging.tests.functional}
-basepython = python2.7
-commands = {toxinidir}/ stestr run --slowest {posargs:oslo_messaging.tests.functional}
# NOTE(kgiusti): This is required for the integration test job of the bandit
# project. Please do not remove.
basepython = python3
-# skip ZeroMQ - it is deprecated
-commands = bandit -r oslo_messaging -x tests,_drivers/zmq_driver,_drivers/ -n5
+commands = bandit -r oslo_messaging -x tests -n5
show-source = True