summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py (renamed from oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py)93
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py53
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py66
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py33
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_client.py76
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py25
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py16
7 files changed, 184 insertions, 178 deletions
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py
index 89031ec..4a5eba4 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py
@@ -1,4 +1,4 @@
-# Copyright 2015 Mirantis, Inc.
+# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import abc
from concurrent import futures
import logging
@@ -21,6 +22,7 @@ import oslo_messaging
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers \
import zmq_publisher_base
+from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE
@@ -30,34 +32,23 @@ LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
-class DealerPublisher(zmq_publisher_base.PublisherBase):
- """Non-CALL publisher using direct connections."""
+class DealerPublisherBase(zmq_publisher_base.PublisherBase):
+ """Abstract DEALER-publisher."""
- def send_request(self, request):
- if request.msg_type == zmq_names.CALL_TYPE:
+ def __init__(self, conf, matchmaker, sender, receiver):
+ sockets_manager = zmq_sockets_manager.SocketsManager(
+ conf, matchmaker, zmq.ROUTER, zmq.DEALER
+ )
+ super(DealerPublisherBase, self).__init__(sockets_manager, sender,
+ receiver)
+
+ @staticmethod
+ def _check_pattern(request, supported_pattern):
+ if request.msg_type != supported_pattern:
raise zmq_publisher_base.UnsupportedSendPattern(
zmq_names.message_type_str(request.msg_type)
)
- try:
- socket = self.sockets_manager.get_socket(request.target)
- except retrying.RetryError:
- return
-
- if request.msg_type in zmq_names.MULTISEND_TYPES:
- for _ in range(socket.connections_count()):
- self.sender.send(socket, request)
- else:
- self.sender.send(socket, request)
-
-
-class DealerCallPublisher(zmq_publisher_base.PublisherBase):
- """CALL publisher using direct connections."""
-
- def __init__(self, sockets_manager, sender, reply_receiver):
- super(DealerCallPublisher, self).__init__(sockets_manager, sender)
- self.reply_receiver = reply_receiver
-
@staticmethod
def _raise_timeout(request):
raise oslo_messaging.MessagingTimeout(
@@ -65,26 +56,12 @@ class DealerCallPublisher(zmq_publisher_base.PublisherBase):
{"tout": request.timeout, "msg_id": request.message_id}
)
- def send_request(self, request):
- if request.msg_type != zmq_names.CALL_TYPE:
- raise zmq_publisher_base.UnsupportedSendPattern(
- zmq_names.message_type_str(request.msg_type)
- )
-
- try:
- socket = self._connect_socket(request.target)
- except retrying.RetryError:
- self._raise_timeout(request)
-
- self.sender.send(socket, request)
- self.reply_receiver.register_socket(socket)
- return self._recv_reply(request)
-
- def _connect_socket(self, target):
- return self.sockets_manager.get_socket(target)
+ @abc.abstractmethod
+ def _connect_socket(self, request):
+ pass
def _recv_reply(self, request):
- reply_future, = self.reply_receiver.track_request(request)
+ reply_future, = self.receiver.track_request(request)
try:
_, reply = reply_future.result(timeout=request.timeout)
@@ -95,7 +72,7 @@ class DealerCallPublisher(zmq_publisher_base.PublisherBase):
except futures.TimeoutError:
self._raise_timeout(request)
finally:
- self.reply_receiver.untrack_request(request)
+ self.receiver.untrack_request(request)
if reply.failure:
raise rpc_common.deserialize_remote_exception(
@@ -104,6 +81,30 @@ class DealerCallPublisher(zmq_publisher_base.PublisherBase):
else:
return reply.reply_body
- def cleanup(self):
- self.reply_receiver.stop()
- super(DealerCallPublisher, self).cleanup()
+ def send_call(self, request):
+ self._check_pattern(request, zmq_names.CALL_TYPE)
+
+ try:
+ socket = self._connect_socket(request)
+ except retrying.RetryError:
+ self._raise_timeout(request)
+
+ self.sender.send(socket, request)
+ self.receiver.register_socket(socket)
+ return self._recv_reply(request)
+
+ @abc.abstractmethod
+ def _send_non_blocking(self, request):
+ pass
+
+ def send_cast(self, request):
+ self._check_pattern(request, zmq_names.CAST_TYPE)
+ self._send_non_blocking(request)
+
+ def send_fanout(self, request):
+ self._check_pattern(request, zmq_names.CAST_FANOUT_TYPE)
+ self._send_non_blocking(request)
+
+ def send_notify(self, request):
+ self._check_pattern(request, zmq_names.NOTIFY_TYPE)
+ self._send_non_blocking(request)
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py
new file mode 100644
index 0000000..56d8b49
--- /dev/null
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py
@@ -0,0 +1,53 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+import retrying
+
+from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
+ import zmq_dealer_publisher_base
+from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
+from oslo_messaging._drivers.zmq_driver.client import zmq_senders
+from oslo_messaging._drivers.zmq_driver import zmq_async
+from oslo_messaging._drivers.zmq_driver import zmq_names
+
+LOG = logging.getLogger(__name__)
+
+zmq = zmq_async.import_zmq()
+
+
+class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
+ """DEALER-publisher using direct connections."""
+
+ def __init__(self, conf, matchmaker):
+ sender = zmq_senders.RequestSenderDirect(conf)
+ receiver = zmq_receivers.ReplyReceiverDirect(conf)
+ super(DealerPublisherDirect, self).__init__(conf, matchmaker, sender,
+ receiver)
+
+ def _connect_socket(self, request):
+ return self.sockets_manager.get_socket(request.target)
+
+ def _send_non_blocking(self, request):
+ try:
+ socket = self._connect_socket(request)
+ except retrying.RetryError:
+ return
+
+ if request.msg_type in zmq_names.MULTISEND_TYPES:
+ for _ in range(socket.connections_count()):
+ self.sender.send(socket, request)
+ else:
+ self.sender.send(socket, request)
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
index 9f53bed..29dd3fc 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py
@@ -17,10 +17,10 @@ import logging
import retrying
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
- import zmq_dealer_publisher
-from oslo_messaging._drivers.zmq_driver.client.publishers \
- import zmq_publisher_base
+ import zmq_dealer_publisher_base
+from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table
+from oslo_messaging._drivers.zmq_driver.client import zmq_senders
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
@@ -31,17 +31,31 @@ LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
-class DealerPublisherProxy(zmq_publisher_base.PublisherBase):
- """Non-CALL publisher via proxy."""
+class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase):
+ """DEALER-publisher via proxy."""
- def __init__(self, sockets_manager, sender):
- super(DealerPublisherProxy, self).__init__(sockets_manager, sender)
- self.socket = sockets_manager.get_socket_to_publishers()
+ def __init__(self, conf, matchmaker):
+ sender = zmq_senders.RequestSenderProxy(conf)
+ receiver = zmq_receivers.ReplyReceiverProxy(conf)
+ super(DealerPublisherProxy, self).__init__(conf, matchmaker, sender,
+ receiver)
+ self.socket = self.sockets_manager.get_socket_to_publishers()
self.routing_table = zmq_routing_table.RoutingTable(self.conf,
self.matchmaker)
self.connection_updater = \
PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket)
+ def _connect_socket(self, request):
+ return self.socket
+
+ def send_call(self, request):
+ try:
+ request.routing_key = \
+ self.routing_table.get_routable_host(request.target)
+ except retrying.RetryError:
+ self._raise_timeout(request)
+ return super(DealerPublisherProxy, self).send_call(request)
+
def _get_routing_keys(self, request):
try:
if request.msg_type in zmq_names.DIRECT_TYPES:
@@ -54,48 +68,14 @@ class DealerPublisherProxy(zmq_publisher_base.PublisherBase):
except retrying.RetryError:
return []
- def send_request(self, request):
- if request.msg_type == zmq_names.CALL_TYPE:
- raise zmq_publisher_base.UnsupportedSendPattern(
- zmq_names.message_type_str(request.msg_type)
- )
+ def _send_non_blocking(self, request):
for routing_key in self._get_routing_keys(request):
request.routing_key = routing_key
self.sender.send(self.socket, request)
def cleanup(self):
- self.connection_updater.stop()
- self.socket.close()
super(DealerPublisherProxy, self).cleanup()
-
-
-class DealerCallPublisherProxy(zmq_dealer_publisher.DealerCallPublisher):
- """CALL publisher via proxy."""
-
- def __init__(self, sockets_manager, sender, reply_waiter):
- super(DealerCallPublisherProxy, self).__init__(
- sockets_manager, sender, reply_waiter
- )
- self.socket = self.sockets_manager.get_socket_to_publishers()
- self.routing_table = zmq_routing_table.RoutingTable(self.conf,
- self.matchmaker)
- self.connection_updater = \
- PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket)
-
- def send_request(self, request):
- try:
- request.routing_key = \
- self.routing_table.get_routable_host(request.target)
- except retrying.RetryError:
- self._raise_timeout(request)
- return super(DealerCallPublisherProxy, self).send_request(request)
-
- def _connect_socket(self, target):
- return self.socket
-
- def cleanup(self):
self.connection_updater.stop()
- super(DealerCallPublisherProxy, self).cleanup()
self.socket.close()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py
index bb5f294..9da0c05 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py
@@ -53,29 +53,42 @@ class PublisherBase(object):
Publisher can send request objects from zmq_request.
"""
- def __init__(self, sockets_manager, sender):
+ def __init__(self, sockets_manager, sender, receiver):
"""Construct publisher
- Accept configuration object and Name Service interface object.
- Create zmq.Context and connected sockets dictionary.
+ Accept sockets manager, sender and receiver objects.
- :param conf: configuration object
- :type conf: oslo_config.CONF
+ :param sockets_manager: sockets manager object
+ :type sockets_manager: zmq_sockets_manager.SocketsManager
+ :param senders: request sender object
+ :type senders: zmq_senders.RequestSender
+ :param receiver: reply receiver object
+ :type receiver: zmq_receivers.ReplyReceiver
"""
self.sockets_manager = sockets_manager
self.conf = sockets_manager.conf
self.matchmaker = sockets_manager.matchmaker
self.sender = sender
+ self.receiver = receiver
@abc.abstractmethod
- def send_request(self, request):
- """Send request to consumer
+ def send_call(self, request):
+ pass
- :param request: Message data and destination container object
- :type request: zmq_request.Request
- """
+ @abc.abstractmethod
+ def send_cast(self, request):
+ pass
+
+ @abc.abstractmethod
+ def send_fanout(self, request):
+ pass
+
+ @abc.abstractmethod
+ def send_notify(self, request):
+ pass
def cleanup(self):
"""Cleanup publisher. Close allocated connections."""
+ self.receiver.stop()
self.sockets_manager.cleanup()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
index a8cfe93..e7362e2 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py
@@ -15,13 +15,10 @@
from oslo_messaging._drivers import common
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
- import zmq_dealer_publisher
+ import zmq_dealer_publisher_direct
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher_proxy
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
-from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
-from oslo_messaging._drivers.zmq_driver.client import zmq_senders
-from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
@@ -45,34 +42,18 @@ class ZmqClientMixDirectPubSub(zmq_client_base.ZmqClientBase):
if conf.use_router_proxy or not conf.use_pub_sub:
raise WrongClientException()
- self.sockets_manager = zmq_sockets_manager.SocketsManager(
- conf, matchmaker, zmq.ROUTER, zmq.DEALER
- )
-
- sender_proxy = zmq_senders.RequestSenderProxy(conf)
- sender_direct = zmq_senders.RequestSenderDirect(conf)
+ publisher_direct = \
+ zmq_dealer_publisher_direct.DealerPublisherDirect(conf, matchmaker)
- receiver_direct = zmq_receivers.ReplyReceiverDirect(conf)
-
- fanout_publisher = zmq_dealer_publisher_proxy.DealerPublisherProxy(
- self.sockets_manager, sender_proxy
- )
+ publisher_proxy = \
+ zmq_dealer_publisher_proxy.DealerPublisherProxy(conf, matchmaker)
super(ZmqClientMixDirectPubSub, self).__init__(
conf, matchmaker, allowed_remote_exmods,
publishers={
- zmq_names.CALL_TYPE:
- zmq_dealer_publisher.DealerCallPublisher(
- self.sockets_manager, sender_direct, receiver_direct
- ),
-
- zmq_names.CAST_FANOUT_TYPE: fanout_publisher,
-
- zmq_names.NOTIFY_TYPE: fanout_publisher,
-
- "default":
- zmq_dealer_publisher.DealerPublisher(self.sockets_manager,
- sender_direct)
+ zmq_names.CAST_FANOUT_TYPE: publisher_proxy,
+ zmq_names.NOTIFY_TYPE: publisher_proxy,
+ "default": publisher_direct
}
)
@@ -90,26 +71,12 @@ class ZmqClientDirect(zmq_client_base.ZmqClientBase):
if conf.use_pub_sub or conf.use_router_proxy:
raise WrongClientException()
- self.sockets_manager = zmq_sockets_manager.SocketsManager(
- conf, matchmaker, zmq.ROUTER, zmq.DEALER
- )
-
- sender = zmq_senders.RequestSenderDirect(conf)
-
- receiver = zmq_receivers.ReplyReceiverDirect(conf)
+ publisher = \
+ zmq_dealer_publisher_direct.DealerPublisherDirect(conf, matchmaker)
super(ZmqClientDirect, self).__init__(
conf, matchmaker, allowed_remote_exmods,
- publishers={
- zmq_names.CALL_TYPE:
- zmq_dealer_publisher.DealerCallPublisher(
- self.sockets_manager, sender, receiver
- ),
-
- "default":
- zmq_dealer_publisher.DealerPublisher(self.sockets_manager,
- sender)
- }
+ publishers={"default": publisher}
)
@@ -128,25 +95,10 @@ class ZmqClientProxy(zmq_client_base.ZmqClientBase):
if not conf.use_router_proxy:
raise WrongClientException()
- self.sockets_manager = zmq_sockets_manager.SocketsManager(
- conf, matchmaker, zmq.ROUTER, zmq.DEALER
- )
-
- sender = zmq_senders.RequestSenderProxy(conf)
-
- receiver = zmq_receivers.ReplyReceiverProxy(conf)
+ publisher = \
+ zmq_dealer_publisher_proxy.DealerPublisherProxy(conf, matchmaker)
super(ZmqClientProxy, self).__init__(
conf, matchmaker, allowed_remote_exmods,
- publishers={
- zmq_names.CALL_TYPE:
- zmq_dealer_publisher_proxy.DealerCallPublisherProxy(
- self.sockets_manager, sender, receiver
- ),
-
- "default":
- zmq_dealer_publisher_proxy.DealerPublisherProxy(
- self.sockets_manager, sender
- )
- }
+ publishers={"default": publisher}
)
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py
index 7630cc7..4643ff3 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py
@@ -24,45 +24,44 @@ class ZmqClientBase(object):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None,
publishers=None):
self.conf = conf
- self.context = zmq.Context()
self.matchmaker = matchmaker
self.allowed_remote_exmods = allowed_remote_exmods or []
self.publishers = publishers
- self.call_publisher = publishers.get(zmq_names.CALL_TYPE) \
- or publishers["default"]
- self.cast_publisher = publishers.get(zmq_names.CAST_TYPE) \
- or publishers["default"]
- self.fanout_publisher = publishers.get(zmq_names.CAST_FANOUT_TYPE) \
- or publishers["default"]
- self.notify_publisher = publishers.get(zmq_names.NOTIFY_TYPE) \
- or publishers["default"]
+ self.call_publisher = publishers.get(zmq_names.CALL_TYPE,
+ publishers["default"])
+ self.cast_publisher = publishers.get(zmq_names.CAST_TYPE,
+ publishers["default"])
+ self.fanout_publisher = publishers.get(zmq_names.CAST_FANOUT_TYPE,
+ publishers["default"])
+ self.notify_publisher = publishers.get(zmq_names.NOTIFY_TYPE,
+ publishers["default"])
def send_call(self, target, context, message, timeout=None, retry=None):
request = zmq_request.CallRequest(
target, context=context, message=message, retry=retry,
timeout=timeout, allowed_remote_exmods=self.allowed_remote_exmods
)
- return self.call_publisher.send_request(request)
+ return self.call_publisher.send_call(request)
def send_cast(self, target, context, message, retry=None):
request = zmq_request.CastRequest(
target, context=context, message=message, retry=retry
)
- self.cast_publisher.send_request(request)
+ self.cast_publisher.send_cast(request)
def send_fanout(self, target, context, message, retry=None):
request = zmq_request.FanoutRequest(
target, context=context, message=message, retry=retry
)
- self.fanout_publisher.send_request(request)
+ self.fanout_publisher.send_fanout(request)
def send_notify(self, target, context, message, version, retry=None):
request = zmq_request.NotificationRequest(
target, context=context, message=message, retry=retry,
version=version
)
- self.notify_publisher.send_request(request)
+ self.notify_publisher.send_notify(request)
def cleanup(self):
cleaned = set()
diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py
index 2fb8191..3b83d9a 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py
@@ -37,7 +37,15 @@ class SenderBase(object):
pass
-class RequestSenderProxy(SenderBase):
+class RequestSender(SenderBase):
+ pass
+
+
+class ReplySender(SenderBase):
+ pass
+
+
+class RequestSenderProxy(RequestSender):
def send(self, socket, request):
socket.send(b'', zmq.SNDMORE)
@@ -55,7 +63,7 @@ class RequestSenderProxy(SenderBase):
"target": request.target})
-class ReplySenderProxy(SenderBase):
+class ReplySenderProxy(ReplySender):
def send(self, socket, reply):
LOG.debug("Replying to %s", reply.message_id)
@@ -69,7 +77,7 @@ class ReplySenderProxy(SenderBase):
socket.send_dumped(reply.to_dict())
-class RequestSenderDirect(SenderBase):
+class RequestSenderDirect(RequestSender):
def send(self, socket, request):
socket.send(b'', zmq.SNDMORE)
@@ -85,7 +93,7 @@ class RequestSenderDirect(SenderBase):
"target": request.target})
-class ReplySenderDirect(SenderBase):
+class ReplySenderDirect(ReplySender):
def send(self, socket, reply):
LOG.debug("Replying to %s", reply.message_id)