diff options
Diffstat (limited to 'oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py')
-rw-r--r-- | oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py | 16 |
1 files changed, 6 insertions, 10 deletions
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py index 4a5eba4..3c232e3 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py @@ -16,12 +16,11 @@ import abc from concurrent import futures import logging -import retrying - import oslo_messaging from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver.client.publishers \ import zmq_publisher_base +from oslo_messaging._drivers.zmq_driver.client import zmq_response from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names @@ -56,15 +55,13 @@ class DealerPublisherBase(zmq_publisher_base.PublisherBase): {"tout": request.timeout, "msg_id": request.message_id} ) - @abc.abstractmethod - def _connect_socket(self, request): - pass - def _recv_reply(self, request): - reply_future, = self.receiver.track_request(request) + reply_future = \ + self.receiver.track_request(request)[zmq_names.REPLY_TYPE] try: _, reply = reply_future.result(timeout=request.timeout) + assert isinstance(reply, zmq_response.Reply), "Reply expected!" except AssertionError: LOG.error(_LE("Message format error in reply for %s"), request.message_id) @@ -84,9 +81,8 @@ class DealerPublisherBase(zmq_publisher_base.PublisherBase): def send_call(self, request): self._check_pattern(request, zmq_names.CALL_TYPE) - try: - socket = self._connect_socket(request) - except retrying.RetryError: + socket = self.connect_socket(request) + if not socket: self._raise_timeout(request) self.sender.send(socket, request) |