summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGevorg Davoian <gdavoian@mirantis.com>2016-06-22 19:09:32 +0300
committerGevorg Davoian <gdavoian@mirantis.com>2016-07-07 13:45:39 +0300
commitac484f6b26c6509549edc1150673915b48482ac2 (patch)
treefa1e30d3c443f40e5f1233ea91bd2689b7993da4
parent58ad758dc21e2ebbd259e23a39cd7c9efea5f00f (diff)
downloadoslo-messaging-ac484f6b26c6509549edc1150673915b48482ac2.tar.gz
[zmq] Refactor publishers
This patch refactors publishers by separating responsibilities and introducing senders and waiters within publishers. Change-Id: I90df59d61af2b40b516a5151c67c184fcc91e366 Co-Authored-By: Oleksii Zamiatin <ozamiatin@mirantis.com>
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py106
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py122
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py191
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py66
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py131
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py52
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_client.py68
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py140
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py65
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py94
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py96
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py68
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py69
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py42
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py22
-rw-r--r--oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py51
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_names.py8
-rw-r--r--oslo_messaging/_drivers/zmq_driver/zmq_updater.py2
-rw-r--r--oslo_messaging/tests/functional/utils.py2
19 files changed, 635 insertions, 760 deletions
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py
deleted file mode 100644
index 7d1cdf1..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py
+++ /dev/null
@@ -1,106 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-
-from concurrent import futures
-import futurist
-
-import oslo_messaging
-from oslo_messaging._drivers import common as rpc_common
-from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
- import zmq_reply_waiter
-from oslo_messaging._drivers.zmq_driver.client.publishers \
- import zmq_publisher_base
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._i18n import _LE
-
-LOG = logging.getLogger(__name__)
-
-zmq = zmq_async.import_zmq()
-
-
-class DealerCallPublisher(object):
- """Thread-safe CALL publisher
-
- Used as faster and thread-safe publisher for CALL
- instead of ReqPublisher.
- """
-
- def __init__(self, conf, matchmaker, sockets_manager, sender=None,
- reply_waiter=None):
- super(DealerCallPublisher, self).__init__()
- self.conf = conf
- self.matchmaker = matchmaker
- self.reply_waiter = reply_waiter or zmq_reply_waiter.ReplyWaiter(conf)
- self.sockets_manager = sockets_manager
- self.sender = sender or CallSender(self.sockets_manager,
- self.reply_waiter)
-
- def send_request(self, request):
- reply_future = self.sender.send_request(request)
- try:
- reply = reply_future.result(timeout=request.timeout)
- LOG.debug("Received reply %s", request.message_id)
- except AssertionError:
- LOG.error(_LE("Message format error in reply %s"),
- request.message_id)
- return None
- except futures.TimeoutError:
- raise oslo_messaging.MessagingTimeout(
- "Timeout %(tout)s seconds was reached for message %(id)s" %
- {"tout": request.timeout,
- "id": request.message_id})
- finally:
- self.reply_waiter.untrack_id(request.message_id)
-
- if reply.failure:
- raise rpc_common.deserialize_remote_exception(
- reply.failure,
- request.allowed_remote_exmods)
- else:
- return reply.reply_body
-
- def cleanup(self):
- self.reply_waiter.cleanup()
- self.sender.cleanup()
-
-
-class CallSender(zmq_publisher_base.QueuedSender):
-
- def __init__(self, sockets_manager, reply_waiter):
- super(CallSender, self).__init__(sockets_manager,
- self._do_send_request)
- assert reply_waiter, "Valid ReplyWaiter expected!"
- self.reply_waiter = reply_waiter
-
- def _do_send_request(self, socket, request):
- # DEALER socket specific envelope empty delimiter
- socket.send(b'', zmq.SNDMORE)
- socket.send_pyobj(request)
-
- LOG.debug("Sent message_id %(message)s to a target %(target)s",
- {"message": request.message_id,
- "target": request.target})
-
- def send_request(self, request):
- reply_future = futurist.Future()
- self.reply_waiter.track_reply(reply_future, request.message_id)
- self.queue.put(request)
- return reply_future
-
- def _connect_socket(self, target):
- socket = self.outbound_sockets.get_socket(target)
- self.reply_waiter.poll_socket(socket)
- return socket
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py
index 5934519..89031ec 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py
@@ -12,78 +12,98 @@
# License for the specific language governing permissions and limitations
# under the License.
+from concurrent import futures
import logging
-from oslo_messaging._drivers.zmq_driver.client.publishers\
+import retrying
+
+import oslo_messaging
+from oslo_messaging._drivers import common as rpc_common
+from oslo_messaging._drivers.zmq_driver.client.publishers \
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
+from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
-class DealerPublisher(zmq_publisher_base.QueuedSender):
-
- def __init__(self, conf, matchmaker):
-
- def _send_message_data(socket, request):
- socket.send(b'', zmq.SNDMORE)
- socket.send_pyobj(request)
-
- LOG.debug("Sent message_id %(message)s to a target %(target)s",
- {"message": request.message_id,
- "target": request.target})
-
- def _do_send_request(socket, request):
- if request.msg_type in zmq_names.MULTISEND_TYPES:
- for _ in range(socket.connections_count()):
- _send_message_data(socket, request)
- else:
- _send_message_data(socket, request)
-
- sockets_manager = zmq_publisher_base.SocketsManager(
- conf, matchmaker, zmq.ROUTER, zmq.DEALER)
- super(DealerPublisher, self).__init__(sockets_manager,
- _do_send_request)
+class DealerPublisher(zmq_publisher_base.PublisherBase):
+ """Non-CALL publisher using direct connections."""
def send_request(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
- raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
- super(DealerPublisher, self).send_request(request)
+ raise zmq_publisher_base.UnsupportedSendPattern(
+ zmq_names.message_type_str(request.msg_type)
+ )
+
+ try:
+ socket = self.sockets_manager.get_socket(request.target)
+ except retrying.RetryError:
+ return
+
+ if request.msg_type in zmq_names.MULTISEND_TYPES:
+ for _ in range(socket.connections_count()):
+ self.sender.send(socket, request)
+ else:
+ self.sender.send(socket, request)
-class DealerPublisherAsync(object):
- """This simplified publisher is to be used with eventlet only.
- Eventlet takes care about zmq sockets sharing between green threads
- using queued lock.
- Use DealerPublisher for other concurrency models.
- """
+class DealerCallPublisher(zmq_publisher_base.PublisherBase):
+ """CALL publisher using direct connections."""
- def __init__(self, conf, matchmaker):
- self.sockets_manager = zmq_publisher_base.SocketsManager(
- conf, matchmaker, zmq.ROUTER, zmq.DEALER)
+ def __init__(self, sockets_manager, sender, reply_receiver):
+ super(DealerCallPublisher, self).__init__(sockets_manager, sender)
+ self.reply_receiver = reply_receiver
@staticmethod
- def _send_message_data(socket, request):
- socket.send(b'', zmq.SNDMORE)
- socket.send_pyobj(request)
-
- LOG.debug("Sent message_id %(message)s to a target %(target)s",
- {"message": request.message_id,
- "target": request.target})
+ def _raise_timeout(request):
+ raise oslo_messaging.MessagingTimeout(
+ "Timeout %(tout)s seconds was reached for message %(msg_id)s" %
+ {"tout": request.timeout, "msg_id": request.message_id}
+ )
def send_request(self, request):
- if request.msg_type == zmq_names.CALL_TYPE:
- raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
- socket = self.sockets_manager.get_socket(request.target)
-
- if request.msg_type in zmq_names.MULTISEND_TYPES:
- for _ in range(socket.connections_count()):
- self._send_message_data(socket, request)
+ if request.msg_type != zmq_names.CALL_TYPE:
+ raise zmq_publisher_base.UnsupportedSendPattern(
+ zmq_names.message_type_str(request.msg_type)
+ )
+
+ try:
+ socket = self._connect_socket(request.target)
+ except retrying.RetryError:
+ self._raise_timeout(request)
+
+ self.sender.send(socket, request)
+ self.reply_receiver.register_socket(socket)
+ return self._recv_reply(request)
+
+ def _connect_socket(self, target):
+ return self.sockets_manager.get_socket(target)
+
+ def _recv_reply(self, request):
+ reply_future, = self.reply_receiver.track_request(request)
+
+ try:
+ _, reply = reply_future.result(timeout=request.timeout)
+ except AssertionError:
+ LOG.error(_LE("Message format error in reply for %s"),
+ request.message_id)
+ return None
+ except futures.TimeoutError:
+ self._raise_timeout(request)
+ finally:
+ self.reply_receiver.untrack_request(request)
+
+ if reply.failure:
+ raise rpc_common.deserialize_remote_exception(
+ reply.failure, request.allowed_remote_exmods
+ )
else:
- self._send_message_data(socket, request)
+ return reply.reply_body
def cleanup(self):
- self.sockets_manager.cleanup()
+ self.reply_receiver.stop()
+ super(DealerCallPublisher, self).cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
index d3e120d..9f53bed 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
@@ -13,171 +13,90 @@
# under the License.
import logging
-import six
-import time
+
+import retrying
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
- import zmq_dealer_call_publisher
-from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
- import zmq_reply_waiter
+ import zmq_dealer_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers \
import zmq_publisher_base
+from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_updater
-zmq = zmq_async.import_zmq()
-
LOG = logging.getLogger(__name__)
+zmq = zmq_async.import_zmq()
-class DealerPublisherProxy(object):
- """Used when publishing to a proxy. """
- def __init__(self, conf, matchmaker, socket_to_proxy):
- self.conf = conf
- self.sockets_manager = zmq_publisher_base.SocketsManager(
- conf, matchmaker, zmq.ROUTER, zmq.DEALER)
- self.socket = socket_to_proxy
- self.routing_table = RoutingTable(conf, matchmaker)
- self.connection_updater = PublisherConnectionUpdater(
- conf, matchmaker, self.socket)
+class DealerPublisherProxy(zmq_publisher_base.PublisherBase):
+ """Non-CALL publisher via proxy."""
+
+ def __init__(self, sockets_manager, sender):
+ super(DealerPublisherProxy, self).__init__(sockets_manager, sender)
+ self.socket = sockets_manager.get_socket_to_publishers()
+ self.routing_table = zmq_routing_table.RoutingTable(self.conf,
+ self.matchmaker)
+ self.connection_updater = \
+ PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket)
+
+ def _get_routing_keys(self, request):
+ try:
+ if request.msg_type in zmq_names.DIRECT_TYPES:
+ return [self.routing_table.get_routable_host(request.target)]
+ else:
+ return \
+ [zmq_address.target_to_subscribe_filter(request.target)] \
+ if self.conf.use_pub_sub else \
+ self.routing_table.get_all_hosts(request.target)
+ except retrying.RetryError:
+ return []
def send_request(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(
- request.msg_type)
-
- if self.conf.use_pub_sub:
- routing_key = self.routing_table.get_routable_host(request.target) \
- if request.msg_type in zmq_names.DIRECT_TYPES else \
- zmq_address.target_to_subscribe_filter(request.target)
- self._do_send_request(request, routing_key)
- else:
- routing_keys = \
- [self.routing_table.get_routable_host(request.target)] \
- if request.msg_type in zmq_names.DIRECT_TYPES else \
- self.routing_table.get_all_hosts(request.target)
- for routing_key in routing_keys:
- self._do_send_request(request, routing_key)
-
- def _do_send_request(self, request, routing_key):
- self.socket.send(b'', zmq.SNDMORE)
- self.socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
- self.socket.send(six.b(routing_key), zmq.SNDMORE)
- self.socket.send(six.b(request.message_id), zmq.SNDMORE)
- self.socket.send_pyobj(request.context, zmq.SNDMORE)
- self.socket.send_pyobj(request.message)
-
- LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to "
- "a target %(target)s",
- {"message": request.message_id,
- "target": request.target,
- "addr": list(self.socket.connections)})
+ zmq_names.message_type_str(request.msg_type)
+ )
+ for routing_key in self._get_routing_keys(request):
+ request.routing_key = routing_key
+ self.sender.send(self.socket, request)
def cleanup(self):
+ self.connection_updater.stop()
self.socket.close()
+ super(DealerPublisherProxy, self).cleanup()
-class DealerCallPublisherProxy(zmq_dealer_call_publisher.DealerCallPublisher):
+class DealerCallPublisherProxy(zmq_dealer_publisher.DealerCallPublisher):
+ """CALL publisher via proxy."""
- def __init__(self, conf, matchmaker, sockets_manager):
- reply_waiter = ReplyWaiterProxy(conf)
- sender = CallSenderProxy(conf, matchmaker, sockets_manager,
- reply_waiter)
+ def __init__(self, sockets_manager, sender, reply_waiter):
super(DealerCallPublisherProxy, self).__init__(
- conf, matchmaker, sockets_manager, sender, reply_waiter)
+ sockets_manager, sender, reply_waiter
+ )
+ self.socket = self.sockets_manager.get_socket_to_publishers()
+ self.routing_table = zmq_routing_table.RoutingTable(self.conf,
+ self.matchmaker)
+ self.connection_updater = \
+ PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket)
-
-class CallSenderProxy(zmq_dealer_call_publisher.CallSender):
-
- def __init__(self, conf, matchmaker, sockets_manager, reply_waiter):
- super(CallSenderProxy, self).__init__(
- sockets_manager, reply_waiter)
- self.socket = self.outbound_sockets.get_socket_to_publishers()
- self.reply_waiter.poll_socket(self.socket)
- self.routing_table = RoutingTable(conf, matchmaker)
- self.connection_updater = PublisherConnectionUpdater(
- conf, matchmaker, self.socket)
+ def send_request(self, request):
+ try:
+ request.routing_key = \
+ self.routing_table.get_routable_host(request.target)
+ except retrying.RetryError:
+ self._raise_timeout(request)
+ return super(DealerCallPublisherProxy, self).send_request(request)
def _connect_socket(self, target):
return self.socket
- def _do_send_request(self, socket, request):
- routing_key = self.routing_table.get_routable_host(request.target)
-
- # DEALER socket specific envelope empty delimiter
- socket.send(b'', zmq.SNDMORE)
- socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
- socket.send(six.b(routing_key), zmq.SNDMORE)
- socket.send(six.b(request.message_id), zmq.SNDMORE)
- socket.send_pyobj(request.context, zmq.SNDMORE)
- socket.send_pyobj(request.message)
-
- LOG.debug("Sent message_id %(message)s to a target %(target)s",
- {"message": request.message_id,
- "target": request.target})
-
-
-class ReplyWaiterProxy(zmq_reply_waiter.ReplyWaiter):
-
- def receive_method(self, socket):
- empty = socket.recv()
- assert empty == b'', "Empty expected!"
- reply_id = socket.recv()
- assert reply_id is not None, "Reply ID expected!"
- message_type = int(socket.recv())
- assert message_type == zmq_names.REPLY_TYPE, "Reply is expected!"
- message_id = socket.recv()
- reply = socket.recv_pyobj()
- LOG.debug("Received reply %s", message_id)
- return reply
-
-
-class RoutingTable(object):
- """This class implements local routing-table cache
- taken from matchmaker. Its purpose is to give the next routable
- host id (remote DEALER's id) by request for specific target in
- round-robin fashion.
- """
-
- def __init__(self, conf, matchmaker):
- self.conf = conf
- self.matchmaker = matchmaker
- self.routing_table = {}
- self.routable_hosts = {}
-
- def get_all_hosts(self, target):
- self._update_routing_table(target)
- return list(self.routable_hosts.get(str(target)) or [])
-
- def get_routable_host(self, target):
- self._update_routing_table(target)
- hosts_for_target = self.routable_hosts[str(target)]
- host = hosts_for_target.pop(0)
- if not hosts_for_target:
- self._renew_routable_hosts(target)
- return host
-
- def _is_tm_expired(self, tm):
- return 0 <= self.conf.zmq_target_expire <= time.time() - tm
-
- def _update_routing_table(self, target):
- routing_record = self.routing_table.get(str(target))
- if routing_record is None:
- self._fetch_hosts(target)
- self._renew_routable_hosts(target)
- elif self._is_tm_expired(routing_record[1]):
- self._fetch_hosts(target)
-
- def _fetch_hosts(self, target):
- self.routing_table[str(target)] = (self.matchmaker.get_hosts(
- target, zmq_names.socket_type_str(zmq.DEALER)), time.time())
-
- def _renew_routable_hosts(self, target):
- hosts, _ = self.routing_table[str(target)]
- self.routable_hosts[str(target)] = list(hosts)
+ def cleanup(self):
+ self.connection_updater.stop()
+ super(DealerCallPublisherProxy, self).cleanup()
+ self.socket.close()
class PublisherConnectionUpdater(zmq_updater.ConnectionUpdater):
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py
deleted file mode 100644
index bb15fb2..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py
+++ /dev/null
@@ -1,66 +0,0 @@
-# Copyright 2016 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-import threading
-
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._i18n import _LW
-
-LOG = logging.getLogger(__name__)
-
-zmq = zmq_async.import_zmq()
-
-
-class ReplyWaiter(object):
-
- def __init__(self, conf):
- self.conf = conf
- self.replies = {}
- self.poller = zmq_async.get_poller()
- self.executor = zmq_async.get_executor(self.run_loop)
- self.executor.execute()
- self._lock = threading.Lock()
-
- def track_reply(self, reply_future, message_id):
- with self._lock:
- self.replies[message_id] = reply_future
-
- def untrack_id(self, message_id):
- with self._lock:
- self.replies.pop(message_id)
-
- def poll_socket(self, socket):
- self.poller.register(socket, recv_method=self.receive_method)
-
- def receive_method(self, socket):
- empty = socket.recv()
- assert empty == b'', "Empty expected!"
- reply = socket.recv_pyobj()
- LOG.debug("Received reply %s", reply.message_id)
- return reply
-
- def run_loop(self):
- reply, socket = self.poller.poll(
- timeout=self.conf.rpc_poll_timeout)
- if reply is not None:
- call_future = self.replies.get(reply.message_id)
- if call_future:
- call_future.set_result(reply)
- else:
- LOG.warning(_LW("Received timed out reply: %s"),
- reply.message_id)
-
- def cleanup(self):
- self.poller.close()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py
index bfaff0d..bb5f294 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py
@@ -14,14 +14,11 @@
import abc
import logging
-import time
import six
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
@@ -56,7 +53,7 @@ class PublisherBase(object):
Publisher can send request objects from zmq_request.
"""
- def __init__(self, sockets_manager):
+ def __init__(self, sockets_manager, sender):
"""Construct publisher
@@ -66,10 +63,10 @@ class PublisherBase(object):
:param conf: configuration object
:type conf: oslo_config.CONF
"""
- self.outbound_sockets = sockets_manager
+ self.sockets_manager = sockets_manager
self.conf = sockets_manager.conf
self.matchmaker = sockets_manager.matchmaker
- super(PublisherBase, self).__init__()
+ self.sender = sender
@abc.abstractmethod
def send_request(self, request):
@@ -79,126 +76,6 @@ class PublisherBase(object):
:type request: zmq_request.Request
"""
- def _send_request(self, socket, request):
- """Send request to consumer.
- Helper private method which defines basic sending behavior.
-
- :param socket: Socket to publish message on
- :type socket: zmq.Socket
- :param request: Message data and destination container object
- :type request: zmq_request.Request
- """
- LOG.debug("Sending %(type)s message_id %(message)s to a target "
- "%(target)s",
- {"type": request.msg_type,
- "message": request.message_id,
- "target": request.target})
- socket.send_pyobj(request)
-
def cleanup(self):
"""Cleanup publisher. Close allocated connections."""
- self.outbound_sockets.cleanup()
-
-
-class SocketsManager(object):
-
- def __init__(self, conf, matchmaker, listener_type, socket_type):
- self.conf = conf
- self.matchmaker = matchmaker
- self.listener_type = listener_type
- self.socket_type = socket_type
- self.zmq_context = zmq.Context()
- self.outbound_sockets = {}
- self.socket_to_publishers = None
- self.socket_to_routers = None
-
- def get_hosts(self, target):
- return self.matchmaker.get_hosts(
- target, zmq_names.socket_type_str(self.listener_type))
-
- @staticmethod
- def _key_from_target(target):
- return target.topic if target.fanout else str(target)
-
- def _get_hosts_and_connect(self, socket, target):
- hosts = self.get_hosts(target)
- self._connect_to_hosts(socket, target, hosts)
-
- def _track_socket(self, socket, target):
- key = self._key_from_target(target)
- self.outbound_sockets[key] = (socket, time.time())
-
- def _connect_to_hosts(self, socket, target, hosts):
- for host in hosts:
- socket.connect_to_host(host)
- self._track_socket(socket, target)
-
- def _check_for_new_hosts(self, target):
- key = self._key_from_target(target)
- socket, tm = self.outbound_sockets[key]
- if 0 <= self.conf.zmq_target_expire <= time.time() - tm:
- self._get_hosts_and_connect(socket, target)
- return socket
-
- def get_socket(self, target):
- key = self._key_from_target(target)
- if key in self.outbound_sockets:
- socket = self._check_for_new_hosts(target)
- else:
- socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
- self.socket_type)
- self._get_hosts_and_connect(socket, target)
- return socket
-
- def get_socket_to_publishers(self):
- if self.socket_to_publishers is not None:
- return self.socket_to_publishers
- self.socket_to_publishers = zmq_socket.ZmqSocket(
- self.conf, self.zmq_context, self.socket_type)
- publishers = self.matchmaker.get_publishers()
- for pub_address, router_address in publishers:
- self.socket_to_publishers.connect_to_host(router_address)
- return self.socket_to_publishers
-
- def get_socket_to_routers(self):
- if self.socket_to_routers is not None:
- return self.socket_to_routers
- self.socket_to_routers = zmq_socket.ZmqSocket(
- self.conf, self.zmq_context, self.socket_type)
- routers = self.matchmaker.get_routers()
- for router_address in routers:
- self.socket_to_routers.connect_to_host(router_address)
- return self.socket_to_routers
-
- def cleanup(self):
- for socket, tm in self.outbound_sockets.values():
- socket.close()
-
-
-class QueuedSender(PublisherBase):
-
- def __init__(self, sockets_manager, _do_send_request):
- super(QueuedSender, self).__init__(sockets_manager)
- self._do_send_request = _do_send_request
- self.queue, self.empty_except = zmq_async.get_queue()
- self.executor = zmq_async.get_executor(self.run_loop)
- self.executor.execute()
-
- def send_request(self, request):
- self.queue.put(request)
-
- def _connect_socket(self, target):
- return self.outbound_sockets.get_socket(target)
-
- def run_loop(self):
- try:
- request = self.queue.get(timeout=self.conf.rpc_poll_timeout)
- except self.empty_except:
- return
-
- socket = self._connect_socket(request.target)
- self._do_send_request(socket, request)
-
- def cleanup(self):
- self.executor.stop()
- super(QueuedSender, self).cleanup()
+ self.sockets_manager.cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py
deleted file mode 100644
index 4960979..0000000
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py
+++ /dev/null
@@ -1,52 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-
-from oslo_messaging._drivers.zmq_driver.client.publishers\
- import zmq_publisher_base
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-
-LOG = logging.getLogger(__name__)
-
-zmq = zmq_async.import_zmq()
-
-
-class PushPublisher(object):
-
- def __init__(self, conf, matchmaker):
- super(PushPublisher, self).__init__()
- sockets_manager = zmq_publisher_base.SocketsManager(
- conf, matchmaker, zmq.PULL, zmq.PUSH)
-
- def _do_send_request(push_socket, request):
- push_socket.send_pyobj(request)
-
- LOG.debug("Sending message_id %(message)s to a target %(target)s",
- {"message": request.message_id,
- "target": request.target})
-
- self.sender = zmq_publisher_base.QueuedSender(
- sockets_manager, _do_send_request)
-
- def send_request(self, request):
-
- if request.msg_type != zmq_names.CAST_TYPE:
- raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
-
- self.sender.send_request(request)
-
- def cleanup(self):
- self.sender.cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
index e5951cb..a8cfe93 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
@@ -15,14 +15,13 @@
from oslo_messaging._drivers import common
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
- import zmq_dealer_call_publisher
-from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher_proxy
-from oslo_messaging._drivers.zmq_driver.client.publishers \
- import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
+from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
+from oslo_messaging._drivers.zmq_driver.client import zmq_senders
+from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
@@ -46,25 +45,34 @@ class ZmqClientMixDirectPubSub(zmq_client_base.ZmqClientBase):
if conf.use_router_proxy or not conf.use_pub_sub:
raise WrongClientException()
- self.sockets_manager = zmq_publisher_base.SocketsManager(
- conf, matchmaker, zmq.ROUTER, zmq.DEALER)
+ self.sockets_manager = zmq_sockets_manager.SocketsManager(
+ conf, matchmaker, zmq.ROUTER, zmq.DEALER
+ )
+
+ sender_proxy = zmq_senders.RequestSenderProxy(conf)
+ sender_direct = zmq_senders.RequestSenderDirect(conf)
+
+ receiver_direct = zmq_receivers.ReplyReceiverDirect(conf)
fanout_publisher = zmq_dealer_publisher_proxy.DealerPublisherProxy(
- conf, matchmaker, self.sockets_manager.get_socket_to_publishers())
+ self.sockets_manager, sender_proxy
+ )
super(ZmqClientMixDirectPubSub, self).__init__(
conf, matchmaker, allowed_remote_exmods,
publishers={
zmq_names.CALL_TYPE:
- zmq_dealer_call_publisher.DealerCallPublisher(
- conf, matchmaker, self.sockets_manager),
+ zmq_dealer_publisher.DealerCallPublisher(
+ self.sockets_manager, sender_direct, receiver_direct
+ ),
zmq_names.CAST_FANOUT_TYPE: fanout_publisher,
zmq_names.NOTIFY_TYPE: fanout_publisher,
- "default": zmq_dealer_publisher.DealerPublisherAsync(
- conf, matchmaker)
+ "default":
+ zmq_dealer_publisher.DealerPublisher(self.sockets_manager,
+ sender_direct)
}
)
@@ -82,18 +90,25 @@ class ZmqClientDirect(zmq_client_base.ZmqClientBase):
if conf.use_pub_sub or conf.use_router_proxy:
raise WrongClientException()
- self.sockets_manager = zmq_publisher_base.SocketsManager(
- conf, matchmaker, zmq.ROUTER, zmq.DEALER)
+ self.sockets_manager = zmq_sockets_manager.SocketsManager(
+ conf, matchmaker, zmq.ROUTER, zmq.DEALER
+ )
+
+ sender = zmq_senders.RequestSenderDirect(conf)
+
+ receiver = zmq_receivers.ReplyReceiverDirect(conf)
super(ZmqClientDirect, self).__init__(
conf, matchmaker, allowed_remote_exmods,
publishers={
zmq_names.CALL_TYPE:
- zmq_dealer_call_publisher.DealerCallPublisher(
- conf, matchmaker, self.sockets_manager),
+ zmq_dealer_publisher.DealerCallPublisher(
+ self.sockets_manager, sender, receiver
+ ),
- "default": zmq_dealer_publisher.DealerPublisher(
- conf, matchmaker)
+ "default":
+ zmq_dealer_publisher.DealerPublisher(self.sockets_manager,
+ sender)
}
)
@@ -113,18 +128,25 @@ class ZmqClientProxy(zmq_client_base.ZmqClientBase):
if not conf.use_router_proxy:
raise WrongClientException()
- self.sockets_manager = zmq_publisher_base.SocketsManager(
- conf, matchmaker, zmq.ROUTER, zmq.DEALER)
+ self.sockets_manager = zmq_sockets_manager.SocketsManager(
+ conf, matchmaker, zmq.ROUTER, zmq.DEALER
+ )
+
+ sender = zmq_senders.RequestSenderProxy(conf)
+
+ receiver = zmq_receivers.ReplyReceiverProxy(conf)
super(ZmqClientProxy, self).__init__(
conf, matchmaker, allowed_remote_exmods,
publishers={
zmq_names.CALL_TYPE:
zmq_dealer_publisher_proxy.DealerCallPublisherProxy(
- conf, matchmaker, self.sockets_manager),
+ self.sockets_manager, sender, receiver
+ ),
- "default": zmq_dealer_publisher_proxy.DealerPublisherProxy(
- conf, matchmaker,
- self.sockets_manager.get_socket_to_publishers())
+ "default":
+ zmq_dealer_publisher_proxy.DealerPublisherProxy(
+ self.sockets_manager, sender
+ )
}
)
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py
new file mode 100644
index 0000000..e8be2a3
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py
@@ -0,0 +1,140 @@
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import logging
+import threading
+
+import futurist
+import six
+
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_names
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+@six.add_metaclass(abc.ABCMeta)
+class ReceiverBase(object):
+ """Base response receiving interface."""
+
+ def __init__(self, conf):
+ self.conf = conf
+ self._lock = threading.Lock()
+ self._requests = {}
+ self._poller = zmq_async.get_poller()
+ self._executor = zmq_async.get_executor(method=self._run_loop)
+ self._executor.execute()
+
+ @abc.abstractproperty
+ def message_types(self):
+ """A list of supported incoming response types."""
+
+ def register_socket(self, socket):
+ """Register a socket for receiving data."""
+ self._poller.register(socket, recv_method=self.recv_response)
+
+ @abc.abstractmethod
+ def recv_response(self, socket):
+ """Receive a response and return a tuple of the form
+ (reply_id, message_type, message_id, response).
+ """
+
+ def track_request(self, request):
+ """Track a request via already registered sockets and return
+ a list of futures for monitoring all types of responses.
+ """
+ futures = []
+ for message_type in self.message_types:
+ future = futurist.Future()
+ self._set_future(request.message_id, message_type, future)
+ futures.append(future)
+ return futures
+
+ def untrack_request(self, request):
+ """Untrack a request and stop monitoring any responses."""
+ for message_type in self.message_types:
+ self._pop_future(request.message_id, message_type)
+
+ def stop(self):
+ self._poller.close()
+ self._executor.stop()
+
+ def _get_future(self, message_id, message_type):
+ with self._lock:
+ return self._requests.get((message_id, message_type))
+
+ def _set_future(self, message_id, message_type, future):
+ with self._lock:
+ self._requests[(message_id, message_type)] = future
+
+ def _pop_future(self, message_id, message_type):
+ with self._lock:
+ return self._requests.pop((message_id, message_type), None)
+
+ def _run_loop(self):
+ data, socket = self._poller.poll(timeout=self.conf.rpc_poll_timeout)
+ if data is None:
+ return
+ reply_id, message_type, message_id, response = data
+ assert message_type in self.message_types, \
+ "%s is not supported!" % zmq_names.message_type_str(message_type)
+ future = self._get_future(message_id, message_type)
+ if future is not None:
+ LOG.debug("Received %(msg_type)s for %(msg_id)s",
+ {"msg_type": zmq_names.message_type_str(message_type),
+ "msg_id": message_id})
+ future.set_result((reply_id, response))
+
+
+class AckReceiver(ReceiverBase):
+
+ message_types = (zmq_names.ACK_TYPE,)
+
+
+class ReplyReceiver(ReceiverBase):
+
+ message_types = (zmq_names.REPLY_TYPE,)
+
+
+class ReplyReceiverProxy(ReplyReceiver):
+
+ def recv_response(self, socket):
+ empty = socket.recv()
+ assert empty == b'', "Empty expected!"
+ reply_id = socket.recv()
+ assert reply_id is not None, "Reply ID expected!"
+ message_type = int(socket.recv())
+ assert message_type == zmq_names.REPLY_TYPE, "Reply is expected!"
+ message_id = socket.recv()
+ reply = socket.recv_pyobj()
+ LOG.debug("Received reply for %s", message_id)
+ return reply_id, message_type, message_id, reply
+
+
+class ReplyReceiverDirect(ReplyReceiver):
+
+ def recv_response(self, socket):
+ empty = socket.recv()
+ assert empty == b'', "Empty expected!"
+ reply = socket.recv_pyobj()
+ LOG.debug("Received reply for %s", reply.message_id)
+ return reply.reply_id, reply.type_, reply.message_id, reply
+
+
+class AckAndReplyReceiver(ReceiverBase):
+
+ message_types = (zmq_names.ACK_TYPE, zmq_names.REPLY_TYPE)
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py
new file mode 100644
index 0000000..2abb21b
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py
@@ -0,0 +1,65 @@
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import time
+
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_names
+
+zmq = zmq_async.import_zmq()
+
+
+class RoutingTable(object):
+ """This class implements local routing-table cache
+ taken from matchmaker. Its purpose is to give the next routable
+ host id (remote DEALER's id) by request for specific target in
+ round-robin fashion.
+ """
+
+ def __init__(self, conf, matchmaker):
+ self.conf = conf
+ self.matchmaker = matchmaker
+ self.routing_table = {}
+ self.routable_hosts = {}
+
+ def get_all_hosts(self, target):
+ self._update_routing_table(target)
+ return list(self.routable_hosts.get(str(target)) or [])
+
+ def get_routable_host(self, target):
+ self._update_routing_table(target)
+ hosts_for_target = self.routable_hosts[str(target)]
+ host = hosts_for_target.pop(0)
+ if not hosts_for_target:
+ self._renew_routable_hosts(target)
+ return host
+
+ def _is_tm_expired(self, tm):
+ return 0 <= self.conf.zmq_target_expire <= time.time() - tm
+
+ def _update_routing_table(self, target):
+ routing_record = self.routing_table.get(str(target))
+ if routing_record is None:
+ self._fetch_hosts(target)
+ self._renew_routable_hosts(target)
+ elif self._is_tm_expired(routing_record[1]):
+ self._fetch_hosts(target)
+
+ def _fetch_hosts(self, target):
+ self.routing_table[str(target)] = (self.matchmaker.get_hosts(
+ target, zmq_names.socket_type_str(zmq.DEALER)), time.time())
+
+ def _renew_routable_hosts(self, target):
+ hosts, _ = self.routing_table[str(target)]
+ self.routable_hosts[str(target)] = list(hosts)
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py
new file mode 100644
index 0000000..f7cde0d
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py
@@ -0,0 +1,94 @@
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import logging
+
+import six
+
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_names
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+@six.add_metaclass(abc.ABCMeta)
+class SenderBase(object):
+ """Base request/ack/reply sending interface."""
+
+ def __init__(self, conf):
+ self.conf = conf
+
+ @abc.abstractmethod
+ def send(self, socket, message):
+ pass
+
+
+class RequestSenderProxy(SenderBase):
+
+ def send(self, socket, request):
+ socket.send(b'', zmq.SNDMORE)
+ socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
+ socket.send(six.b(request.routing_key), zmq.SNDMORE)
+ socket.send(six.b(request.message_id), zmq.SNDMORE)
+ socket.send_pyobj(request.context, zmq.SNDMORE)
+ socket.send_pyobj(request.message)
+
+ LOG.debug("->[proxy:%(addr)s] Sending %(msg_type)s message "
+ "%(msg_id)s to target %(target)s",
+ {"addr": list(socket.connections),
+ "msg_type": zmq_names.message_type_str(request.msg_type),
+ "msg_id": request.message_id,
+ "target": request.target})
+
+
+class ReplySenderProxy(SenderBase):
+
+ def send(self, socket, reply):
+ LOG.debug("Replying to %s", reply.message_id)
+
+ assert reply.type_ == zmq_names.REPLY_TYPE, "Reply expected!"
+
+ socket.send(b'', zmq.SNDMORE)
+ socket.send(six.b(str(reply.type_)), zmq.SNDMORE)
+ socket.send(reply.reply_id, zmq.SNDMORE)
+ socket.send(reply.message_id, zmq.SNDMORE)
+ socket.send_pyobj(reply)
+
+
+class RequestSenderDirect(SenderBase):
+
+ def send(self, socket, request):
+ socket.send(b'', zmq.SNDMORE)
+ socket.send_pyobj(request)
+
+ LOG.debug("Sending %(msg_type)s message %(msg_id)s to "
+ "target %(target)s",
+ {"msg_type": zmq_names.message_type_str(request.msg_type),
+ "msg_id": request.message_id,
+ "target": request.target})
+
+
+class ReplySenderDirect(SenderBase):
+
+ def send(self, socket, reply):
+ LOG.debug("Replying to %s", reply.message_id)
+
+ assert reply.type_ == zmq_names.REPLY_TYPE, "Reply expected!"
+
+ socket.send(reply.reply_id, zmq.SNDMORE)
+ socket.send(b'', zmq.SNDMORE)
+ socket.send_pyobj(reply)
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py
new file mode 100644
index 0000000..27e5f6e
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py
@@ -0,0 +1,96 @@
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import time
+
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_names
+from oslo_messaging._drivers.zmq_driver import zmq_socket
+
+zmq = zmq_async.import_zmq()
+
+
+class SocketsManager(object):
+
+ def __init__(self, conf, matchmaker, listener_type, socket_type):
+ self.conf = conf
+ self.matchmaker = matchmaker
+ self.listener_type = listener_type
+ self.socket_type = socket_type
+ self.zmq_context = zmq.Context()
+ self.outbound_sockets = {}
+ self.socket_to_publishers = None
+ self.socket_to_routers = None
+
+ def get_hosts(self, target):
+ return self.matchmaker.get_hosts(
+ target, zmq_names.socket_type_str(self.listener_type))
+
+ @staticmethod
+ def _key_from_target(target):
+ return target.topic if target.fanout else str(target)
+
+ def _get_hosts_and_connect(self, socket, target):
+ hosts = self.get_hosts(target)
+ self._connect_to_hosts(socket, target, hosts)
+
+ def _track_socket(self, socket, target):
+ key = self._key_from_target(target)
+ self.outbound_sockets[key] = (socket, time.time())
+
+ def _connect_to_hosts(self, socket, target, hosts):
+ for host in hosts:
+ socket.connect_to_host(host)
+ self._track_socket(socket, target)
+
+ def _check_for_new_hosts(self, target):
+ key = self._key_from_target(target)
+ socket, tm = self.outbound_sockets[key]
+ if 0 <= self.conf.zmq_target_expire <= time.time() - tm:
+ self._get_hosts_and_connect(socket, target)
+ return socket
+
+ def get_socket(self, target):
+ key = self._key_from_target(target)
+ if key in self.outbound_sockets:
+ socket = self._check_for_new_hosts(target)
+ else:
+ socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
+ self.socket_type)
+ self._get_hosts_and_connect(socket, target)
+ return socket
+
+ def get_socket_to_publishers(self):
+ if self.socket_to_publishers is not None:
+ return self.socket_to_publishers
+ self.socket_to_publishers = zmq_socket.ZmqSocket(
+ self.conf, self.zmq_context, self.socket_type)
+ publishers = self.matchmaker.get_publishers()
+ for pub_address, router_address in publishers:
+ self.socket_to_publishers.connect_to_host(router_address)
+ return self.socket_to_publishers
+
+ def get_socket_to_routers(self):
+ if self.socket_to_routers is not None:
+ return self.socket_to_routers
+ self.socket_to_routers = zmq_socket.ZmqSocket(
+ self.conf, self.zmq_context, self.socket_type)
+ routers = self.matchmaker.get_routers()
+ for router_address in routers:
+ self.socket_to_routers.connect_to_host(router_address)
+ return self.socket_to_routers
+
+ def cleanup(self):
+ for socket, tm in self.outbound_sockets.values():
+ socket.close()
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
index c7792df..cfde3ad 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py
@@ -14,15 +14,12 @@
import logging
-import six
-
-from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
-from oslo_messaging._drivers.zmq_driver.client.publishers\
- import zmq_publisher_base
-from oslo_messaging._drivers.zmq_driver.client import zmq_response
-from oslo_messaging._drivers.zmq_driver.server.consumers\
+from oslo_messaging._drivers.zmq_driver.client import zmq_senders
+from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager
+from oslo_messaging._drivers.zmq_driver.server.consumers \
import zmq_consumer_base
+from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_updater
@@ -33,54 +30,11 @@ LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
-class DealerIncomingMessage(base.RpcIncomingMessage):
-
- def __init__(self, context, message):
- super(DealerIncomingMessage, self).__init__(context, message)
-
- def reply(self, reply=None, failure=None):
- """Reply is not needed for non-call messages"""
-
- def acknowledge(self):
- """Not sending acknowledge"""
-
- def requeue(self):
- """Requeue is not supported"""
-
-
-class DealerIncomingRequest(base.RpcIncomingMessage):
-
- def __init__(self, socket, reply_id, message_id, context, message):
- super(DealerIncomingRequest, self).__init__(context, message)
- self.reply_socket = socket
- self.reply_id = reply_id
- self.message_id = message_id
-
- def reply(self, reply=None, failure=None):
- if failure is not None:
- failure = rpc_common.serialize_remote_exception(failure)
- response = zmq_response.Response(type=zmq_names.REPLY_TYPE,
- message_id=self.message_id,
- reply_id=self.reply_id,
- reply_body=reply,
- failure=failure)
-
- LOG.debug("Replying %s", self.message_id)
-
- self.reply_socket.send(b'', zmq.SNDMORE)
- self.reply_socket.send(six.b(str(zmq_names.REPLY_TYPE)), zmq.SNDMORE)
- self.reply_socket.send(self.reply_id, zmq.SNDMORE)
- self.reply_socket.send(self.message_id, zmq.SNDMORE)
- self.reply_socket.send_pyobj(response)
-
- def requeue(self):
- """Requeue is not supported"""
-
-
class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
def __init__(self, conf, poller, server):
- self.sockets_manager = zmq_publisher_base.SocketsManager(
+ self.sender = zmq_senders.ReplySenderProxy(conf)
+ self.sockets_manager = zmq_sockets_manager.SocketsManager(
conf, server.matchmaker, zmq.ROUTER, zmq.DEALER)
self.host = None
super(DealerConsumer, self).__init__(conf, poller, server, zmq.DEALER)
@@ -91,6 +45,7 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
def subscribe_socket(self, socket_type):
try:
socket = self.sockets_manager.get_socket_to_routers()
+ self.sockets.append(socket)
self.host = socket.handle.identity
self.poller.register(socket, self.receive_message)
return socket
@@ -110,10 +65,12 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
LOG.debug("[%(host)s] Received message %(id)s",
{"host": self.host, "id": message_id})
if message_type == zmq_names.CALL_TYPE:
- return DealerIncomingRequest(
- socket, reply_id, message_id, context, message)
+ return zmq_incoming_message.ZmqIncomingMessage(
+ context, message, reply_id, message_id, socket, self.sender
+ )
elif message_type in zmq_names.NON_BLOCKING_TYPES:
- return DealerIncomingMessage(context, message)
+ return zmq_incoming_message.ZmqIncomingMessage(context,
+ message)
else:
LOG.error(_LE("Unknown message type: %s"),
zmq_names.message_type_str(message_type))
@@ -122,6 +79,7 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
def cleanup(self):
LOG.info(_LI("[%s] Destroy DEALER consumer"), self.host)
+ self.connection_updater.cleanup()
super(DealerConsumer, self).cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py
deleted file mode 100644
index 719c24e..0000000
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py
+++ /dev/null
@@ -1,69 +0,0 @@
-# Copyright 2015 Mirantis, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-
-from oslo_messaging._drivers import base
-from oslo_messaging._drivers.zmq_driver.server.consumers\
- import zmq_consumer_base
-from oslo_messaging._drivers.zmq_driver import zmq_async
-from oslo_messaging._drivers.zmq_driver import zmq_names
-from oslo_messaging._i18n import _LE, _LI
-
-LOG = logging.getLogger(__name__)
-
-zmq = zmq_async.import_zmq()
-
-
-class PullIncomingMessage(base.RpcIncomingMessage):
-
- def __init__(self, context, message):
- super(PullIncomingMessage, self).__init__(context, message)
-
- def reply(self, reply=None, failure=None):
- """Reply is not needed for non-call messages."""
-
- def acknowledge(self):
- """Acknowledgments are not supported by this type of consumer."""
-
- def requeue(self):
- """Requeueing is not supported."""
-
-
-class PullConsumer(zmq_consumer_base.SingleSocketConsumer):
-
- def __init__(self, conf, poller, server):
- super(PullConsumer, self).__init__(conf, poller, server, zmq.PULL)
- LOG.info(_LI("[%s] Run PULL consumer"), self.host)
-
- def receive_message(self, socket):
- try:
- request = socket.recv_pyobj()
- msg_type = request.msg_type
- assert msg_type is not None, 'Bad format: msg type expected'
- context = request.context
- message = request.message
- LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s",
- {"host": self.host,
- "type": request.msg_type,
- "id": request.message_id,
- "target": request.target})
-
- if msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES):
- return PullIncomingMessage(context, message)
- else:
- LOG.error(_LE("Unknown message type: %s"), msg_type)
-
- except (zmq.ZMQError, AssertionError) as e:
- LOG.error(_LE("Receiving message failed: %s"), str(e))
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py
index 0e40d5c..64cbcfd 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py
@@ -14,7 +14,7 @@
import logging
-from oslo_messaging._drivers import base
+from oslo_messaging._drivers.zmq_driver.client import zmq_senders
from oslo_messaging._drivers.zmq_driver.server.consumers\
import zmq_consumer_base
from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
@@ -27,29 +27,10 @@ LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
-class RouterIncomingMessage(base.RpcIncomingMessage):
-
- def __init__(self, context, message, socket, reply_id, msg_id,
- poller):
- super(RouterIncomingMessage, self).__init__(context, message)
- self.socket = socket
- self.reply_id = reply_id
- self.msg_id = msg_id
- self.message = message
-
- def reply(self, reply=None, failure=None):
- """Reply is not needed for non-call messages"""
-
- def acknowledge(self):
- LOG.debug("Not sending acknowledge for %s", self.msg_id)
-
- def requeue(self):
- """Requeue is not supported"""
-
-
class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
def __init__(self, conf, poller, server):
+ self.sender = zmq_senders.ReplySenderDirect(conf)
super(RouterConsumer, self).__init__(conf, poller, server, zmq.ROUTER)
LOG.info(_LI("[%s] Run ROUTER consumer"), self.host)
@@ -70,14 +51,19 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
"target": request.target})
if request.msg_type == zmq_names.CALL_TYPE:
- return zmq_incoming_message.ZmqIncomingRequest(
- socket, reply_id, request, self.poller)
+ return zmq_incoming_message.ZmqIncomingMessage(
+ request.context, request.message, reply_id,
+ request.message_id, socket, self.sender
+ )
elif request.msg_type in zmq_names.NON_BLOCKING_TYPES:
- return RouterIncomingMessage(
- request.context, request.message, socket, reply_id,
- request.message_id, self.poller)
+ return zmq_incoming_message.ZmqIncomingMessage(request.context,
+ request.message)
else:
- LOG.error(_LE("Unknown message type: %s"), request.msg_type)
-
+ LOG.error(_LE("Unknown message type: %s"),
+ zmq_names.message_type_str(request.msg_type))
except (zmq.ZMQError, AssertionError) as e:
LOG.error(_LE("Receiving message failed: %s"), str(e))
+
+ def cleanup(self):
+ LOG.info(_LI("[%s] Destroy ROUTER consumer"), self.host)
+ super(RouterConsumer, self).cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py
index 6aa8ec4..a6e32aa 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py
@@ -16,9 +16,9 @@ import logging
import six
-from oslo_messaging._drivers import base
-from oslo_messaging._drivers.zmq_driver.server.consumers\
+from oslo_messaging._drivers.zmq_driver.server.consumers \
import zmq_consumer_base
+from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_socket
@@ -29,21 +29,6 @@ LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
-class SubIncomingMessage(base.RpcIncomingMessage):
-
- def __init__(self, context, message):
- super(SubIncomingMessage, self).__init__(context, message)
-
- def reply(self, reply=None, failure=None):
- """Reply is not needed for non-call messages."""
-
- def acknowledge(self):
- """Requeue is not supported"""
-
- def requeue(self):
- """Requeue is not supported"""
-
-
class SubConsumer(zmq_consumer_base.ConsumerBase):
def __init__(self, conf, poller, server):
@@ -89,8 +74,7 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
context, message = self._receive_request(socket)
if not message:
return None
-
- return SubIncomingMessage(context, message)
+ return zmq_incoming_message.ZmqIncomingMessage(context, message)
except (zmq.ZMQError, AssertionError) as e:
LOG.error(_LE("Receiving message failed: %s"), str(e))
diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py
index 51c83e2..2c76227 100644
--- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py
+++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py
@@ -21,38 +21,41 @@ from oslo_messaging._drivers.zmq_driver.client import zmq_response
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
-
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
-class ZmqIncomingRequest(base.RpcIncomingMessage):
+class ZmqIncomingMessage(base.RpcIncomingMessage):
+
+ def __init__(self, context, message, reply_id=None, message_id=None,
+ socket=None, sender=None):
+
+ if sender is not None:
+ assert socket is not None, "Valid socket expected!"
+ assert message_id is not None, "Valid message ID expected!"
+ assert reply_id is not None, "Valid reply ID expected!"
+
+ super(ZmqIncomingMessage, self).__init__(context, message)
+
+ self.reply_id = reply_id
+ self.message_id = message_id
+ self.socket = socket
+ self.sender = sender
- def __init__(self, socket, rep_id, request, poller):
- super(ZmqIncomingRequest, self).__init__(request.context,
- request.message)
- self.reply_socket = socket
- self.reply_id = rep_id
- self.request = request
- self.received = None
- self.poller = poller
+ def acknowledge(self):
+ """Not sending acknowledge"""
def reply(self, reply=None, failure=None):
- if failure is not None:
- failure = rpc_common.serialize_remote_exception(failure)
- response = zmq_response.Response(type=zmq_names.REPLY_TYPE,
- message_id=self.request.message_id,
- reply_id=self.reply_id,
- reply_body=reply,
- failure=failure)
-
- LOG.debug("Replying %s", (str(self.request.message_id)))
-
- self.received = True
- self.reply_socket.send(self.reply_id, zmq.SNDMORE)
- self.reply_socket.send(b'', zmq.SNDMORE)
- self.reply_socket.send_pyobj(response)
+ if self.sender is not None:
+ if failure is not None:
+ failure = rpc_common.serialize_remote_exception(failure)
+ reply = zmq_response.Response(type=zmq_names.REPLY_TYPE,
+ message_id=self.message_id,
+ reply_id=self.reply_id,
+ reply_body=reply,
+ failure=failure)
+ self.sender.send(self.socket, reply)
def requeue(self):
"""Requeue is not supported"""
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_names.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py
index 51f68c6..8b63e0e 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_names.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py
@@ -69,8 +69,8 @@ def socket_type_str(socket_type):
def message_type_str(message_type):
msg_type_str = {CALL_TYPE: "CALL",
CAST_TYPE: "CAST",
- CAST_FANOUT_TYPE: "CAST_FANOUT_TYPE",
- NOTIFY_TYPE: "NOTIFY_TYPE",
- REPLY_TYPE: "REPLY_TYPE",
- ACK_TYPE: "ACK_TYPE"}
+ CAST_FANOUT_TYPE: "CAST_FANOUT",
+ NOTIFY_TYPE: "NOTIFY",
+ REPLY_TYPE: "REPLY",
+ ACK_TYPE: "ACK"}
return msg_type_str[message_type]
diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py
index a8ea822..302915d 100644
--- a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py
+++ b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py
@@ -31,6 +31,8 @@ class UpdaterBase(object):
self.conf = conf
self.matchmaker = matchmaker
self.update_method = update_method
+ # make first update immediately
+ self.update_method()
self.executor = zmq_async.get_executor(method=self._update_loop)
self.executor.execute()
diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py
index e489522..0dcc047 100644
--- a/oslo_messaging/tests/functional/utils.py
+++ b/oslo_messaging/tests/functional/utils.py
@@ -300,6 +300,8 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
zmq_redis_port = os.environ.get('ZMQ_REDIS_PORT')
if zmq_redis_port:
self.config(port=zmq_redis_port, group="matchmaker_redis")
+ self.config(check_timeout=10000, group="matchmaker_redis")
+ self.config(wait_timeout=1000, group="matchmaker_redis")
zmq_use_pub_sub = os.environ.get('ZMQ_USE_PUB_SUB')
if zmq_use_pub_sub:
self.config(use_pub_sub=zmq_use_pub_sub)