summaryrefslogtreecommitdiff
path: root/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py')
-rw-r--r--oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py16
1 files changed, 6 insertions, 10 deletions
diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py
index 4a5eba4..3c232e3 100644
--- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py
+++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py
@@ -16,12 +16,11 @@ import abc
from concurrent import futures
import logging
-import retrying
-
import oslo_messaging
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers \
import zmq_publisher_base
+from oslo_messaging._drivers.zmq_driver.client import zmq_response
from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
@@ -56,15 +55,13 @@ class DealerPublisherBase(zmq_publisher_base.PublisherBase):
{"tout": request.timeout, "msg_id": request.message_id}
)
- @abc.abstractmethod
- def _connect_socket(self, request):
- pass
-
def _recv_reply(self, request):
- reply_future, = self.receiver.track_request(request)
+ reply_future = \
+ self.receiver.track_request(request)[zmq_names.REPLY_TYPE]
try:
_, reply = reply_future.result(timeout=request.timeout)
+ assert isinstance(reply, zmq_response.Reply), "Reply expected!"
except AssertionError:
LOG.error(_LE("Message format error in reply for %s"),
request.message_id)
@@ -84,9 +81,8 @@ class DealerPublisherBase(zmq_publisher_base.PublisherBase):
def send_call(self, request):
self._check_pattern(request, zmq_names.CALL_TYPE)
- try:
- socket = self._connect_socket(request)
- except retrying.RetryError:
+ socket = self.connect_socket(request)
+ if not socket:
self._raise_timeout(request)
self.sender.send(socket, request)